feat: enable block stream write #18285

Open · wants to merge 7 commits into main

2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

164 changes: 82 additions & 82 deletions scripts/selfhost/restore_logs.sh
@@ -2,52 +2,52 @@

# Simple logging
log() {
    echo "[$(date '+%H:%M:%S')] $1"
}

log_error() {
    echo "[$(date '+%H:%M:%S')] ERROR: $1" >&2
}

log_step() {
    echo "[$(date '+%H:%M:%S')] [$1/$2] $3"
}

# Parse arguments
while [[ $# -gt 0 ]]; do
    case "$1" in
    --dsn)
        DSN="$2"
        shift 2
        ;;
    --stage)
        STAGE="$2"
        shift 2
        ;;
    *)
        if [[ "$1" =~ ^[0-9]{8}$ ]]; then
            DATE_ARG="$1"
            shift
        else
            log_error "Unknown parameter: $1"
            exit 1
        fi
        ;;
    esac
done

# Validate parameters
if [[ -z "$STAGE" || -z "$DATE_ARG" ]]; then
log_error "Missing required parameters: --stage or yyyymmdd date"
exit 1
log_error "Missing required parameters: --stage or yyyymmdd date"
exit 1
fi

if [[ -z "$DSN" ]]; then
DSN="$BENDSQL_DSN"
if [[ -z "$DSN" ]]; then
log_error "DSN not provided and BENDSQL_DSN not set"
exit 1
fi
DSN="$BENDSQL_DSN"
if [[ -z "$DSN" ]]; then
log_error "DSN not provided and BENDSQL_DSN not set"
exit 1
fi
fi

# Format date
@@ -66,8 +66,8 @@ DOWNLOAD_SQL="PRESIGN DOWNLOAD @${STAGE}/${TAR_FILE}"
DOWNLOAD_URL=$(bendsql --dsn "${DSN}" --query="${DOWNLOAD_SQL}" | awk '{print $3}')

if [[ -z "$DOWNLOAD_URL" ]]; then
log_error "Failed to generate download URL for ${TAR_FILE}"
exit 1
log_error "Failed to generate download URL for ${TAR_FILE}"
exit 1
fi
log "Download URL generated successfully"

@@ -76,8 +76,8 @@ log_step "2" "6" "Downloading ${TAR_FILE} from stage @${STAGE}"
curl -s -o "${TAR_FILE}" "${DOWNLOAD_URL}"

if [[ ! -f "${TAR_FILE}" ]]; then
    log_error "Failed to download ${TAR_FILE}"
    exit 1
fi

FILE_SIZE=$(du -h "${TAR_FILE}" | cut -f1)
@@ -98,21 +98,21 @@ TARGET_DIRS=("columns" "user_functions" "query_raw_logs" "query_logs" "query_pro
PREFIX=""

for target_dir in "${TARGET_DIRS[@]}"; do
    SAMPLE_FILE=$(find "${TEMP_DIR}" -path "*/${target_dir}/*" -type f | head -1)
    if [[ -n "$SAMPLE_FILE" ]]; then
        RELATIVE_PATH="${SAMPLE_FILE#${TEMP_DIR}/}"
        PREFIX=$(echo "$RELATIVE_PATH" | sed "s|/${target_dir}/.*||" | sed "s|${target_dir}/.*||")
        if [[ -n "$PREFIX" ]]; then
            PREFIX="${PREFIX}/"
        fi
        break
    fi
done

if [[ -n "$PREFIX" ]]; then
log "Path prefix detected: '${PREFIX}' - will be stripped during upload"
log "Path prefix detected: '${PREFIX}' - will be stripped during upload"
else
log "No path prefix detected - using original file paths"
log "No path prefix detected - using original file paths"
fi

# Step 5: Upload files
@@ -129,32 +129,32 @@ UPLOAD_SUCCESS=0
UPLOAD_FAILED=0

find "${TEMP_DIR}" -type f | while read -r FILE; do
    CURRENT_FILE=$((CURRENT_FILE + 1))
    RELATIVE_PATH="${FILE#${TEMP_DIR}/}"

    if [[ -n "$PREFIX" && "$RELATIVE_PATH" == ${PREFIX}* ]]; then
        UPLOAD_PATH="${RELATIVE_PATH#${PREFIX}}"
    else
        UPLOAD_PATH="$RELATIVE_PATH"
    fi

    printf "\rUploading: %d/%d files (Success: %d, Failed: %d)" "$CURRENT_FILE" "$TOTAL_FILES" "$UPLOAD_SUCCESS" "$UPLOAD_FAILED"

    UPLOAD_SQL="PRESIGN UPLOAD @${UPLOAD_STAGE}/${UPLOAD_PATH}"
    UPLOAD_URL=$(bendsql --dsn "${DSN}" --query="${UPLOAD_SQL}" | awk '{print $3}')

    if [[ -n "$UPLOAD_URL" ]]; then
        if curl -s -X PUT -T "${FILE}" "${UPLOAD_URL}"; then
            UPLOAD_SUCCESS=$((UPLOAD_SUCCESS + 1))
        else
            UPLOAD_FAILED=$((UPLOAD_FAILED + 1))
        fi
    else
        UPLOAD_FAILED=$((UPLOAD_FAILED + 1))
    fi
done

echo # New line after progress
log "Upload completed: ${UPLOAD_SUCCESS} successful, ${UPLOAD_FAILED} failed"

# Cleanup
@@ -171,23 +171,23 @@ log "Created database: ${RESTORE_DATABASE}"

# Restore tables
declare -A TABLE_MAP=(
    ["columns"]="system.columns:columns"
    ["user_functions"]="system.user_functions:user_functions"
    ["log_history"]="system_history.log_history:query_raw_logs"
    ["query_history"]="system_history.query_history:query_logs"
    ["profile_history"]="system_history.profile_history:query_profile_logs"
)

for table_name in "${!TABLE_MAP[@]}"; do
    IFS=':' read -r source_table source_path <<<"${TABLE_MAP[$table_name]}"

    log "Restoring table: ${RESTORE_DATABASE}.${table_name} from @${UPLOAD_STAGE}/${source_path}"

    bendsql --dsn "${DSN}" --database "${RESTORE_DATABASE}" --query="CREATE TABLE ${table_name} LIKE ${source_table};" >/dev/null 2>&1
    bendsql --dsn "${DSN}" --database "${RESTORE_DATABASE}" --query="COPY INTO ${table_name} FROM @${UPLOAD_STAGE}/${source_path};" >/dev/null 2>&1

    ROW_COUNT=$(bendsql --dsn "${DSN}" --database "${RESTORE_DATABASE}" --query="SELECT COUNT(*) FROM ${table_name};" | tail -1)
    log "Table ${table_name} restored: ${ROW_COUNT} rows"
done

log "Log restoration completed successfully"
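
The changes to this script appear to be indentation-only, so its interface is unchanged: it takes --stage, an optional --dsn (falling back to $BENDSQL_DSN), and a positional yyyymmdd date. A minimal invocation sketch; the stage name, date, and DSN below are placeholders, not values from this PR:

```bash
# Placeholders only; adjust the stage name, date, and DSN for your deployment.
export BENDSQL_DSN="databend://user:password@localhost:8000/default?sslmode=disable"

# Restore the logs archived on 2025-01-01 from stage @log_backups
./scripts/selfhost/restore_logs.sh --stage log_backups 20250101

# Or pass the DSN explicitly instead of relying on BENDSQL_DSN
./scripts/selfhost/restore_logs.sh --dsn "$BENDSQL_DSN" --stage log_backups 20250101
```
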
21 changes: 21 additions & 0 deletions src/common/metrics/src/metrics/storage.rs
@@ -337,6 +337,14 @@ static BLOCK_VIRTUAL_COLUMN_WRITE_MILLISECONDS: LazyLock<Histogram> = LazyLock::
register_histogram_in_milliseconds("fuse_block_virtual_column_write_milliseconds")
});

// Block statistics metrics.
static BLOCK_STATS_WRITE_NUMS: LazyLock<Counter> =
LazyLock::new(|| register_counter("fuse_block_stats_write_nums"));
static BLOCK_STATS_WRITE_BYTES: LazyLock<Counter> =
LazyLock::new(|| register_counter("fuse_block_stats_write_bytes"));
static BLOCK_STATS_WRITE_MILLISECONDS: LazyLock<Histogram> =
LazyLock::new(|| register_histogram_in_milliseconds("fuse_block_stats_write_milliseconds"));

/// Common metrics.
pub fn metrics_inc_omit_filter_rowgroups(c: u64) {
OMIT_FILTER_ROWGROUPS.inc_by(c);
@@ -907,3 +915,16 @@ pub fn metrics_inc_block_virtual_column_write_bytes(c: u64) {
pub fn metrics_inc_block_virtual_column_write_milliseconds(c: u64) {
BLOCK_VIRTUAL_COLUMN_WRITE_MILLISECONDS.observe(c as f64);
}

/// Block stats metrics.
pub fn metrics_inc_block_stats_write_nums(c: u64) {
BLOCK_STATS_WRITE_NUMS.inc_by(c);
}

pub fn metrics_inc_block_stats_write_bytes(c: u64) {
BLOCK_STATS_WRITE_BYTES.inc_by(c);
}

pub fn metrics_inc_block_stats_write_milliseconds(c: u64) {
BLOCK_STATS_WRITE_MILLISECONDS.observe(c as f64);
}
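
The new helpers follow the same pattern as the existing block and virtual-column write metrics. A sketch of a hypothetical call site (not part of this diff; it assumes the helpers above are in scope and that `stats_bytes` holds the serialized statistics payload):

```rust
use std::time::Instant;

// Hypothetical: record one block-statistics write, its size, and its latency.
fn record_block_stats_write(stats_bytes: &[u8]) {
    let start = Instant::now();

    // ... write `stats_bytes` to the object store here ...

    metrics_inc_block_stats_write_nums(1);
    metrics_inc_block_stats_write_bytes(stats_bytes.len() as u64);
    metrics_inc_block_stats_write_milliseconds(start.elapsed().as_millis() as u64);
}
```
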
58 changes: 56 additions & 2 deletions src/query/ee/src/storages/fuse/operations/vacuum_table.rs
@@ -40,6 +40,7 @@ pub struct SnapshotReferencedFiles {
pub segments: HashSet<String>,
pub blocks: HashSet<String>,
pub blocks_index: HashSet<String>,
pub blocks_stats: HashSet<String>,
}

impl SnapshotReferencedFiles {
@@ -54,6 +55,9 @@
for file in &self.blocks_index {
files.push(file.clone());
}
for file in &self.blocks_stats {
files.push(file.clone());
}
files
}
}
@@ -132,6 +136,7 @@ pub async fn get_snapshot_referenced_files(
segments,
blocks: locations_referenced.block_location,
blocks_index: locations_referenced.bloom_location,
blocks_stats: locations_referenced.stats_location,
}))
}

@@ -164,10 +169,11 @@ pub async fn do_gc_orphan_files(
None => return Ok(()),
};
let status = format!(
"gc orphan: read referenced files:{},{},{}, cost:{:?}",
"gc orphan: read referenced files:{},{},{},{}, cost:{:?}",
referenced_files.segments.len(),
referenced_files.blocks.len(),
referenced_files.blocks_index.len(),
referenced_files.blocks_stats.len(),
start.elapsed()
);
ctx.set_status_info(&status);
Expand Down Expand Up @@ -268,6 +274,36 @@ pub async fn do_gc_orphan_files(
);
ctx.set_status_info(&status);

// 5. Purge orphan block stats files.
// 5.1 Get orphan block stats files to be purged
let stats_locations_to_be_purged = get_orphan_files_to_be_purged(
fuse_table,
location_gen.block_statistics_location_prefix(),
referenced_files.blocks_stats,
retention_time,
)
.await?;
let status = format!(
"gc orphan: read stats_locations_to_be_purged:{}, cost:{:?}",
stats_locations_to_be_purged.len(),
start.elapsed()
);
ctx.set_status_info(&status);

// 5.2 Delete all the orphan block stats files to be purged
let purged_file_num = stats_locations_to_be_purged.len();
fuse_table
.try_purge_location_files(
ctx.clone(),
HashSet::from_iter(stats_locations_to_be_purged.into_iter()),
)
.await?;
let status = format!(
"gc orphan: purged block stats files:{}, cost:{:?}",
purged_file_num,
start.elapsed()
);
ctx.set_status_info(&status);
Ok(())
}

@@ -286,10 +322,11 @@ pub async fn do_dry_run_orphan_files(
None => return Ok(()),
};
let status = format!(
"dry_run orphan: read referenced files:{},{},{}, cost:{:?}",
"dry_run orphan: read referenced files:{},{},{},{}, cost:{:?}",
referenced_files.segments.len(),
referenced_files.blocks.len(),
referenced_files.blocks_index.len(),
referenced_files.blocks_stats.len(),
start.elapsed()
);
ctx.set_status_info(&status);
@@ -351,6 +388,23 @@

purge_files.extend(index_locations_to_be_purged);

// 5. Get purge orphan block stats files.
let stats_locations_to_be_purged = get_orphan_files_to_be_purged(
fuse_table,
location_gen.block_statistics_location_prefix(),
referenced_files.blocks_stats,
retention_time,
)
.await?;
let status = format!(
"dry_run orphan: read stats_locations_to_be_purged:{}, cost:{:?}",
stats_locations_to_be_purged.len(),
start.elapsed()
);
ctx.set_status_info(&status);

purge_files.extend(stats_locations_to_be_purged);

Ok(())
}

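Both the GC path and the dry run now treat block-statistics files the same way as index files: collect the stats locations referenced by retained snapshots, list everything under the stats prefix, and purge (or, for the dry run, report) files that are unreferenced and fall outside the retention window. A generic sketch of that selection rule, using hypothetical names rather than the FuseTable API:

```rust
use std::collections::HashSet;
use std::time::SystemTime;

// Hypothetical candidate: a file found under the block-statistics prefix.
struct CandidateFile {
    path: String,
    last_modified: SystemTime,
}

// Keep only files that no retained snapshot references and that are older
// than the retention cutoff; those are the orphans eligible for purging.
fn orphan_stats_to_purge(
    candidates: Vec<CandidateFile>,
    referenced: &HashSet<String>,
    retention_cutoff: SystemTime,
) -> Vec<String> {
    candidates
        .into_iter()
        .filter(|f| !referenced.contains(&f.path) && f.last_modified < retention_cutoff)
        .map(|f| f.path)
        .collect()
}
```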