Add durable background jobs for memory and scanner work #180

Open
massy-o wants to merge 4 commits into XortexAI:main from massy-o:durable-memory-jobs

Conversation

@massy-o

@massy-o massy-o commented May 14, 2026

Refs #162

Summary

  • add a MongoDB-backed durable job queue with idempotency keys, retries, timeouts, stale lease recovery, and dead-letter records
  • enqueue /v1/memory/ingest and /v1/memory/batch-ingest work and expose /v1/jobs/{job_id} for job status/results
  • route scanner start/resume work through the durable queue while preserving the existing scanner job/status records and falling back to in-process tasks if the job store is unavailable
  • add job worker settings for polling, timeout, retry, backoff, and lease duration
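
The idempotency-key behavior listed above can be sketched with a small in-memory stand-in. This is only an illustration under stated assumptions: `InMemoryJobStore`, its field names, and the key derivation (SHA-256 over canonical JSON) are hypothetical and are not taken from the PR's `src/jobs.py`; a MongoDB-backed store would typically get the same effect from a unique index on the key.

```python
import hashlib
import json
import uuid


class InMemoryJobStore:
    """Illustrative stand-in for a durable job store; not the PR's JobStore."""

    def __init__(self):
        self._jobs = {}    # job_id -> job document
        self._by_key = {}  # idempotency key -> job_id (acts like a unique index)

    @staticmethod
    def idempotency_key(job_type, owner_id, payload):
        # Canonical JSON (sorted keys) keeps the key stable when the same
        # payload arrives with its fields in a different order.
        canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
        raw = f"{job_type}:{owner_id}:{canonical}"
        return hashlib.sha256(raw.encode("utf-8")).hexdigest()

    def enqueue(self, job_type, owner_id, payload):
        key = self.idempotency_key(job_type, owner_id, payload)
        if key in self._by_key:
            # Duplicate submission: hand back the existing job unchanged.
            return self._jobs[self._by_key[key]]
        job = {
            "job_id": uuid.uuid4().hex,
            "job_type": job_type,
            "owner_id": owner_id,
            "payload": payload,
            "status": "pending",
            "attempts": 0,
            "idempotency_key": key,
        }
        self._jobs[job["job_id"]] = job
        self._by_key[key] = job["job_id"]
        return job
```

Submitting the same logical request twice returns the same job record, while a different owner or payload produces a new one.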

This is a focused Phase 1 implementation for the task-queue/status foundation described in the issue discussion.

Validation

  • python3 -m py_compile src/jobs.py src/api/routes/jobs.py src/api/routes/memory.py src/api/routes/scanner.py src/api/app.py src/api/schemas.py
  • git diff --check
  • uv run --with pytest --with pytest-asyncio --with fastapi --with pydantic --with pydantic-settings --with python-jose --with pymongo --with httpx --with beautifulsoup4 pytest tests/api/test_dependencies_and_routes.py -q -> 4 passed


@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces a durable background job system using MongoDB to handle long-running tasks in the memory and scanner modules. It adds a new job status endpoint, a worker for asynchronous execution, and relevant configuration settings. The review feedback primarily highlights the need to offload synchronous database operations to worker threads using asyncio.to_thread to avoid blocking the FastAPI event loop. Additionally, improvements were suggested for the stability of idempotency keys, the completeness of dead-letter logs, and the refactoring of duplicated user identification logic.
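
As a rough illustration of the `asyncio.to_thread` suggestion, the sketch below offloads a blocking lookup to a worker thread while a concurrent task keeps running on the event loop. `blocking_find_job` is a hypothetical stand-in for a synchronous driver call; it is not a function from this PR.

```python
import asyncio
import time


def blocking_find_job(job_id):
    """Hypothetical stand-in for a synchronous database lookup."""
    time.sleep(0.05)  # simulates blocking I/O
    return {"job_id": job_id, "status": "pending"}


async def get_job_status(job_id):
    # Run the blocking call in a worker thread so the event loop stays
    # free to serve other requests while the lookup is in flight.
    return await asyncio.to_thread(blocking_find_job, job_id)


async def main():
    ticks = 0

    async def heartbeat():
        # Counts how often the event loop gets to run during the lookup.
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.005)

    hb = asyncio.create_task(heartbeat())
    job = await get_job_status("abc")
    hb.cancel()
    return job, ticks
```

If `blocking_find_job` were called directly inside the coroutine, the heartbeat would not advance until the lookup returned.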

@ishaanxgupta
Member

Hi @massy-o, thank you for the contribution. The PR looks good to me. My main concern is whether we should make these changes in the /v1/memory routes, or bump the version and make them in /v2/memory routes, leaving /v1/memory as it is. What do you think? Also, let me know your thoughts on Celery and Redis. Did you try the ingest endpoint after the job-tracking change and check the latency? Has it increased?

@massy-o
Author

massy-o commented May 16, 2026

Thanks @ishaanxgupta, that is a fair concern.

On the API versioning question: I agree that changing the response contract of /v1/memory/ingest is the riskiest part of this PR. Since the durable job path returns an enqueue/status response instead of the previous synchronous ingest result, my preference would be to keep /v1/memory backward-compatible and expose the async/job-tracked behavior under /v2/memory (or behind an explicit opt-in flag/header if you prefer a smaller surface). I am happy to adjust the PR in that direction so existing /v1 clients do not see a surprise contract change.

On Celery + Redis: I think that is a good production direction, especially once we want multiple worker processes, clearer operational controls, scheduling, and mature retry/dead-letter behavior. I kept this PR on the existing MongoDB dependency to make the first step smaller and avoid introducing Redis/Celery as new required infrastructure. The job store/worker boundary should also make it possible to swap the backend later without changing the route-level API much.

On ingest latency: I have not run a production-like benchmark yet, so I do not want to overstate the numbers. The intended effect is that the request path only persists the job record and returns the status URL, while the expensive ingest pipeline runs out of band. So the interactive request latency should generally decrease versus synchronous ingest, with the tradeoff that completion is now observed via polling. There is a small extra cost for the Mongo job insert/status tracking, but that should be much smaller than the embedding/judge/weaver work. If useful, I can add a lightweight timing note/test or run a before/after local measurement as part of the PR update.

So my proposed next step is to move the async job-tracked ingest behavior to /v2/memory, keep /v1/memory synchronous/backward-compatible, and leave the current Mongo-backed queue as the minimal backend unless you want this PR to switch directly to Celery/Redis.

@ishaanxgupta
Member

@massy-o yes, that would be great. Let's make these changes in /v2/memory and keep /v1/memory as it was. We can do the Celery or Redis integration in the next PR.
Could you please do a before/after local measurement and share the results here in the comments?

Collaborator

@Ankit-Kotnala Ankit-Kotnala left a comment

@massy-o thanks for the update. I would hold off on merging this until the async job-tracked memory ingest path is moved to /v2/memory and /v1/memory/ingest + /v1/memory/batch-ingest remain backward-compatible.

One more concern: scanner jobs use a longer timeout, but stale lease recovery still uses the default lease window. With the current defaults, a long scanner job could be picked up again by another worker while it is still running. Can we align the lease with the job timeout or add lease renewal/heartbeat handling?

After that, please add tests for the v1/v2 behavior, job status/idempotency, retry/dead-letter flow, and scanner enqueue path, plus the before/after latency numbers requested above.

@github-actions github-actions Bot added the tests label May 17, 2026
@massy-o
Author

massy-o commented May 17, 2026

Thanks @ishaanxgupta and @Ankit-Kotnala, I pushed the requested update in commit 5cb5118.

What changed:

  • Restored /v1/memory/ingest and /v1/memory/batch-ingest to the synchronous/backward-compatible response contract.
  • Added the async job-tracked paths under /v2/memory/ingest and /v2/memory/batch-ingest.
  • Kept the current Mongo-backed queue; Celery/Redis can stay as a follow-up PR.
  • Added per-job lease_seconds, defaulting to at least the job timeout, and explicitly aligned scanner jobs to their longer timeout window so long scanner jobs are not reclaimed by another worker while still running.
  • Added coverage for v1/v2 behavior, job status owner scoping, idempotency, dead-letter payload retention, and per-job lease handling.
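
The lease alignment described in the list above can be sketched as follows. The function names and the 300-second default are assumptions made for illustration; the PR's actual defaults live in its settings and are not shown in this thread.

```python
from datetime import datetime, timedelta, timezone

DEFAULT_LEASE_SECONDS = 300  # hypothetical default, not taken from the PR


def effective_lease_seconds(timeout_seconds, lease_seconds=None):
    """Never let the lease be shorter than the job's own timeout."""
    requested = DEFAULT_LEASE_SECONDS if lease_seconds is None else lease_seconds
    return max(requested, timeout_seconds)


def is_stale(claimed_at, lease_seconds, now):
    """A running job may be reclaimed only once its lease has expired."""
    return now >= claimed_at + timedelta(seconds=lease_seconds)


claimed_at = datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc)
ten_minutes_later = claimed_at + timedelta(minutes=10)

# A scanner job with a one-hour timeout keeps its lease after ten minutes.
scanner_lease = effective_lease_seconds(timeout_seconds=3600)
scanner_reclaimable = is_stale(claimed_at, scanner_lease, ten_minutes_later)

# With the unaligned default lease, the same job would be reclaimed
# while it is still running.
unaligned_reclaimable = is_stale(claimed_at, DEFAULT_LEASE_SECONDS, ten_minutes_later)
```

Lease renewal via a heartbeat would be an alternative design; aligning the lease with the timeout is simpler but delays recovery of a crashed worker's job until the full timeout has elapsed.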

Validation run locally:

  • uv run ruff check src/api/routes/memory.py src/api/app.py src/jobs.py src/api/routes/scanner.py tests/api/test_durable_jobs_routes.py -> passed
  • uv run pytest tests/api/test_dependencies_and_routes.py tests/api/test_durable_jobs_routes.py -> 10 passed

Before/after local measurement:
I used a small local route-level benchmark with the same FastAPI route stack, a fake ingest pipeline that sleeps for 50ms to represent synchronous ingest work, and an in-memory fake job store for enqueue overhead.

Results over 12 requests each:

  • /v1/memory/ingest synchronous path: mean 54.93ms, median 54.20ms, min 51.52ms, max 65.67ms
  • /v2/memory/ingest enqueue path: mean 1.47ms, median 1.19ms, min 0.96ms, max 2.88ms

So in this local synthetic measurement, the request return path is about 37.3x faster for the v2 enqueue path. This is not a production benchmark, but it confirms the intended behavior: v1 keeps the old synchronous contract, while v2 returns quickly after recording the job and leaves the expensive ingest work to the worker.
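
A measurement of this shape could be reproduced roughly as below. This is a simplified sketch, not the benchmark that produced the numbers above: it times bare coroutines rather than the FastAPI route stack, and `fake_sync_ingest` and `fake_enqueue` are hypothetical stand-ins for the 50ms fake pipeline and the in-memory fake job store.

```python
import asyncio
import statistics
import time


async def fake_sync_ingest():
    # Simulates 50 ms of synchronous ingest work on the request path.
    await asyncio.sleep(0.05)
    return {"status": "done"}


async def fake_enqueue(queue):
    # Simulates the enqueue path: record the job and return immediately.
    queue.append({"job_type": "memory_ingest"})
    return {"status": "pending"}


async def time_calls(make_call, n=12):
    """Return per-call latencies in milliseconds for n sequential calls."""
    samples = []
    for _ in range(n):
        start = time.perf_counter()
        await make_call()
        samples.append((time.perf_counter() - start) * 1000.0)
    return samples


async def main():
    queue = []
    sync_ms = await time_calls(fake_sync_ingest)
    enqueue_ms = await time_calls(lambda: fake_enqueue(queue))
    return sync_ms, enqueue_ms, queue


if __name__ == "__main__":
    sync_ms, enqueue_ms, _ = asyncio.run(main())
    for name, samples in (("sync", sync_ms), ("enqueue", enqueue_ms)):
        print(f"{name}: mean {statistics.mean(samples):.2f}ms, "
              f"median {statistics.median(samples):.2f}ms")
```

The absolute numbers depend on the machine and will not match the route-level figures, since HTTP handling and serialization are left out here.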

@greptile-apps

greptile-apps Bot commented May 23, 2026

Greptile Summary

This PR introduces a MongoDB-backed durable job queue with idempotency keys, retry/backoff, stale-lease recovery, and dead-letter records, then routes /v2/memory/ingest, /v2/memory/batch-ingest, and scanner start/resume work through it while keeping the existing v1 synchronous paths intact.

  • src/jobs.py: New JobStore (Mongo CRUD + claim/retry/dead-letter) and JobWorker (single async polling loop with is_ready() gate); exposes init_jobs / shutdown_jobs wired into the FastAPI lifespan.
  • src/api/routes/scanner.py: _enqueue_or_start_scanner_job helper enqueues scanner work or falls back to asyncio.create_task; the GitHub PAT is included verbatim in the stored payload and will appear in dead-letter records on job failure.
  • src/api/routes/memory.py: New run_ingest_job / run_batch_ingest_job handlers and v2 enqueue endpoints; the ingest handler hard-codes a 120-second inner timeout and the batch timeout calculation has no upper bound.

Confidence Score: 3/5

Needs the PAT credential-at-rest issue resolved before merging; the rest of the queue implementation is structurally sound.

The job infrastructure itself is well-structured — idempotency, stale-lease recovery, dead-letter handling, and the is_ready gate all work correctly. However the scanner integration stores GitHub PATs verbatim in MongoDB job payloads, including dead-letter records, where they persist indefinitely and would be exposed in any database leak or accidental export. That credential-storage gap should be addressed before this ships to production. The remaining findings (broad exception catch, hardcoded inner timeout, unbounded batch timeout, client-side stale scan) are non-blocking quality concerns.

src/api/routes/scanner.py (PAT stored in job payload) and src/jobs.py (exception handling in enqueue, client-side stale-lease scan)

Security Review

  • Credential storage — src/api/routes/scanner.py: The GitHub Personal Access Token (PAT) supplied by the caller is written verbatim into the payload field of the jobs collection and, when a job exhausts its retries, is copied in full into the job_dead_letters collection. The _public_job helper strips the payload from API responses, but the credential persists unencrypted at rest in both collections. A MongoDB breach or accidental data export exposes every user's private-repo PAT.

Important Files Changed

| Filename | Overview |
| --- | --- |
| src/jobs.py | New MongoDB-backed job queue and async worker; broad exception catch in enqueue can mask original errors, and stale-lease recovery fetches all running jobs client-side on every poll tick. |
| src/api/routes/scanner.py | Scanner start/resume routed through durable queue, but the GitHub PAT is stored in plain text in the job payload and propagates to dead-letter records on failure. |
| src/api/routes/memory.py | New v2 enqueue endpoints added alongside existing v1 sync routes; run_ingest_job hard-codes 120s timeout and batch timeout has no upper cap. |
| src/api/routes/jobs.py | New /v1/jobs/{job_id} status endpoint; correctly scopes lookups to the authenticated owner and strips the payload from the response. |
| src/api/schemas.py | Adds JobEnqueueResponse and JobStatusResponse Pydantic models; clean and complete. |
| src/config/settings.py | Adds six new job-worker settings with sensible defaults; no issues found. |
| src/api/app.py | Registers job handlers and routes in lifespan; init_jobs is correctly called before yield and shutdown_jobs before boot_task cleanup. |
| src/api/dependencies.py | Adds get_owner_id() helper to centralise the username/name/id fallback chain; clean refactor. |
| tests/api/test_durable_jobs_routes.py | Covers v1/v2 memory enqueue, job status ownership scoping, idempotency, dead-letter, and per-job lease tests; good coverage for the new paths. |

Sequence Diagram

sequenceDiagram
    participant Client
    participant API as FastAPI Route
    participant Store as JobStore (MongoDB)
    participant Worker as JobWorker (async poll)
    participant Handler as Job Handler

    Client->>API: POST /v2/memory/ingest
    API->>Store: enqueue(job_type, owner_id, payload)
    Store-->>API: "{job_id, status: pending}"
    API-->>Client: "200 {job_id, status_url}"

    loop every job_poll_interval_seconds
        Worker->>Worker: is_ready()?
        Worker->>Store: claim_next(worker_id)
        Store-->>Worker: job doc (or stale lease recovery)
        Worker->>Handler: await handler(payload) [timeout]
        alt success
            Handler-->>Worker: result
            Worker->>Store: succeed(job_id, result)
        else failure
            Worker->>Store: fail_or_retry(job, error)
            Store-->>Store: retry (PENDING + backoff) or DEAD_LETTER
        end
    end

    Client->>API: "GET /v1/jobs/{job_id}"
    API->>Store: get(job_id, owner_id)
    Store-->>API: job doc (payload stripped)
    API-->>Client: JobStatusResponse
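
The failure branch of the diagram (retry as PENDING with backoff, or move to DEAD_LETTER) might look roughly like this. It is a sketch under assumptions: the exponential schedule, the 5-second base, the 300-second cap, and the field names are illustrative choices, since the thread only says that retry and backoff settings exist.

```python
from datetime import datetime, timedelta, timezone


def backoff_seconds(attempt, base=5.0, cap=300.0):
    """Exponential backoff: base * 2**(attempt - 1), capped at `cap`."""
    return min(cap, base * (2 ** (attempt - 1)))


def fail_or_retry(job, error, now):
    """Return the updated job document after a failed attempt."""
    attempts = job["attempts"] + 1
    updated = dict(job, attempts=attempts, last_error=str(error))
    if attempts >= job["max_attempts"]:
        # Retries exhausted: park the job for manual inspection.
        updated["status"] = "dead_letter"
        updated.pop("run_after", None)
    else:
        # Make the job claimable again, but not before the backoff elapses.
        updated["status"] = "pending"
        updated["run_after"] = now + timedelta(seconds=backoff_seconds(attempts))
    return updated
```

A worker's claim query would then skip pending jobs whose `run_after` is still in the future.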

Reviews (1): Last reviewed commit: "Move async memory ingest to v2"

@massy-o
Author

massy-o commented May 23, 2026

Pushed commit 7dac71c addressing the latest Greptile findings:

  • Removed GitHub PAT from durable scanner job payloads/dead-letter path; scanner jobs now store only requires_pat and use a process-local credential reference, failing closed if the reference is gone.
  • Narrowed idempotent enqueue handling to DuplicateKeyError.
  • Replaced the hard-coded memory ingest timeout with settings.job_timeout_seconds.
  • Capped batch ingest timeout at settings.job_timeout_seconds * 10.
  • Added stale_at so stale lease recovery is filtered by MongoDB instead of scanning all running jobs client-side.
  • Added/updated tests for PAT non-persistence, fail-closed missing credential refs, and stale_at lease behavior.
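
The `stale_at` change in the list above can be illustrated by precomputing the lease deadline at claim time so that recovery becomes a single server-side filter. The field layout and helper names below are assumptions; `$lte` is a standard MongoDB comparison operator, and `matches` is only a tiny local evaluator so the sketch runs without a database.

```python
from datetime import datetime, timedelta, timezone


def claim_fields(now, lease_seconds):
    """Fields written when a worker claims a job (hypothetical layout)."""
    return {
        "status": "running",
        "claimed_at": now,
        "stale_at": now + timedelta(seconds=lease_seconds),
    }


def stale_jobs_filter(now):
    # MongoDB-style query document: running jobs whose lease has expired.
    return {"status": "running", "stale_at": {"$lte": now}}


def matches(doc, query):
    """Minimal evaluator for the two shapes used above: equality and $lte."""
    for field, cond in query.items():
        value = doc.get(field)
        if isinstance(cond, dict):
            if value is None or not value <= cond["$lte"]:
                return False
        elif value != cond:
            return False
    return True
```

With a real collection, the same filter document would be passed to the driver's query call, and a compound index on the status and deadline fields would likely help keep the lookup cheap.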

Validation run locally:

uv run ruff check src/jobs.py src/api/routes/memory.py src/api/routes/scanner.py tests/api/test_durable_jobs_routes.py
All checks passed!

uv run pytest tests/api/test_dependencies_and_routes.py tests/api/test_durable_jobs_routes.py
12 passed, 4 warnings

@greptile-apps

greptile-apps Bot commented May 23, 2026

Greptile encountered an error while reviewing this PR. Please reach out to support@greptile.com for assistance.
