Skip to content

Commit 11f4042

Browse files
committed
update
1 parent 3f8d4a0 commit 11f4042

File tree

36 files changed

+344
-122
lines changed

36 files changed

+344
-122
lines changed

src/query/ee/src/storages/fuse/operations/vacuum_table.rs

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ pub struct SnapshotReferencedFiles {
4040
pub segments: HashSet<String>,
4141
pub blocks: HashSet<String>,
4242
pub blocks_index: HashSet<String>,
43+
pub blocks_stats: HashSet<String>,
4344
}
4445

4546
impl SnapshotReferencedFiles {
@@ -54,6 +55,9 @@ impl SnapshotReferencedFiles {
5455
for file in &self.blocks_index {
5556
files.push(file.clone());
5657
}
58+
for file in &self.blocks_stats {
59+
files.push(file.clone());
60+
}
5761
files
5862
}
5963
}
@@ -132,6 +136,7 @@ pub async fn get_snapshot_referenced_files(
132136
segments,
133137
blocks: locations_referenced.block_location,
134138
blocks_index: locations_referenced.bloom_location,
139+
blocks_stats: locations_referenced.stats_location,
135140
}))
136141
}
137142

@@ -164,10 +169,11 @@ pub async fn do_gc_orphan_files(
164169
None => return Ok(()),
165170
};
166171
let status = format!(
167-
"gc orphan: read referenced files:{},{},{}, cost:{:?}",
172+
"gc orphan: read referenced files:{},{},{},{}, cost:{:?}",
168173
referenced_files.segments.len(),
169174
referenced_files.blocks.len(),
170175
referenced_files.blocks_index.len(),
176+
referenced_files.blocks_stats.len(),
171177
start.elapsed()
172178
);
173179
ctx.set_status_info(&status);
@@ -268,6 +274,36 @@ pub async fn do_gc_orphan_files(
268274
);
269275
ctx.set_status_info(&status);
270276

277+
// 5. Purge orphan block stats files.
278+
// 5.1 Get orphan block stats files to be purged
279+
let stats_locations_to_be_purged = get_orphan_files_to_be_purged(
280+
fuse_table,
281+
location_gen.block_statistics_location_prefix(),
282+
referenced_files.blocks_stats,
283+
retention_time,
284+
)
285+
.await?;
286+
let status = format!(
287+
"gc orphan: read stats_locations_to_be_purged:{}, cost:{:?}",
288+
stats_locations_to_be_purged.len(),
289+
start.elapsed()
290+
);
291+
ctx.set_status_info(&status);
292+
293+
// 5.2 Delete all the orphan block stats files to be purged
294+
let purged_file_num = stats_locations_to_be_purged.len();
295+
fuse_table
296+
.try_purge_location_files(
297+
ctx.clone(),
298+
HashSet::from_iter(stats_locations_to_be_purged.into_iter()),
299+
)
300+
.await?;
301+
let status = format!(
302+
"gc orphan: purged block stats files:{}, cost:{:?}",
303+
purged_file_num,
304+
start.elapsed()
305+
);
306+
ctx.set_status_info(&status);
271307
Ok(())
272308
}
273309

@@ -286,10 +322,11 @@ pub async fn do_dry_run_orphan_files(
286322
None => return Ok(()),
287323
};
288324
let status = format!(
289-
"dry_run orphan: read referenced files:{},{},{}, cost:{:?}",
325+
"dry_run orphan: read referenced files:{},{},{},{}, cost:{:?}",
290326
referenced_files.segments.len(),
291327
referenced_files.blocks.len(),
292328
referenced_files.blocks_index.len(),
329+
referenced_files.blocks_stats.len(),
293330
start.elapsed()
294331
);
295332
ctx.set_status_info(&status);
@@ -351,6 +388,23 @@ pub async fn do_dry_run_orphan_files(
351388

352389
purge_files.extend(index_locations_to_be_purged);
353390

391+
// 5. Get orphan block stats files to be purged.
392+
let stats_locations_to_be_purged = get_orphan_files_to_be_purged(
393+
fuse_table,
394+
location_gen.block_statistics_location_prefix(),
395+
referenced_files.blocks_stats,
396+
retention_time,
397+
)
398+
.await?;
399+
let status = format!(
400+
"dry_run orphan: read stats_locations_to_be_purged:{}, cost:{:?}",
401+
stats_locations_to_be_purged.len(),
402+
start.elapsed()
403+
);
404+
ctx.set_status_info(&status);
405+
406+
purge_files.extend(stats_locations_to_be_purged);
407+
354408
Ok(())
355409
}
356410

