Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ductor_bot/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ class TasksConfig(BaseModel):
enabled: bool = True
max_parallel: int = 5
timeout_seconds: float = 3600.0
finished_retention_hours: int = 168
finished_keep_last: int = 100


class TimeoutConfig(BaseModel):
Expand Down
20 changes: 15 additions & 5 deletions ductor_bot/tasks/hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,19 +416,29 @@ async def shutdown(self) -> None:
self._in_flight.clear()

async def _maintenance_loop(self) -> None:
    """Periodically clean orphaned task entries/folders and old finished tasks.

    One maintenance pass runs right away; every further pass happens after
    a ``_MAINTENANCE_INTERVAL`` sleep. A pass that raises is logged and the
    loop carries on with the next cycle.
    """
    try:
        while True:
            try:
                self._run_maintenance_once()
            except Exception:
                # Best effort: one failed pass must not kill the loop.
                logger.exception("Task maintenance failed (continuing)")
            # Sleep *after* the pass so a freshly started loop prunes
            # immediately instead of waiting a full interval first.
            await asyncio.sleep(_MAINTENANCE_INTERVAL)
    except asyncio.CancelledError:
        # NOTE(review): presumably cancellation is how shutdown stops this
        # loop, hence it is swallowed here -- confirm against the caller.
        pass

def _run_maintenance_once(self) -> None:
    """Perform a single maintenance pass over the task registry.

    First removes orphans, then prunes finished tasks using the
    ``finished_retention_hours`` / ``finished_keep_last`` config values.
    Each step logs at INFO level only when it removed something.
    """
    orphan_count = self._registry.cleanup_orphans()
    if orphan_count:
        logger.info("Task maintenance: removed %d orphan(s)", orphan_count)

    settings = self._config
    pruned_count = self._registry.cleanup_finished_retention(
        max_age_hours=settings.finished_retention_hours,
        keep_last=settings.finished_keep_last,
    )
    if pruned_count:
        logger.info("Task maintenance: pruned %d finished task(s)", pruned_count)

