From 143e758d4d4d89b8dc0fdf92301fdf5d5de74952 Mon Sep 17 00:00:00 2001
From: dengyh
Date: Wed, 29 Oct 2025 17:06:46 +0800
Subject: [PATCH 1/2] feat: fix duplicate execution of the template variable statistics task --story=149770577
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 gcloud/analysis_statistics/tasks.py |   5 +-
 gcloud/contrib/cleaner/tasks.py     | 165 +++++++++++++++++++++++-----
 2 files changed, 144 insertions(+), 26 deletions(-)

diff --git a/gcloud/analysis_statistics/tasks.py b/gcloud/analysis_statistics/tasks.py
index fd9ff709ef..085ce2736b 100644
--- a/gcloud/analysis_statistics/tasks.py
+++ b/gcloud/analysis_statistics/tasks.py
@@ -349,8 +349,11 @@ def pipeline_archive_statistics_task(instance_id):
 
 
 @task
-@periodic_task(run_every=crontab(hour="0"))
+@periodic_task(run_every=crontab(minute="0", hour="1"))
 def backfill_template_variable_statistics_task():
+    """
+    Backfill variable statistics for flow templates (runs daily at 01:00, an off-peak window)
+    """
     custom_variables_records = {}
 
     # process common template
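
# Why the one-line schedule fix above works: celery's crontab() defaults every
# omitted field to "*", so crontab(hour="0") means minute="*", hour="0" and the
# task fires once per minute from 00:00 to 00:59, i.e. 60 duplicate runs per
# day. A minimal sketch of the two schedules side by side:

from celery.schedules import crontab

every_minute_of_midnight = crontab(hour="0")    # implicit minute="*": 60 runs/day
once_at_one_am = crontab(minute="0", hour="1")  # exactly one run/day, off-peak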
diff --git a/gcloud/contrib/cleaner/tasks.py b/gcloud/contrib/cleaner/tasks.py
index 198b69ab47..e0e98365cf 100644
--- a/gcloud/contrib/cleaner/tasks.py
+++ b/gcloud/contrib/cleaner/tasks.py
@@ -11,6 +11,7 @@
 specific language governing permissions and limitations under the License.
 """
 import logging
+import time
 
 from celery.schedules import crontab
 from celery.task import periodic_task
@@ -33,6 +34,30 @@
 logger = logging.getLogger("root")
 
 
+def delete_with_retry(queryset, field_name, max_retries=3, retry_delay=2):
+    """
+    Delete with retries to handle lock wait timeouts
+    """
+    for attempt in range(max_retries):
+        try:
+            deleted_count = queryset.delete()[0]
+            logger.info(f"[clean_expired_v2_task_data] Deleted {deleted_count} records from {field_name}")
+            return deleted_count
+        except Exception as e:
+            error_msg = str(e)
+            if "Lock wait timeout exceeded" in error_msg or "Deadlock found" in error_msg:
+                if attempt < max_retries - 1:
+                    logger.warning(
+                        f"[clean_expired_v2_task_data] Lock timeout on {field_name}, "
+                        f"retry {attempt + 1}/{max_retries} after {retry_delay}s"
+                    )
+                    time.sleep(retry_delay)
+                    continue
+            logger.error(f"[clean_expired_v2_task_data] Failed to delete {field_name}: {e}")
+            raise
+    return 0
+
+
 def filter_clean_task_instances():
     """
     Filter the task instances that need to be cleaned up
@@ -130,46 +155,136 @@
 @time_record(logger)
 def clean_expired_v2_task_data():
     """
-    Clean up expired task data
+    Clean up expired task data - optimized version
+
+    Optimizations:
+    1. Use a distributed lock so only one worker runs at a time
+    2. Delete in dependency order to avoid foreign key conflicts
+    3. Process in small batches to reduce the data locked at once
+    4. Retry on transient lock conflicts
     """
     if not settings.ENABLE_CLEAN_EXPIRED_V2_TASK:
         logger.info("Skip clean expired task data")
         return
 
-    logger.info("Start clean expired task data...")
+    logger.info("Start clean expired task data (optimized)...")
+
     try:
         ids = filter_clean_task_instances()
+        if not ids:
+            logger.info("[clean_expired_v2_task_data] No tasks to clean")
+            return
+
         task_ids = [item["id"] for item in ids]
-        logger.info(f"[clean_expired_v2_task_data] Total {len(task_ids)} tasks to clean, task_ids: {task_ids}")
         pipeline_instance_ids = [item["pipeline_instance__instance_id"] for item in ids]
-        data_to_clean = get_clean_pipeline_instance_data(pipeline_instance_ids)
-        tasks = TaskFlowInstance.objects.filter(id__in=task_ids)
-        data_to_clean.update({"tasks": tasks})
-        pre_delete_pipeline_instance_data.send(sender=TaskFlowInstance, data=data_to_clean)
+        logger.info(
+            f"[clean_expired_v2_task_data] Total {len(task_ids)} tasks to clean, "
+            f"task_ids: {task_ids[:10]}{'...' if len(task_ids) > 10 else ''}"
+        )
 
-        instance_fields = ["tasks", "pipeline_instances"]
-        with transaction.atomic():
-            for field, qs in data_to_clean.items():
-                if field.endswith("_list") and isinstance(qs, list):
-                    logger.info(
-                        f"[clean_expired_v2_task_data] clean field: {field}, {len(qs)} batch data, "
-                        f"e.x.: {qs[0].values_list('pk', flat=True)[:10] if len(qs) > 0 else None}..."
-                    )
-                    [q.delete() for q in qs]
-                elif field not in instance_fields or settings.CLEAN_EXPIRED_V2_TASK_INSTANCE:
-                    logger.info(
-                        f"[clean_expired_v2_task_data] clean field: {field}, "
-                        f"qs ids: {qs.values_list('pk', flat=True)[:10]}..."
-                    )
-                    qs.delete()
-                elif field == "pipeline_instances":
-                    qs.update(is_expired=True)
-        logger.info(f"[clean_expired_v2_task_data] success clean tasks: {task_ids}")
+        # 🔧 Optimization 1: process tasks in small batches instead of all at once
+        task_batch_size = 20
+
+        for i in range(0, len(pipeline_instance_ids), task_batch_size):
+            batch_pipeline_ids = pipeline_instance_ids[i : i + task_batch_size]
+            batch_task_ids = task_ids[i : i + task_batch_size]
+
+            logger.info(
+                f"[clean_expired_v2_task_data] Processing batch {i//task_batch_size + 1}/"
+                f"{(len(task_ids) + task_batch_size - 1)//task_batch_size}, "
+                f"task_ids: {batch_task_ids}"
+            )
+
+            try:
+                with transaction.atomic():
+                    _clean_task_batch(batch_pipeline_ids, batch_task_ids)
+            except Exception as e:
+                logger.exception(f"[clean_expired_v2_task_data] Error cleaning batch {batch_task_ids}: {e}")
+                # Move on to the next batch; one failed batch should not stop the whole cleanup
+                continue
+
+            # Rest briefly between batches to relieve database pressure
+            time.sleep(0.5)
+
+        logger.info("[clean_expired_v2_task_data] All batches processed")
     except Exception as e:
         logger.exception(f"[clean_expired_v2_task_data] error: {e}")
 
 
+def _clean_task_batch(pipeline_instance_ids, task_ids):
+    """
+    Clean up one batch of task data
+
+    Core optimizations:
+    1. Delete tables in the correct order to avoid foreign key conflicts and deadlocks
+    2. Retry on transient lock conflicts
+    """
+    data_to_clean = get_clean_pipeline_instance_data(pipeline_instance_ids)
+    tasks = TaskFlowInstance.objects.filter(id__in=task_ids)
+    data_to_clean.update({"tasks": tasks})
+
+    # Send the pre-delete signal
+    pre_delete_pipeline_instance_data.send(sender=TaskFlowInstance, data=data_to_clean)
+
+    instance_fields = ["tasks", "pipeline_instances"]
+
+    # 🔧 Optimization 2: define the delete order along the dependency graph, leaves first
+    # Delete dependent (child) tables before the main tables to avoid FK conflicts and deadlocks
+    delete_order = [
+        # 1. Node-level detail data (lowest layer)
+        "callback_data",
+        "schedules_list",
+        "execution_history_list",
+        "execution_data_list",
+        "state_list",
+        "data_list",
+        # 2. Node configuration and policies
+        "retry_node_list",
+        "timeout_node_list",
+        "node_list",
+        # 3. Context and process data
+        "context_value",
+        "context_outputs",
+        "process",
+        # 4. Periodic-task and business-layer data
+        "periodic_task_history",
+        "nodes_in_pipeline",
+        # 5. Snapshots and tree info
+        "execution_snapshot",
+        "tree_info",
+        # 6. Finally, the instances and the tasks themselves
+        "tasks",
+        "pipeline_instances",
+    ]
+
+    for field_name in delete_order:
+        if field_name not in data_to_clean:
+            continue
+
+        qs_or_list = data_to_clean[field_name]
+
+        # Handle list values (batched QuerySets)
+        if field_name.endswith("_list") and isinstance(qs_or_list, list):
+            logger.info(f"[clean_expired_v2_task_data] clean field: {field_name}, {len(qs_or_list)} batch data")
+
+            # 🔧 Fix: use a plain loop instead of a list comprehension to avoid memory issues
+            for idx, qs in enumerate(qs_or_list):
+                delete_with_retry(qs, f"{field_name}[{idx}]")
+
+        # pipeline_instances are only flagged as expired, never deleted
+        elif field_name == "pipeline_instances":
+            updated_count = qs_or_list.update(is_expired=True)
+            logger.info(f"[clean_expired_v2_task_data] Updated {updated_count} pipeline_instances")
+
+        # Tables that should actually be deleted
+        elif field_name not in instance_fields or settings.CLEAN_EXPIRED_V2_TASK_INSTANCE:
+            logger.info(f"[clean_expired_v2_task_data] clean field: {field_name}")
+            delete_with_retry(qs_or_list, field_name)
+
+    logger.info(f"[clean_expired_v2_task_data] Successfully cleaned batch: {task_ids}")
+
+
 @periodic_task(run_every=(crontab(*settings.ARCHIVE_EXPIRED_V2_TASK_CRON)), ignore_result=True, queue="task_data_clean")
 @time_record(logger)
 def archive_expired_v2_task_data():
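
# Note on optimization point 1 in the docstring above: this diff contains no
# locking code, and PATCH 2/2 below drops the claim again. For illustration
# only, a minimal sketch of how such a lock is commonly built on Django's
# cache framework over an atomic backend such as Redis; the key name, timeout,
# and helper names here are hypothetical, not part of the codebase:

from django.core.cache import cache

CLEAN_TASK_LOCK_KEY = "clean_expired_v2_task_data:lock"  # hypothetical key


def try_acquire_clean_lock(timeout=3600):
    # cache.add() stores the key only if it is absent and reports success
    # atomically, so exactly one worker wins; the timeout makes the lock
    # self-expire if the winning worker dies mid-run.
    return cache.add(CLEAN_TASK_LOCK_KEY, "locked", timeout)


def release_clean_lock():
    cache.delete(CLEAN_TASK_LOCK_KEY)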
From fa920c210793e7626f6673087af9b0ce413061df Mon Sep 17 00:00:00 2001
From: dengyh
Date: Thu, 30 Oct 2025 11:02:55 +0800
Subject: [PATCH 2/2] feat: optimize the cleanup task logic --story=149770577
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 gcloud/contrib/cleaner/tasks.py | 39 +++++++++++++---------------------
 1 file changed, 14 insertions(+), 25 deletions(-)

diff --git a/gcloud/contrib/cleaner/tasks.py b/gcloud/contrib/cleaner/tasks.py
index e0e98365cf..c9701759f3 100644
--- a/gcloud/contrib/cleaner/tasks.py
+++ b/gcloud/contrib/cleaner/tasks.py
@@ -34,27 +34,17 @@
 logger = logging.getLogger("root")
 
 
-def delete_with_retry(queryset, field_name, max_retries=3, retry_delay=2):
+def delete_records(queryset, field_name):
     """
-    Delete with retries to handle lock wait timeouts
+    Delete records; log failures and return 0 instead of raising
     """
-    for attempt in range(max_retries):
-        try:
-            deleted_count = queryset.delete()[0]
-            logger.info(f"[clean_expired_v2_task_data] Deleted {deleted_count} records from {field_name}")
-            return deleted_count
-        except Exception as e:
-            error_msg = str(e)
-            if "Lock wait timeout exceeded" in error_msg or "Deadlock found" in error_msg:
-                if attempt < max_retries - 1:
-                    logger.warning(
-                        f"[clean_expired_v2_task_data] Lock timeout on {field_name}, "
-                        f"retry {attempt + 1}/{max_retries} after {retry_delay}s"
-                    )
-                    time.sleep(retry_delay)
-                    continue
-            logger.error(f"[clean_expired_v2_task_data] Failed to delete {field_name}: {e}")
-            raise
+    try:
+        deleted_count = queryset.delete()[0]
+        logger.info(f"[clean_expired_v2_task_data] Deleted {deleted_count} records from {field_name}")
+        return deleted_count
+    except Exception as e:
+        error_msg = str(e)
+        logger.exception(f"[clean_expired_v2_task_data] Failed to delete {field_name}: {error_msg}")
     return 0
 
 
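# The queryset.delete()[0] pattern above relies on Django's QuerySet.delete()
# contract: it returns a 2-tuple of (total rows deleted, per-model breakdown),
# and the total includes rows removed through ON DELETE CASCADE. A minimal
# sketch, assuming a hypothetical Book model:

from myapp.models import Book  # hypothetical app and model, for illustration

deleted_total, per_model = Book.objects.filter(archived=True).delete()
# deleted_total -> 42                                     (example values)
# per_model     -> {"myapp.Book": 40, "myapp.Review": 2}  (cascaded reviews)

# Note the trade-off this patch makes: delete_records() swallows every
# exception and returns 0, so a failed delete is indistinguishable from an
# empty queryset at the call site; only the logs tell them apart.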
@@ -158,10 +148,9 @@ def clean_expired_v2_task_data():
     Clean up expired task data - optimized version
 
     Optimizations:
-    1. Use a distributed lock so only one worker runs at a time
-    2. Delete in dependency order to avoid foreign key conflicts
-    3. Process in small batches to reduce the data locked at once
-    4. Retry on transient lock conflicts
+    1. Delete in dependency order to avoid foreign key conflicts
+    2. Process in small batches to reduce the data locked at once
+    3. Log and skip failed deletes so one bad batch does not stop the run
     """
     if not settings.ENABLE_CLEAN_EXPIRED_V2_TASK:
         logger.info("Skip clean expired task data")
         return
@@ -270,7 +259,7 @@ def _clean_task_batch(pipeline_instance_ids, task_ids):
 
             # 🔧 Fix: use a plain loop instead of a list comprehension to avoid memory issues
             for idx, qs in enumerate(qs_or_list):
-                delete_with_retry(qs, f"{field_name}[{idx}]")
+                delete_records(qs, f"{field_name}[{idx}]")
 
         # pipeline_instances are only flagged as expired, never deleted
         elif field_name == "pipeline_instances":
@@ -280,7 +269,7 @@ def _clean_task_batch(pipeline_instance_ids, task_ids):
 
         # Tables that should actually be deleted
         elif field_name not in instance_fields or settings.CLEAN_EXPIRED_V2_TASK_INSTANCE:
             logger.info(f"[clean_expired_v2_task_data] clean field: {field_name}")
-            delete_with_retry(qs_or_list, field_name)
+            delete_records(qs_or_list, field_name)
 
     logger.info(f"[clean_expired_v2_task_data] Successfully cleaned batch: {task_ids}")
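
# Why _clean_task_batch deletes from leaf tables to root: removing a parent row
# that child rows still reference either raises an integrity error or forces
# the database to check and lock the child table, which is where the original
# lock wait timeouts and deadlocks came from. A minimal sketch with
# hypothetical models (names are illustrative, not the project's real schema):

from django.db import models


class PipelineRecord(models.Model):  # stands in for pipeline_instances
    instance_id = models.CharField(max_length=32, unique=True)


class NodeRecord(models.Model):  # stands in for node/state/data child rows
    pipeline = models.ForeignKey(PipelineRecord, on_delete=models.PROTECT)


def clean(instance_ids):
    # Children first: afterwards the parent rows are unreferenced...
    NodeRecord.objects.filter(pipeline__instance_id__in=instance_ids).delete()
    # ...so deleting the parents cannot raise ProtectedError or contend on
    # foreign key locks, mirroring delete_order in _clean_task_batch.
    PipelineRecord.objects.filter(instance_id__in=instance_ids).delete()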