src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,9 @@ pub async fn do_vacuum2(
336336
}
337337
indexes_to_gc
338338
.push(TableMetaLocationGenerator::gen_bloom_index_location_from_block_location(loc));
339+
340+
indexes_to_gc
341+
.push(TableMetaLocationGenerator::gen_block_stats_location_from_block_location(loc));
339342
}
340343

341344
ctx.set_status_info(&format!(

src/query/service/src/interpreters/common/table_option_validation.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ use databend_common_exception::ErrorCode;
2424
use databend_common_expression::TableSchemaRef;
2525
use databend_common_io::constants::DEFAULT_BLOCK_ROW_COUNT;
2626
use databend_common_settings::Settings;
27+
use databend_common_sql::ApproxDistinctColumns;
2728
use databend_common_sql::BloomIndexColumns;
28-
use databend_common_sql::DistinctColumns;
2929
use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD;
3030
use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
3131
use databend_common_storages_fuse::FUSE_OPT_KEY_DATA_RETENTION_NUM_SNAPSHOTS_TO_KEEP;
@@ -37,13 +37,13 @@ use databend_common_storages_fuse::FUSE_OPT_KEY_ROW_PER_BLOCK;
3737
use databend_common_storages_fuse::FUSE_OPT_KEY_ROW_PER_PAGE;
3838
use databend_storages_common_index::BloomIndex;
3939
use databend_storages_common_index::RangeIndex;
40+
use databend_storages_common_table_meta::table::OPT_KEY_APPROX_DISTINCT_COLUMNS;
4041
use databend_storages_common_table_meta::table::OPT_KEY_BLOOM_INDEX_COLUMNS;
4142
use databend_storages_common_table_meta::table::OPT_KEY_CHANGE_TRACKING;
4243
use databend_storages_common_table_meta::table::OPT_KEY_CLUSTER_TYPE;
4344
use databend_storages_common_table_meta::table::OPT_KEY_COMMENT;
4445
use databend_storages_common_table_meta::table::OPT_KEY_CONNECTION_NAME;
4546
use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID;
46-
use databend_storages_common_table_meta::table::OPT_KEY_DISTINCT_COLUMNS;
4747
use databend_storages_common_table_meta::table::OPT_KEY_ENABLE_COPY_DEDUP_FULL_PATH;
4848
use databend_storages_common_table_meta::table::OPT_KEY_ENGINE;
4949
use databend_storages_common_table_meta::table::OPT_KEY_LOCATION;
@@ -71,7 +71,7 @@ pub static CREATE_FUSE_OPTIONS: LazyLock<HashSet<&'static str>> = LazyLock::new(
7171
r.insert(FUSE_OPT_KEY_ENABLE_AUTO_VACUUM);
7272

7373
r.insert(OPT_KEY_BLOOM_INDEX_COLUMNS);
74-
r.insert(OPT_KEY_DISTINCT_COLUMNS);
74+
r.insert(OPT_KEY_APPROX_DISTINCT_COLUMNS);
7575
r.insert(OPT_KEY_TABLE_COMPRESSION);
7676
r.insert(OPT_KEY_STORAGE_FORMAT);
7777
r.insert(OPT_KEY_DATABASE_ID);
@@ -217,12 +217,12 @@ pub fn is_valid_bloom_index_columns(
217217
Ok(())
218218
}
219219

220-
pub fn is_valid_distinct_columns(
220+
pub fn is_valid_approx_distinct_columns(
221221
options: &BTreeMap<String, String>,
222222
schema: TableSchemaRef,
223223
) -> databend_common_exception::Result<()> {
224-
if let Some(value) = options.get(OPT_KEY_DISTINCT_COLUMNS) {
225-
DistinctColumns::verify_definition(value, schema, RangeIndex::supported_table_type)?;
224+
if let Some(value) = options.get(OPT_KEY_APPROX_DISTINCT_COLUMNS) {
225+
ApproxDistinctColumns::verify_definition(value, schema, RangeIndex::supported_table_type)?;
226226
}
227227
Ok(())
228228
}

src/query/service/src/interpreters/interpreter_table_create.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,12 @@ use databend_storages_common_table_meta::table::OPT_KEY_TEMP_PREFIX;
6666
use log::error;
6767
use log::info;
6868

69+
use crate::interpreters::common::table_option_validation::is_valid_approx_distinct_columns;
6970
use crate::interpreters::common::table_option_validation::is_valid_block_per_segment;
7071
use crate::interpreters::common::table_option_validation::is_valid_bloom_index_columns;
7172
use crate::interpreters::common::table_option_validation::is_valid_change_tracking;
7273
use crate::interpreters::common::table_option_validation::is_valid_create_opt;
7374
use crate::interpreters::common::table_option_validation::is_valid_data_retention_period;
74-
use crate::interpreters::common::table_option_validation::is_valid_distinct_columns;
7575
use crate::interpreters::common::table_option_validation::is_valid_option_of_type;
7676
use crate::interpreters::common::table_option_validation::is_valid_random_seed;
7777
use crate::interpreters::common::table_option_validation::is_valid_row_per_block;
@@ -469,7 +469,7 @@ impl CreateTableInterpreter {
469469
is_valid_row_per_block(&table_meta.options)?;
470470
// check bloom_index_columns.
471471
is_valid_bloom_index_columns(&table_meta.options, schema.clone())?;
472-
is_valid_distinct_columns(&table_meta.options, schema)?;
472+
is_valid_approx_distinct_columns(&table_meta.options, schema)?;
473473
is_valid_change_tracking(&table_meta.options)?;
474474
// check random seed
475475
is_valid_random_seed(&table_meta.options)?;

src/query/service/src/interpreters/interpreter_table_set_options.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,11 @@ use databend_storages_common_table_meta::table::OPT_KEY_STORAGE_FORMAT;
4949
use databend_storages_common_table_meta::table::OPT_KEY_TEMP_PREFIX;
5050
use log::error;
5151

52+
use crate::interpreters::common::table_option_validation::is_valid_approx_distinct_columns;
5253
use crate::interpreters::common::table_option_validation::is_valid_block_per_segment;
5354
use crate::interpreters::common::table_option_validation::is_valid_bloom_index_columns;
5455
use crate::interpreters::common::table_option_validation::is_valid_create_opt;
5556
use crate::interpreters::common::table_option_validation::is_valid_data_retention_period;
56-
use crate::interpreters::common::table_option_validation::is_valid_distinct_columns;
5757
use crate::interpreters::common::table_option_validation::is_valid_option_of_type;
5858
use crate::interpreters::common::table_option_validation::is_valid_row_per_block;
5959
use crate::interpreters::Interpreter;
@@ -164,7 +164,7 @@ impl Interpreter for SetOptionsInterpreter {
164164

165165
// check bloom_index_columns.
166166
is_valid_bloom_index_columns(&self.plan.set_options, table.schema())?;
167-
is_valid_distinct_columns(&self.plan.set_options, table.schema())?;
167+
is_valid_approx_distinct_columns(&self.plan.set_options, table.schema())?;
168168

169169
if let Some(new_snapshot_location) =
170170
set_segment_format(self.ctx.clone(), table.clone(), &self.plan.set_options).await?

src/query/service/src/test_kits/block_writer.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,17 @@ use databend_common_expression::FunctionContext;
1919
use databend_common_expression::TableSchemaRef;
2020
use databend_common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE;
2121
use databend_common_io::constants::DEFAULT_BLOCK_INDEX_BUFFER_SIZE;
22+
use databend_common_sql::ApproxDistinctColumns;
2223
use databend_common_sql::BloomIndexColumns;
2324
use databend_common_storages_fuse::io::serialize_block;
25+
use databend_common_storages_fuse::io::BlockStatisticsState;
2426
use databend_common_storages_fuse::io::TableMetaLocationGenerator;
2527
use databend_common_storages_fuse::io::WriteSettings;
2628
use databend_common_storages_fuse::FuseStorageFormat;
2729
use databend_storages_common_blocks::blocks_to_parquet;
2830
use databend_storages_common_index::BloomIndex;
2931
use databend_storages_common_index::BloomIndexBuilder;
32+
use databend_storages_common_index::RangeIndex;
3033
use databend_storages_common_table_meta::meta::BlockMeta;
3134
use databend_storages_common_table_meta::meta::ClusterStatistics;
3235
use databend_storages_common_table_meta::meta::Compression;
@@ -85,6 +88,9 @@ impl<'a> BlockWriter<'a> {
8588
let (bloom_filter_index_size, bloom_filter_index_location, meta) = self
8689
.build_block_index(data_accessor, schema.clone(), &block, block_id)
8790
.await?;
91+
let (block_stats_size, block_stats_location) = self
92+
.build_block_stats(data_accessor, schema.clone(), &block, block_id)
93+
.await?;
8894

8995
let write_settings = WriteSettings {
9096
storage_format,
@@ -112,7 +118,8 @@ impl<'a> BlockWriter<'a> {
112118
None,
113119
None,
114120
None,
115-
None,
121+
block_stats_location,
122+
block_stats_size,
116123
Compression::Lz4Raw,
117124
Some(Utc::now()),
118125
);
@@ -155,4 +162,29 @@ impl<'a> BlockWriter<'a> {
155162
Ok((0u64, None, None))
156163
}
157164
}
165+
166+
pub async fn build_block_stats(
167+
&self,
168+
data_accessor: &Operator,
169+
schema: TableSchemaRef,
170+
block: &DataBlock,
171+
block_id: Uuid,
172+
) -> Result<(u64, Option<Location>)> {
173+
let location = self.location_generator.block_stats_location(&block_id);
174+
175+
let hll_columns = ApproxDistinctColumns::All;
176+
let ndv_columns_map =
177+
hll_columns.distinct_column_fields(schema.clone(), RangeIndex::supported_table_type)?;
178+
let maybe_block_stats =
179+
BlockStatisticsState::from_data_block(location, block, &ndv_columns_map)?;
180+
if let Some(block_stats) = maybe_block_stats {
181+
let size = block_stats.block_stats_size();
182+
data_accessor
183+
.write(&block_stats.location.0, block_stats.data)
184+
.await?;
185+
Ok((size, Some(block_stats.location)))
186+
} else {
187+
Ok((0u64, None))
188+
}
189+
}
158190
}

src/query/service/tests/it/storages/fuse/bloom_index_meta_size.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,8 @@ fn build_test_segment_info(
340340
vector_index_size: None,
341341
vector_index_location: None,
342342
virtual_block_meta: None,
343-
block_stats_meta: None,
343+
block_stats_location: None,
344+
block_stats_size: 0,
344345
compression: Compression::Lz4,
345346
create_on: Some(Utc::now()),
346347
};

src/query/service/tests/it/storages/fuse/meta/column_oriented.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,39 @@ fn check_block_level_meta(
288288
assert!(is_null);
289289
}
290290

291+
// check block stats location
292+
let block_stats_location = column_oriented_segment
293+
.col_by_name(&[BLOCK_STATS_LOCATION])
294+
.unwrap();
295+
for (block_stats_location, block_meta) in block_stats_location.iter().zip(block_metas.iter()) {
296+
let block_stats_location = block_stats_location.as_tuple();
297+
if let Some(block_stats_location) = block_stats_location {
298+
assert_eq!(
299+
block_stats_location[0].as_string().unwrap(),
300+
&block_meta.block_stats_location.as_ref().unwrap().0
301+
);
302+
assert_eq!(
303+
block_stats_location[1]
304+
.as_number()
305+
.unwrap()
306+
.as_u_int64()
307+
.unwrap(),
308+
&block_meta.block_stats_location.as_ref().unwrap().1
309+
);
310+
} else {
311+
assert!(block_meta.block_stats_location.is_none());
312+
}
313+
}
314+
315+
// check block stats size
316+
let block_stats_size = column_oriented_segment
317+
.col_by_name(&[BLOCK_STATS_SIZE])
318+
.unwrap();
319+
for (block_stats_size, block_meta) in block_stats_size.iter().zip(block_metas.iter()) {
320+
let block_stats_size = block_stats_size.as_number().unwrap().as_u_int64().unwrap();
321+
assert_eq!(block_stats_size, &block_meta.block_stats_size);
322+
}
323+
291324
// check compression
292325
let compression = column_oriented_segment.col_by_name(&[COMPRESSION]).unwrap();
293326
for (compression, block_meta) in compression.iter().zip(block_metas.iter()) {

src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ async fn test_recluster_mutator_block_select() -> Result<()> {
8282
None,
8383
None,
8484
None,
85+
0,
8586
meta::Compression::Lz4Raw,
8687
Some(Utc::now()),
8788
));

src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -782,6 +782,7 @@ impl CompactSegmentTestFixture {
782782
None,
783783
None,
784784
None,
785+
0,
785786
Compression::Lz4Raw,
786787
Some(Utc::now()),
787788
);

0 commit comments

Comments
 (0)