Skip to content

文档重处理(重新分类 / 字段重抽)批量机制 —— 设计与实施方案 #289

@duguankui

Description

@duguankui

背景与目标

配置(分类提示词 / 字段定义)被调整后,存量文档需要重新跑相应的处理步骤。本 Issue 记录从"详情页『重新识别』是什么"的疑问出发、到一套完整重处理机制的思考演化,并给出实施步骤,供立项与实现参考。

状态:设计已讨论定型,尚未实现。涉及 pipeline 体系 / 字段架构 / 出口事件三条通道边界,按约定在本 Issue 定稿后再动手。

全景速览

文档处理流水线(正向):上传 → text-extraction(OCR/解析→Markdown)→ classification(判类型)→ field-extraction(抽字段)→ Ready

每个环节都可能因配置调整而需要对存量重处理。越上游,级联越广、越危险、成本越高

环节 触发场景 重处理动作 级联 危险度/成本 本文
text-extraction OCR 差 / 换 provider 重新 OCR 级联分类+字段(Markdown 变,下游全变) 最高 占位
classification 改类型描述/阈值/优先级、增删类型 重新分类 级联字段(类型可能变) 中高(覆盖人工确认/打回审核/清字段) 场景一
field-extraction 改字段定义 字段重抽 无(叶子) 场景二

三场景共享同一执行底座(人工触发 + 两层链式分发 + Hangfire 队列 + 幂等续跑 + 生命周期中性),区别只在跑哪个 pipeline、范围、级联深度、警告严重度。

一句话:给两个 LLM 环节(分类、字段抽取)各加一个人工触发、批量、可断点续跑的重处理入口,共用一套不依赖专门状态表的执行底座。

设计演化简史

  1. 起点:从"详情页『重新识别』是否做字段重抽、是否含图片 PDF 识别"的疑问开始。查证:重新识别 = 重排分类 + 级联字段抽取、不含 OCR;且字段抽取不是 pipeline,是分类完成事件的 EventHandler,引擎 FieldExtractionWorkflow 是纯函数、本就与分类解耦。
  2. 判断归人:抛弃"系统按改动类型自动判断要不要重处理"——系统看不到意图,且配置调整常是多步事务,只有人知道"调完了"的时刻。确立:变更零级联,重处理由人工触发。
  3. 独立路径:字段重抽绝不复用"重新识别"——后者重排分类、有破坏性副作用。
  4. 续跑简化:从"批次表 + 工作项表"砍到"靠后台队列持久化 + 单篇幂等",零专门状态表
  5. 基础设施:host 当前用 ABP 默认后台任务管理器(单线程串行);生产换 Hangfire 拿并发,是 host 部署层的事、core 不动。
  6. 分发机制:抛弃"请求内同步撒 N 个任务""单个长跑 dispatcher""一次性全量读";留下两层任务 + keyset 分页只读 Id + 链式自延续
  7. 扩展到重新分类:抽象成三场景共用底座,补上"重新分类"场景(级联、破坏性、范围、幂等确定性四点不对称)。
  8. 范围归人纠偏:重新分类范围一度被设成"系统默认已归该类型 + 全量倾向不做"——违反"判断归人"且漏掉"纳入原本不属于的文档 / 新增类型生效必须全量"这一硬约束。改为:范围由人按意图选、系统不预设默认;全量不再因成本被否决(批量底座已能扛),真正要兜的是破坏性。

一、起点与共同原则

