5 changes: 4 additions & 1 deletion gcloud/analysis_statistics/tasks.py
@@ -349,8 +349,11 @@ def pipeline_archive_statistics_task(instance_id):


@task
@periodic_task(run_every=crontab(hour="0"))
@periodic_task(run_every=crontab(minute="0", hour="1"))
def backfill_template_variable_statistics_task():
"""
Backfill process template variable statistics (runs daily at 1:00 AM, during off-peak business hours)
"""
custom_variables_records = {}

# process common template
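A note on the schedule change above: with celery.schedules.crontab, any field that is not specified defaults to "*", so the old crontab(hour="0") matched every minute between 00:00 and 00:59, while the new crontab(minute="0", hour="1") fires exactly once per day at 01:00. A quick sketch of the difference (variable names are illustrative only):

from celery.schedules import crontab

# Any crontab field left unspecified defaults to "*".
# Old schedule: equivalent to "* 0 * * *" -- fires every minute during hour 0.
old_schedule = crontab(hour="0")

# New schedule: equivalent to "0 1 * * *" -- fires once per day, at 01:00.
new_schedule = crontab(minute="0", hour="1")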
154 changes: 129 additions & 25 deletions gcloud/contrib/cleaner/tasks.py
@@ -11,6 +11,7 @@
specific language governing permissions and limitations under the License.
"""
import logging
import time

from celery.schedules import crontab
from celery.task import periodic_task
@@ -33,6 +34,20 @@
logger = logging.getLogger("root")


def delete_records(queryset, field_name):
"""
Deletion helper that tolerates lock contention (e.g. Lock wait timeout): failures are logged instead of raised
"""
try:
deleted_count = queryset.delete()[0]
logger.info(f"[clean_expired_v2_task_data] Deleted {deleted_count} records from {field_name}")
return deleted_count
except Exception as e:
error_msg = str(e)
logger.exception(f"[clean_expired_v2_task_data] Failed to delete {field_name}: {error_msg}")
return 0
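The helper above logs and swallows failures but does not actually retry; if real retries on lock contention are wanted (as the optimization notes further down suggest), a minimal sketch could look like the following. This is a hypothetical variant, not part of the PR; it assumes MySQL lock wait timeouts surface as django.db.OperationalError and reuses the module's logger and time import:

from django.db import OperationalError

def delete_records_with_retry(queryset, field_name, max_retries=3, backoff_seconds=1):
    """Hypothetical retrying variant of delete_records (sketch only)."""
    for attempt in range(1, max_retries + 1):
        try:
            # QuerySet.delete() returns (total_deleted, per_model_counts); keep the total.
            deleted_count = queryset.delete()[0]
            logger.info(f"[clean_expired_v2_task_data] Deleted {deleted_count} records from {field_name}")
            return deleted_count
        except OperationalError as e:
            # MySQL "Lock wait timeout exceeded" is raised as an OperationalError.
            logger.warning(
                f"[clean_expired_v2_task_data] Attempt {attempt}/{max_retries} failed for {field_name}: {e}"
            )
            time.sleep(backoff_seconds * attempt)
    logger.error(f"[clean_expired_v2_task_data] Giving up on {field_name} after {max_retries} attempts")
    return 0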


def filter_clean_task_instances():
"""
Filter the task instances that need to be cleaned up
@@ -130,46 +145,135 @@ def filter_clean_task_instances()
@time_record(logger)
def clean_expired_v2_task_data():
"""
Clean up expired task data
Clean up expired task data - optimized version

Optimizations:
1. Delete in dependency order to avoid foreign key conflicts
2. Process in small batches to reduce the amount of data locked at a time
3. Add a retry mechanism to handle transient lock conflicts
"""
if not settings.ENABLE_CLEAN_EXPIRED_V2_TASK:
logger.info("Skip clean expired task data")
return

logger.info("Start clean expired task data...")
logger.info("Start clean expired task data (optimized)...")

try:
ids = filter_clean_task_instances()
if not ids:
logger.info("[clean_expired_v2_task_data] No tasks to clean")
return

task_ids = [item["id"] for item in ids]
logger.info(f"[clean_expired_v2_task_data] Total {len(task_ids)} tasks to clean, task_ids: {task_ids}")
pipeline_instance_ids = [item["pipeline_instance__instance_id"] for item in ids]
data_to_clean = get_clean_pipeline_instance_data(pipeline_instance_ids)
tasks = TaskFlowInstance.objects.filter(id__in=task_ids)
data_to_clean.update({"tasks": tasks})

pre_delete_pipeline_instance_data.send(sender=TaskFlowInstance, data=data_to_clean)
logger.info(
f"[clean_expired_v2_task_data] Total {len(task_ids)} tasks to clean, "
f"task_ids: {task_ids[:10]}{'...' if len(task_ids) > 10 else ''}"
)

# 🔧 Optimization 1: process tasks in small batches instead of all at once
task_batch_size = 20

instance_fields = ["tasks", "pipeline_instances"]
with transaction.atomic():
for field, qs in data_to_clean.items():
if field.endswith("_list") and isinstance(qs, list):
logger.info(
f"[clean_expired_v2_task_data] clean field: {field}, {len(qs)} batch data, "
f"e.x.: {qs[0].values_list('pk', flat=True)[:10] if len(qs) > 0 else None}..."
)
[q.delete() for q in qs]
elif field not in instance_fields or settings.CLEAN_EXPIRED_V2_TASK_INSTANCE:
logger.info(
f"[clean_expired_v2_task_data] clean field: {field}, "
f"qs ids: {qs.values_list('pk', flat=True)[:10]}..."
)
qs.delete()
elif field == "pipeline_instances":
qs.update(is_expired=True)
logger.info(f"[clean_expired_v2_task_data] success clean tasks: {task_ids}")
for i in range(0, len(pipeline_instance_ids), task_batch_size):
batch_pipeline_ids = pipeline_instance_ids[i : i + task_batch_size]
batch_task_ids = task_ids[i : i + task_batch_size]

logger.info(
f"[clean_expired_v2_task_data] Processing batch {i//task_batch_size + 1}/"
f"{(len(task_ids) + task_batch_size - 1)//task_batch_size}, "
f"task_ids: {batch_task_ids}"
)

try:
with transaction.atomic():
_clean_task_batch(batch_pipeline_ids, batch_task_ids)
except Exception as e:
logger.exception(f"[clean_expired_v2_task_data] Error cleaning batch {batch_task_ids}: {e}")
# Move on to the next batch; one failed batch should not stop the whole cleanup
continue

# Short pause between batches to ease pressure on the database
time.sleep(0.5)

logger.info("[clean_expired_v2_task_data] All batches processed")
except Exception as e:
logger.exception(f"[clean_expired_v2_task_data] error: {e}")


def _clean_task_batch(pipeline_instance_ids, task_ids):
"""
Clean up one batch of task data

Core optimizations:
1. Delete tables in the correct order to avoid foreign key conflicts and deadlocks
2. Add a retry mechanism to handle transient lock conflicts
"""
data_to_clean = get_clean_pipeline_instance_data(pipeline_instance_ids)
tasks = TaskFlowInstance.objects.filter(id__in=task_ids)
data_to_clean.update({"tasks": tasks})

# Send the pre-delete signal
pre_delete_pipeline_instance_data.send(sender=TaskFlowInstance, data=data_to_clean)

instance_fields = ["tasks", "pipeline_instances"]

# 🔧 Optimization 2: define the deletion order, following dependencies from leaf to root
# Delete dependent (child) tables first and the parent tables last, to avoid foreign key conflicts and deadlocks
delete_order = [
# 1. Node-level detail data (lowest layer)
"callback_data",
"schedules_list",
"execution_history_list",
"execution_data_list",
"state_list",
"data_list",
# 2. Node configuration and policies
"retry_node_list",
"timeout_node_list",
"node_list",
# 3. Context and process data
"context_value",
"context_outputs",
"process",
# 4. Periodic task and business-layer data
"periodic_task_history",
"nodes_in_pipeline",
# 5. Snapshots and tree info
"execution_snapshot",
"tree_info",
# 6. Finally, the instances and tasks themselves
"tasks",
"pipeline_instances",
]

for field_name in delete_order:
if field_name not in data_to_clean:
continue

qs_or_list = data_to_clean[field_name]

# Handle list values (batched QuerySets)
if field_name.endswith("_list") and isinstance(qs_or_list, list):
logger.info(f"[clean_expired_v2_task_data] clean field: {field_name}, " f"{len(qs_or_list)} batch data")

# 🔧 Fix: use a loop instead of a list comprehension to avoid memory issues
for idx, qs in enumerate(qs_or_list):
delete_records(qs, f"{field_name}[{idx}]")

# Handle pipeline_instances - only mark as expired, do not delete
elif field_name == "pipeline_instances":
updated_count = qs_or_list.update(is_expired=True)
logger.info(f"[clean_expired_v2_task_data] Updated {updated_count} pipeline_instances")

# Handle tables that should be deleted
elif field_name not in instance_fields or settings.CLEAN_EXPIRED_V2_TASK_INSTANCE:
logger.info(f"[clean_expired_v2_task_data] clean field: {field_name}")
delete_records(qs_or_list, field_name)

logger.info(f"[clean_expired_v2_task_data] Successfully cleaned batch: {task_ids}")


@periodic_task(run_every=(crontab(*settings.ARCHIVE_EXPIRED_V2_TASK_CRON)), ignore_result=True, queue="task_data_clean")
@time_record(logger)
def archive_expired_v2_task_data():