async def _run(
self,
entry: TaskEntry,
Expand Down
50 changes: 50 additions & 0 deletions ductor_bot/tasks/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,51 @@ def cleanup_finished(self, chat_id: int | None = None) -> int:
to_remove.append(task_id)
return self._remove_entries(to_remove, "cleanup_finished")

def cleanup_finished_retention(
    self,
    *,
    max_age_hours: int,
    keep_last: int,
    now: float | None = None,
) -> int:
    """Prune finished task history by age and by count.

    Only entries whose status is in ``_FINISHED_STATUSES`` are candidates,
    so running/waiting tasks are never touched. Each candidate is dated
    with ``_finished_sort_time`` (``completed_at``, falling back to
    ``created_at`` for entries that lack it).

    Args:
        max_age_hours: Remove candidates dated earlier than this many
            hours before *now*. A value <= 0 disables the age rule.
        keep_last: Keep only this many of the most recently dated
            candidates. A value <= 0 disables the count rule.
        now: Reference timestamp in seconds; ``time.time()`` when omitted.

    Returns:
        Whatever ``_remove_entries`` reports for the selected task ids;
        ``0`` when both rules are disabled.
    """
    age_rule_active = max_age_hours > 0
    count_rule_active = keep_last > 0
    if not (age_rule_active or count_rule_active):
        return 0

    reference_time = now if now is not None else time.time()

    # (timestamp, task_id) pairs in registry iteration order.
    dated = [
        (_finished_sort_time(entry), task_id)
        for task_id, entry in self._entries.items()
        if entry.status in _FINISHED_STATUSES
    ]

    doomed: set[str] = set()

    if age_rule_active:
        oldest_allowed = reference_time - max_age_hours * 3600
        doomed.update(task_id for stamp, task_id in dated if stamp < oldest_allowed)

    if count_rule_active:
        # Stable sort keyed on the timestamp only: ties keep registry order,
        # and everything past the first ``keep_last`` is surplus.
        newest_first = sorted(dated, key=lambda pair: pair[0], reverse=True)
        doomed.update(task_id for _, task_id in newest_first[keep_last:])

    return self._remove_entries(sorted(doomed), "cleanup_finished_retention")

def _remove_entries(self, task_ids: list[str], label: str) -> int:
"""Delete entries and their folders from the registry."""
# Resolve folder paths before deleting entries (entries carry per-agent
Expand All @@ -241,6 +286,11 @@ def _remove_entries(self, task_ids: list[str], label: str) -> int:
return len(task_ids)


def _finished_sort_time(entry: TaskEntry) -> float:
    """Pick the timestamp that dates a finished task for retention.

    Uses ``completed_at`` when it is truthy, otherwise ``created_at``.
    """
    completed = entry.completed_at
    if completed:
        return completed
    return entry.created_at


# -- Task folder seeding -------------------------------------------------------

_TASK_RULES = """\
Expand Down
37 changes: 37 additions & 0 deletions tests/tasks/test_hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,43 @@ async def test_result_isolation_between_agents(
await hub.shutdown()


class TestMaintenance:
    """Checks for the hub's periodic maintenance loop."""

    async def test_maintenance_cleans_finished_retention(
        self, registry: TaskRegistry, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """Periodic maintenance should prune finished task history, not just orphans."""
        frozen_now = 10_000.0

        stale = registry.create(_submit(name="old"), "claude", "opus")
        fresh = registry.create(_submit(name="recent"), "claude", "opus")
        for finished in (stale, fresh):
            registry.update_status(finished.task_id, "done")
        stale.completed_at = stale.created_at = 1.0
        fresh.completed_at = fresh.created_at = frozen_now
        registry._persist()

        async def cancel_on_sleep(_delay: float) -> None:
            # Ends the loop at its first sleep, i.e. after exactly one pass.
            raise asyncio.CancelledError

        monkeypatch.setattr("ductor_bot.tasks.hub.asyncio.sleep", cancel_on_sleep)
        monkeypatch.setattr("ductor_bot.tasks.registry.time.time", lambda: frozen_now)

        retention_config = _make_config(
            finished_retention_hours=1,
            finished_keep_last=100,
        )
        hub = TaskHub(
            registry,
            MagicMock(workspace=tmp_path),
            cli_service=_make_cli_service(),
            config=retention_config,
        )

        await hub._maintenance_loop()

        assert registry.get(stale.task_id) is None
        assert registry.get(fresh.task_id) is not None


class TestAppendTaskmemory:
"""#91: _append_taskmemory must emit a WARNING log and include the original
length + full file path in the suffix when truncation occurs. Without this,
Expand Down
69 changes: 69 additions & 0 deletions tests/tasks/test_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,75 @@ def test_noop_when_empty(self, registry: TaskRegistry) -> None:
assert registry.cleanup_finished() == 0


class TestCleanupFinishedRetention:
    """Checks for ``TaskRegistry.cleanup_finished_retention``."""

    def test_removes_finished_tasks_older_than_retention(self, registry: TaskRegistry) -> None:
        """Only the finished task older than the age limit is removed."""
        stale = registry.create(_submit(name="old"), "claude", "opus")
        fresh = registry.create(_submit(name="recent"), "claude", "opus")
        active = registry.create(_submit(name="running"), "claude", "opus")
        parked = registry.create(_submit(name="waiting"), "claude", "opus")

        registry.update_status(stale.task_id, "done")
        registry.update_status(fresh.task_id, "done")
        registry.update_status(parked.task_id, "waiting")
        stale.completed_at = stale.created_at = 100.0
        fresh.completed_at = fresh.created_at = 10_000.0
        # Old unfinished tasks: they must survive despite their age.
        active.created_at = 100.0
        parked.created_at = 100.0
        registry._persist()

        removed_count = registry.cleanup_finished_retention(
            max_age_hours=1,
            keep_last=100,
            now=10_000.0,
        )

        assert removed_count == 1
        assert registry.get(stale.task_id) is None
        for survivor in (fresh, active, parked):
            assert registry.get(survivor.task_id) is not None

    def test_keeps_only_latest_finished_when_count_exceeds_limit(
        self, registry: TaskRegistry
    ) -> None:
        """With four finished tasks and keep_last=2, the two oldest go."""
        entries = []
        for index in range(4):
            entries.append(registry.create(_submit(name=f"done-{index}"), "claude", "opus"))
        for rank, entry in enumerate(entries, start=1):
            registry.update_status(entry.task_id, "done")
            entry.completed_at = float(rank)
            entry.created_at = float(rank)
        registry._persist()

        removed_count = registry.cleanup_finished_retention(
            max_age_hours=24,
            keep_last=2,
            now=10.0,
        )

        assert removed_count == 2
        dropped, kept = entries[:2], entries[2:]
        for entry in dropped:
            assert registry.get(entry.task_id) is None
        for entry in kept:
            assert registry.get(entry.task_id) is not None

    def test_removes_finished_task_folder(self, registry: TaskRegistry) -> None:
        """Pruning a finished task also deletes its folder on disk."""
        failed = registry.create(_submit(name="old"), "claude", "opus")
        task_dir = registry.task_folder(failed.task_id)
        registry.update_status(failed.task_id, "failed")
        failed.completed_at = 1.0
        registry._persist()

        removed_count = registry.cleanup_finished_retention(
            max_age_hours=1,
            keep_last=100,
            now=10_000.0,
        )

        assert removed_count == 1
        assert not task_dir.exists()


class TestDelete:
def test_deletes_finished_task(self, registry: TaskRegistry, tmp_path: Path) -> None:
entry = registry.create(_submit(name="Deletable"), "claude", "opus")
Expand Down