关键事实(决定可行性)

  • 字段抽取不是流水线:系统只有 text-extractionclassification 两条 pipeline。字段抽取是订阅 DocumentClassifiedEtoFieldExtractionEventHandler(事件级联),引擎 FieldExtractionWorkflow.ExtractAsync(descriptors, markdown)纯函数、不依赖分类。
  • "分类提示词"指什么:分类 prompt = 编译期常量 system prompt(admin 改不了)+ 候选 DocumentTypeTypeCode + DisplayName + Description(注释明写"也用作 LLM 分类提示")。所以"调分类提示词" = 改 DocumentType.Description(或 ConfidenceThreshold / Priority / 增删类型)。
  • "重新识别"(RerecognizeAsyncfeat(documents): 文档详情页增加「重新识别」入口(重跑 AI 自动分类 → 级联字段重抽) #263)= 重排 classification + 级联字段抽取,不重新 OCR,是"重新分类"的单篇版。

共同原则:判断归人

抛弃:让系统按"改动类型"自动判断要不要重处理(曾想列"新增字段→重抽、改描述→重分"的映射表)。

为什么抛弃:这是让系统替人猜意图,而意图系统看不到——新增字段可能只想对新文档生效;删字段可能是"拆字段"重构要重抽;改描述可能是纠正误收也可能是捞回漏分。更强的论据是时机的事务性:配置调整常是一组操作(拆字段 = 删+增+改描述),只有整组调完再重处理才对,而"什么时候算调完了"只有人知道。

留下

系统不判断"要不要重处理",配置变更时零级联(保持现状)。重处理是独立的、人工随时发起的动作——"要不要做、对哪些做、从哪一步做、范围多大"全部归人。

二、三场景的关系

越上游越重:text-extraction → classification → field-extraction 是依赖链,重处理上游会级联其所有下游。

包含关系:重新分类 ⊇ 字段重抽——重新分类只要成功就发 DocumentClassifiedEto,必然连带字段重抽(哪怕类型没变);字段重抽只动叶子。按"改了什么"选:

你改了 用哪个 为什么
只改字段定义 字段重抽 省下分类成本 + 避开分类的破坏性
改分类提示词 / 增删类型 重新分类 它会把字段也一并刷新
两者都改 重新分类 一次覆盖两件事

这是两场景并存而非合一的理由:字段重抽是安全叶子操作,重新分类是有破坏性的级联操作。

三、通用执行底座(三场景共用)

1. 人工触发 + 批量预览:入口在配置所在页 → 预览模态(受影响文档数、类型、将重跑内容、范围选择〔重新分类〕、警告)→ 确认 → "已进后台"。

2. 后台执行:host 当前 AbpBackgroundJobsEntityFrameworkCoreModule、无 Hangfire、单线程串行(大批量瓶颈)。生产换 Hangfire 拿并发——host 部署层的事、core 不动。换并发后要正视:并发上限(别打爆 LLM provider)、DB 连接池、外部 LLM 调用绝不包在长 UoW 里。

3. 零专门状态表的断点续跑

  • 单篇操作幂等(字段重抽 SetFields 整组替换 / 重新分类 SetClassification 覆盖式)——重跑不累加、不脏数据;
  • 队列存储就是工作清单(成功的已移除、未完成的还在);
  • Hangfire 语义:成功态永不重跑;at-least-once(崩溃瞬间在跑的那个会被重跑一遍,单任务异常自动重试默认 10 次后落 Failed);这个"重跑一次"无害——正因为单篇幂等,幂等是整套方案的基石。
  • 抛弃:① 线性游标(脆弱);② 专门批次表 + 工作项表(队列本身就是工作清单,进度精确数字非刚需)。续跑从"工作项状态"思考而非游标;进度若要是实时聚合查询而非冗余计数器。

4. 分发机制:两层任务 + keyset 分页 + 链式自延续

  • 抛弃"请求内同步撒 N 个任务":大批量超时、且撒到一半崩了剩下丢失(投递无断点)。
  • 两层任务:第 1 层一个 dispatcher(点按钮只 enqueue 它,立即返回);第 2 层 N 个单篇任务(dispatcher 在后台枚举范围、分批入队,这 N 个并发执行、各自幂等可重跑)。
  • 分发任务读文档:抛弃一次性全读(连接/事务占太久、易超时)与逐条读(太碎);留下 keyset 游标分页(每批 WHERE Id > 上批末Id ORDER BY Id Take(N),走索引、O(batch)、优于 OFFSET);只读 Id.Select(d => d.Id).AsNoTracking(),绝不读整行尤其 Markdown,否则 OOM);每批一个短 UoW。
  • 分发任务粒度:抛弃"单个长跑 dispatcher",留下"链式自延续"——长跑 dispatcher 长占 worker 槽位、崩溃重跑代价大。改为每次只处理一批:读一批 Id → enqueue 这批单篇任务 → enqueue 下一个 dispatcher(带游标 lastId,可加 delay 背压)→ 自己结束。每个 dispatcher 都是几秒短任务,系统里只剩一种粒度;游标随任务参数持久化、断点只重当前批、不长占 worker、可背压。
  • 诚实的代价:链式 at-least-once 重跑可能"分叉"(某 dispatcher 在"已 enqueue 下一个"之后、"标记成功"之前崩了,重跑会再 enqueue 一个下一个,导致从该点往下双倍 enqueue)。结果仍正确(单篇幂等),最坏多花成本。可接受。

