diff --git a/ductor_bot/config.py b/ductor_bot/config.py index 4dfd4a1f..5af7b894 100644 --- a/ductor_bot/config.py +++ b/ductor_bot/config.py @@ -250,6 +250,8 @@ class TasksConfig(BaseModel): enabled: bool = True max_parallel: int = 5 timeout_seconds: float = 3600.0 + finished_retention_hours: int = 168 + finished_keep_last: int = 100 class TimeoutConfig(BaseModel): diff --git a/ductor_bot/tasks/hub.py b/ductor_bot/tasks/hub.py index 4e5cf736..39f0f3c1 100644 --- a/ductor_bot/tasks/hub.py +++ b/ductor_bot/tasks/hub.py @@ -416,19 +416,29 @@ async def shutdown(self) -> None: self._in_flight.clear() async def _maintenance_loop(self) -> None: - """Periodically clean orphaned task entries/folders (every 5 hours).""" + """Periodically clean orphaned task entries/folders and old finished tasks.""" try: while True: - await asyncio.sleep(_MAINTENANCE_INTERVAL) try: - removed = self._registry.cleanup_orphans() - if removed: - logger.info("Task maintenance: removed %d orphan(s)", removed) + self._run_maintenance_once() except Exception: logger.exception("Task maintenance failed (continuing)") + await asyncio.sleep(_MAINTENANCE_INTERVAL) except asyncio.CancelledError: pass + def _run_maintenance_once(self) -> None: + """Run one task registry maintenance pass.""" + removed = self._registry.cleanup_orphans() + if removed: + logger.info("Task maintenance: removed %d orphan(s)", removed) + pruned = self._registry.cleanup_finished_retention( + max_age_hours=self._config.finished_retention_hours, + keep_last=self._config.finished_keep_last, + ) + if pruned: + logger.info("Task maintenance: pruned %d finished task(s)", pruned) + async def _run( self, entry: TaskEntry, diff --git a/ductor_bot/tasks/registry.py b/ductor_bot/tasks/registry.py index 925f2a92..40b359c6 100644 --- a/ductor_bot/tasks/registry.py +++ b/ductor_bot/tasks/registry.py @@ -225,6 +225,51 @@ def cleanup_finished(self, chat_id: int | None = None) -> int: to_remove.append(task_id) return self._remove_entries(to_remove, "cleanup_finished") + def cleanup_finished_retention( + self, + *, + max_age_hours: int, + keep_last: int, + now: float | None = None, + ) -> int: + """Prune finished task history by age and count. + + Running/waiting tasks are never removed. ``completed_at`` is preferred + for ordering; older entries may lack it, so fall back to ``created_at``. + """ + if max_age_hours <= 0 and keep_last <= 0: + return 0 + + current_time = time.time() if now is None else now + cutoff = current_time - max_age_hours * 3600 if max_age_hours > 0 else None + + finished = [ + (task_id, entry) + for task_id, entry in self._entries.items() + if entry.status in _FINISHED_STATUSES + ] + to_remove: set[str] = set() + + if cutoff is not None: + for task_id, entry in finished: + if _finished_sort_time(entry) < cutoff: + to_remove.add(task_id) + + if keep_last > 0: + keep = { + task_id + for task_id, _ in sorted( + finished, + key=lambda item: _finished_sort_time(item[1]), + reverse=True, + )[:keep_last] + } + for task_id, _ in finished: + if task_id not in keep: + to_remove.add(task_id) + + return self._remove_entries(sorted(to_remove), "cleanup_finished_retention") + def _remove_entries(self, task_ids: list[str], label: str) -> int: """Delete entries and their folders from the registry.""" # Resolve folder paths before deleting entries (entries carry per-agent @@ -241,6 +286,11 @@ def _remove_entries(self, task_ids: list[str], label: str) -> int: return len(task_ids) +def _finished_sort_time(entry: TaskEntry) -> float: + """Return the timestamp used for finished task retention.""" + return entry.completed_at or entry.created_at + + # -- Task folder seeding ------------------------------------------------------- _TASK_RULES = """\ diff --git a/tests/tasks/test_hub.py b/tests/tasks/test_hub.py index 9d840ec8..750529ca 100644 --- a/tests/tasks/test_hub.py +++ b/tests/tasks/test_hub.py @@ -928,6 +928,43 @@ async def test_result_isolation_between_agents( await hub.shutdown() +class TestMaintenance: + async def test_maintenance_cleans_finished_retention( + self, registry: TaskRegistry, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Periodic maintenance should prune finished task history, not just orphans.""" + old_done = registry.create(_submit(name="old"), "claude", "opus") + recent_done = registry.create(_submit(name="recent"), "claude", "opus") + registry.update_status(old_done.task_id, "done") + registry.update_status(recent_done.task_id, "done") + old_done.completed_at = 1.0 + old_done.created_at = 1.0 + recent_done.completed_at = 10_000.0 + recent_done.created_at = 10_000.0 + registry._persist() + + async def fake_sleep(_: float) -> None: + raise asyncio.CancelledError + + monkeypatch.setattr("ductor_bot.tasks.hub.asyncio.sleep", fake_sleep) + monkeypatch.setattr("ductor_bot.tasks.registry.time.time", lambda: 10_000.0) + + hub = TaskHub( + registry, + MagicMock(workspace=tmp_path), + cli_service=_make_cli_service(), + config=_make_config( + finished_retention_hours=1, + finished_keep_last=100, + ), + ) + + await hub._maintenance_loop() + + assert registry.get(old_done.task_id) is None + assert registry.get(recent_done.task_id) is not None + + class TestAppendTaskmemory: """#91: _append_taskmemory must emit a WARNING log and include the original length + full file path in the suffix when truncation occurs. Without this, diff --git a/tests/tasks/test_registry.py b/tests/tasks/test_registry.py index 1bad8460..02629cda 100644 --- a/tests/tasks/test_registry.py +++ b/tests/tasks/test_registry.py @@ -160,6 +160,75 @@ def test_noop_when_empty(self, registry: TaskRegistry) -> None: assert registry.cleanup_finished() == 0 +class TestCleanupFinishedRetention: + def test_removes_finished_tasks_older_than_retention(self, registry: TaskRegistry) -> None: + old_done = registry.create(_submit(name="old"), "claude", "opus") + recent_done = registry.create(_submit(name="recent"), "claude", "opus") + running = registry.create(_submit(name="running"), "claude", "opus") + waiting = registry.create(_submit(name="waiting"), "claude", "opus") + + registry.update_status(old_done.task_id, "done") + registry.update_status(recent_done.task_id, "done") + registry.update_status(waiting.task_id, "waiting") + old_done.completed_at = 100.0 + old_done.created_at = 100.0 + recent_done.completed_at = 10_000.0 + recent_done.created_at = 10_000.0 + running.created_at = 100.0 + waiting.created_at = 100.0 + registry._persist() + + removed = registry.cleanup_finished_retention( + max_age_hours=1, + keep_last=100, + now=10_000.0, + ) + + assert removed == 1 + assert registry.get(old_done.task_id) is None + assert registry.get(recent_done.task_id) is not None + assert registry.get(running.task_id) is not None + assert registry.get(waiting.task_id) is not None + + def test_keeps_only_latest_finished_when_count_exceeds_limit( + self, registry: TaskRegistry + ) -> None: + entries = [registry.create(_submit(name=f"done-{i}"), "claude", "opus") for i in range(4)] + for i, entry in enumerate(entries): + registry.update_status(entry.task_id, "done") + entry.completed_at = float(i + 1) + entry.created_at = float(i + 1) + registry._persist() + + removed = registry.cleanup_finished_retention( + max_age_hours=24, + keep_last=2, + now=10.0, + ) + + assert removed == 2 + assert registry.get(entries[0].task_id) is None + assert registry.get(entries[1].task_id) is None + assert registry.get(entries[2].task_id) is not None + assert registry.get(entries[3].task_id) is not None + + def test_removes_finished_task_folder(self, registry: TaskRegistry) -> None: + entry = registry.create(_submit(name="old"), "claude", "opus") + folder = registry.task_folder(entry.task_id) + registry.update_status(entry.task_id, "failed") + entry.completed_at = 1.0 + registry._persist() + + removed = registry.cleanup_finished_retention( + max_age_hours=1, + keep_last=100, + now=10_000.0, + ) + + assert removed == 1 + assert not folder.exists() + + class TestDelete: def test_deletes_finished_task(self, registry: TaskRegistry, tmp_path: Path) -> None: entry = registry.create(_submit(name="Deletable"), "claude", "opus")