5. 生命周期中性:重处理不改变文档宏观状态、不动 Ready 闸门。字段抽取今天天然中性(不是 pipeline);若提升为 pipeline 须排除在 KeyPipelines / DeriveLifecycle 外。重新分类走 classification(本来就是 key pipeline),要避免把已 Ready 文档不必要打回 Processing。

四、场景一:重新分类

触发:改 DocumentType.Description / ConfidenceThreshold / Priority,或增删类型。
动作:重排已存在的 classification pipeline(复用,不新建;单篇跑 DocumentClassificationWorkflow)。

四个不对称点(vs 字段重抽):① 级联(成功必连带字段重抽);② 破坏性强(覆盖人工确认的类型、低置信度打回待审核 + 清字段);③ 范围语义难界定(分类是候选集竞争,改一个类型影响所有文档对它的判定);④ 幂等确定性弱(状态幂等、续跑安全,但 LLM 结果有随机性,重跑可能判出不同类型)。

设计决策 — 保护人工确认(默认开启):人工确认是比自动分类更高优先级的信号。

默认跳过已人工确认的文档;提供显式开关"连人工确认的也重分",带重度警告。把"覆盖人工成果"变成显式 opt-in。

设计决策 — 范围(由人按意图选,系统不预设默认):分类是"候选集竞争选一个",改一个类型的描述等于改了整个分类函数,任何文档的判定都可能变:

范围 能做到 代价 / 破坏面
仅已归为该类型 踢出不再符合的(局部修正) 最便宜、破坏面局限;死角:捞不回"本该归此类、却被分到别处"的
全量 / 跨类型 既踢出又纳入;新增类型、归拢散落同类的唯一可行范围 最贵、破坏面最大(所有文档重新竞争,无关文档可能被随机性波及);靠"保护人工确认 + 重警告"兜
待审核队列 捞回之前没分成功的 范围小、安全

为什么不预设默认:与"判断归人"一致。关键纠偏——"纳入原本不属于该类型的文档"只有扩大到全量/跨类型才能实现;新增类型因没有任何"已归"文档,只能用全量。全量成本不再是否决理由(批量底座已能扛 + 断点续跑,花多少钱是人知情决策),真正要兜的是破坏性。最稳起手是"仅已归为该类型",但须告知"要纳入新文档/让新类型生效得选全量"。

入口:文档类型的分类设置页。模态警告(重):受影响数 + "覆盖自动分类""低置信度打回待审核并清字段""默认跳过人工确认" + 范围选择 + LLM 成本。

五、场景二:字段重抽

触发:改字段定义(Description / 增删字段;纯展示改动如 DisplayName 不需重抽)。
动作:独立于分类、只跑字段抽取,绝不复用"重新识别";复用 FieldExtractionWorkflow

形态:① 抽出可复用抽取引擎(把 FieldExtractionEventHandler 里"读定义→workflow→守卫→SetFields→发 FieldsExtractedEto"提炼成共享单元,事件处理器保留其 reclassify race 守卫并委托);② field-extraction 提升为独立 pipeline 但生命周期中性,复用 DocumentPipelineRun 拿可观测;③ 新 AppService:预览 / 批量触发 / 单篇"仅重抽字段"(详情页一个区别于"重新识别"的按钮,免费附带)。

范围DocumentTypeId = 该类型 的文档(干净明确)。入口:字段定义页 / 类型菜单为主,列表批量为后续。模态警告(轻):受影响数 + "覆盖含人工校正的字段值" + LLM 成本。

六、场景三(占位):重新 OCR

最上游、级联一切、成本最高。现状只有 Failed 的 text-extraction run 能 RetryPipeline,成功的无批量重 OCR 入口。本期不展开——触发场景最少(provider 升级低频)、代价最高,等真有"换 OCR provider 回填存量"需求再设计。套用同一底座,单篇跑 text-extraction、级联深度最大。

实施步骤(建议顺序,每步可独立提交 / 验证)

  1. 重构:抽出共享字段抽取执行单元 —— 把 FieldExtractionEventHandler 里"读字段定义 → FieldExtractionWorkflow.ExtractAsync → in-flight 守卫 → Document.SetFields → 发 FieldsExtractedEto"提炼为可复用 service;事件处理器保留 reclassify race / 跨租户守卫并委托给它。纯重构、行为不变,先用测试护住。
  2. 新增 field-extraction pipeline(生命周期中性) —— PaperbasePipelines 加常量(KeyPipelines);新 JobArgs + BackgroundJob(复用 DocumentPipelineBackgroundJobBase + 步骤 1 共享单元);DocumentPipelineJobScheduler switch 加 case;确认/调整 DeriveLifecycle 对该非 key pipeline 不改 LifecycleStatus
  3. 字段重抽 AppService + Controller —— 预览(按 DocumentTypeId count + 当前字段清单)、批量触发(enqueue dispatcher)、单篇"仅重抽字段"。手写 HttpApi controller(避免 refactor(pipelines): elevate DocumentPipelineRun to independent aggregate root #216 那种 Auto API 漏配导致运行期 404)。
  4. dispatcher(分发任务) —— 链式自延续:keyset 分页只读 Id(.Select(Id).AsNoTracking())、每批短 UoW、enqueue 这批单篇任务、enqueue 下一个 dispatcher(带游标,可 delay 背压);幂等判据用文档自身状态。
  5. 重新分类批量 —— 复用 classification pipeline;新增"批量重新分类"AppService(范围选择:仅已归该类型 / 全量·跨类型 / 待审核队列;保护人工确认开关)+ 对应 dispatcher(enqueue classification)。
  6. 前端 —— 字段定义页/类型菜单的"重抽字段"入口 + 预览模态(轻警告);类型分类设置页的"重新分类"入口 + 范围选择 + 重警告(覆盖自动分类/打回审核/默认跳过人工确认)。重新生成 proxy。
  7. host 部署层 —— 生产引入 Hangfire(并发 worker + 持久化续跑);core 不动。
  8. 测试 —— 单篇幂等、dispatcher 崩溃续跑、生命周期中性(Ready 文档不被打回)、范围查询正确、保护人工确认生效、HttpApi/MCP schema 守护。

贯穿的设计原则

  1. 判断归人——系统不替人猜"要不要重处理、范围多大"。
  2. 复用既有基础设施不自造——FieldExtractionWorkflow / DocumentClassificationWorkflow 引擎、classification pipeline、DocumentPipelineRun、Hangfire。
  3. 幂等是基石——SetFields / SetClassification 覆盖式幂等是"at-least-once + 续跑 + 重复投递无害"的根本。
  4. 零专门状态表——续跑/进度/游标全从既有事实源(队列状态、文档自身状态、任务参数)涌现。
  5. 生命周期中性——不改宏观状态、不动 Ready 闸门。
  6. 通道哲学——批次/进度是内部运维状态,不进出口契约;下游只消费照常发的 ETO(按 EventTime 幂等)。
  7. 粒度统一——连分发也切成短任务,系统里无长跑大任务。
  8. 越上游越重——上游重处理级联所有下游,危险度/警告随之加重。
  9. 降范围 ≠ 有损默认——为省成本把"全量"贬为二等会牺牲正确性;成本是人知情决策,不由系统替人砍掉完整性。

待定 / 未决问题(实现前需拍板)

  • 重新分类范围 UI 形态:三范围由人选、不预设默认;模态如何呈现、如何与"保护人工确认"开关组合。全量已确认要做(新增类型/归拢同类唯一可行),不再是"是否做"。
  • 重新分类对 Ready 文档的处理classification 是 key pipeline,如何避免把已 Ready 文档不必要打回 Processing。
  • 入口最终形态:先只做配置页入口,还是同时做文档列表批量勾选(类型+时间段+指定若干篇)。
  • 重复触发是否拦截:同范围已有一批在跑时又点一次会撒重复一批(幂等但白烧成本)。
  • field-extraction 建模:正式进 PaperbasePipelines(享可观测+重试)vs 更轻的后台任务。倾向前者。
  • 分发任务幂等判据:靠"文档自身状态"跳过已处理的——用哪种信号(最近一次成功 run 的存在性 / 时间戳晚于本批开始)。
  • 进度可见性:精确进度非刚需;若要靠每篇 run 聚合查询,仍不建批次表。
  • OCR 重处理(场景三):暂不做。

涉及的通道边界(实现前应定稿)

  • pipeline 体系:新增 field-extraction 流水线 + 生命周期中性约束;复用 classification 做批量重新分类。
  • 字段架构 / 分类机制:抽取引擎从事件级联提炼为可独立触发的共享单元;重新分类的范围与人工确认保护语义。
  • 出口事件:批量重处理会批量发 DocumentClassifiedEto / FieldsExtractedEto(下游按 (DocumentId, EventType, EventTime) 幂等吸收,机制已有)。

Metadata

Metadata

Assignees

No one assigned

    Labels

    area:ai-workflowAI workflow / MAF Workflow relatedseverity:designDesign improvement (not a hard bug)

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions