Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@
from src.api.routes.code import router as code_router
from src.api.routes.enterprise import router as enterprise_router
from src.api.routes.health import router as health_router
from src.api.routes.jobs import router as jobs_router
from src.api.routes.memory import router as memory_router
from src.api.routes.memory import run_batch_ingest_job, run_ingest_job
from src.api.routes.memory import scrape_router as memory_scrape_router
from src.api.routes.memory_graph import router as memory_graph_router
from src.api.routes.scanner import router as scanner_router
from src.api.routes.scanner import run_scanner_job, run_scanner_phase2_job
from src.api.routes.telemetry import router as telemetry_router
from src.api.schemas import APIResponse, StatusEnum
from src.config import settings
Expand Down Expand Up @@ -99,7 +102,16 @@ async def lifespan(app: FastAPI):
_set_event_loop(asyncio.get_running_loop())

boot_task = asyncio.create_task(_boot_pipelines())
from src.jobs import init_jobs
init_jobs({
"memory.ingest": run_ingest_job,
"memory.batch_ingest": run_batch_ingest_job,
"scanner.scan": run_scanner_job,
"scanner.phase2": run_scanner_phase2_job,
})
yield
from src.jobs import shutdown_jobs
await shutdown_jobs()
await boot_task
from src.api.dependencies import _ingest_pipeline, _retrieval_pipeline
if _ingest_pipeline:
Expand Down Expand Up @@ -156,6 +168,7 @@ async def lifespan(app: FastAPI):
app.include_router(health_router)
app.include_router(memory_scrape_router)
app.include_router(memory_router)
app.include_router(jobs_router)
app.include_router(memory_graph_router)
app.include_router(code_router)
app.include_router(scanner_router)
Expand Down
46 changes: 46 additions & 0 deletions src/api/routes/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""Background job status routes."""

from __future__ import annotations

import time
Comment thread
massy-o marked this conversation as resolved.

from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import JSONResponse

from src.api.dependencies import enforce_rate_limit, require_api_key
from src.api.schemas import APIResponse, JobStatusResponse, StatusEnum
from src.jobs import get_job_store, serialize_job

# Router for the background-job status endpoints. Routes registered on it are
# served under the "/v1/jobs" prefix, tagged "jobs", and every one of them
# declares `enforce_rate_limit` as a dependency.
router = APIRouter(
prefix="/v1/jobs",
tags=["jobs"],
dependencies=[Depends(enforce_rate_limit)],
)


@router.get("/{job_id}", summary="Get durable background job status")
async def get_job_status(
    job_id: str,
    request: Request,
    user: dict = Depends(require_api_key),
):
    """Return the status of a single background job belonging to the caller.

    The owner is taken from the authenticated ``user`` mapping: the first
    truthy value of ``username`` / ``name``, otherwise ``user["id"]``. The
    lookup passes that owner to the job store together with ``job_id``.

    Args:
        job_id: Identifier taken from the URL path.
        request: Incoming request; ``request.state.request_id`` is echoed in
            the response envelope when present.
        user: Authenticated user mapping resolved by ``require_api_key``.

    Returns:
        A ``JSONResponse`` wrapping an ``APIResponse`` whose ``data`` is the
        JSON-mode dump of a ``JobStatusResponse``.

    Raises:
        HTTPException: 503 when ``get_job_store()`` raises ``RuntimeError``
            (the error text becomes the detail); 404 when the store lookup
            yields a falsy result.
    """
    started_at = time.perf_counter()
    caller = user.get("username") or user.get("name") or user["id"]

    try:
        job_store = get_job_store()
    except RuntimeError as exc:
        raise HTTPException(status_code=503, detail=str(exc)) from exc

    record = job_store.get(job_id, owner_id=caller)
    if record:
        status_payload = JobStatusResponse(**serialize_job(record))
        # Timing stops once the status model is built, before the envelope
        # itself is assembled.
        took_ms = round((time.perf_counter() - started_at) * 1000, 2)
        envelope = APIResponse(
            status=StatusEnum.OK,
            request_id=getattr(request.state, "request_id", None),
            data=status_payload.model_dump(mode="json"),
            elapsed_ms=took_ms,
        )
        return JSONResponse(content=envelope.model_dump(mode="json"))

    raise HTTPException(status_code=404, detail="Job not found.")
90 changes: 82 additions & 8 deletions src/api/routes/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
DomainResult,
IngestRequest,
IngestResponse,
JobEnqueueResponse,
OperationDetail,
RetrieveRequest,
RetrieveResponse,
Expand All @@ -40,6 +41,7 @@
StatusEnum,
WeaverSummary,
)
from src.jobs import get_job_store
from src.pipelines.retrieval import RetrievalPipeline

from bs4 import BeautifulSoup
Expand Down Expand Up @@ -108,6 +110,17 @@ def _error(request: Request, detail: str, code: int, elapsed_ms: float = 0) -> J
return JSONResponse(content=body.model_dump(), status_code=code)


def _job_enqueued_response(request: Request, job: Dict[str, Any], elapsed_ms: float) -> JSONResponse:
    """Build the response returned after a job has been enqueued.

    Copies ``job_id``, ``job_type``, ``status`` and ``idempotency_key`` from
    ``job`` into a ``JobEnqueueResponse``, adds the ``/v1/jobs/<job_id>``
    status URL, and hands the result to ``_wrap`` together with ``request``
    and ``elapsed_ms``.

    Raises:
        KeyError: If ``job`` lacks any of the four copied keys.
    """
    copied_keys = ("job_id", "job_type", "status", "idempotency_key")
    fields = {key: job[key] for key in copied_keys}
    fields["status_url"] = f"/v1/jobs/{job['job_id']}"
    return _wrap(request, JobEnqueueResponse(**fields), elapsed_ms)


def _detect_chat_provider(*urls: str) -> str:
for url in urls:
lowered = (url or "").lower()
Expand Down Expand Up @@ -140,6 +153,37 @@ async def _render_chat_share(url: str) -> tuple[str, str]:
return await asyncio.to_thread(_render_chat_share_sync, url)


async def run_ingest_job(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Run the ingest pipeline for one job payload and return a plain dict.

    Args:
        payload: Job payload. ``user_query`` and ``user_id`` are required.
            ``agent_response`` falls back to ``"Acknowledged."`` when missing
            or falsy; ``session_datetime`` and ``image_url`` default to ``""``
            and ``effort_level`` to ``"low"`` when the key is absent.

    Returns:
        ``IngestResponse(...).model_dump()`` built from the pipeline result.

    Raises:
        KeyError: If a required payload key is missing.
        The timeout error raised by ``asyncio.wait_for`` if the pipeline run
        exceeds 120 seconds.
    """
    ingest = get_ingest_pipeline()
    run_kwargs = {
        "user_query": payload["user_query"],
        "agent_response": payload.get("agent_response") or "Acknowledged.",
        "user_id": payload["user_id"],
        "session_datetime": payload.get("session_datetime", ""),
        "image_url": payload.get("image_url", ""),
        "effort_level": payload.get("effort_level", "low"),
    }
    outcome = await asyncio.wait_for(ingest.run(**run_kwargs), timeout=120.0)

    model_label = _model_name(ingest.model)
    classifications = _safe_classifications(outcome)

    # (response field, judge key, weaver key) for each domain in the result.
    domain_keys = (
        ("profile", "profile_judge", "profile_weaver"),
        ("temporal", "temporal_judge", "temporal_weaver"),
        ("summary", "summary_judge", "summary_weaver"),
        ("image", "image_judge", "image_weaver"),
    )
    domain_results = {
        field: _build_domain_result(outcome.get(judge_key), outcome.get(weaver_key))
        for field, judge_key, weaver_key in domain_keys
    }

    response = IngestResponse(
        model=model_label,
        classification=classifications,
        **domain_results,
    )
    return response.model_dump()


async def run_batch_ingest_job(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Ingest every item of a batch payload, one after another.

    Each entry of ``payload["items"]`` is merged with the batch-level
    ``user_id`` (which overrides any ``user_id`` on the item) and passed to
    ``run_ingest_job``. Items are awaited sequentially; an exception from any
    item propagates and ends the batch.

    Returns:
        ``BatchIngestResponse(...).model_dump()`` with one ``IngestResponse``
        per item, in input order.
    """
    per_item = [
        await run_ingest_job({**entry, "user_id": payload["user_id"]})
        for entry in payload["items"]
    ]
    rebuilt = [IngestResponse(**dumped) for dumped in per_item]
    return BatchIngestResponse(results=rebuilt).model_dump()


# ── Warm browser pool ──────────────────────────────────────────────────────
# Launching Chromium from cold takes 3-5s. We keep a singleton alive and
# reuse it across scrape requests. The browser is thread-safe when each
Expand Down Expand Up @@ -558,18 +602,31 @@ async def ingest_memory(req: IngestRequest, request: Request, user: dict = Depen

# Get username from authenticated user
user_id = user.get("username") or user.get("name") or user["id"]
payload = {
"user_query": req.user_query,
"agent_response": req.agent_response or "Acknowledged.",
"user_id": user_id,
"session_datetime": req.session_datetime,
"image_url": req.image_url,
"effort_level": req.effort_level,
}

try:
store = get_job_store()
job = store.enqueue(
job_type="memory.ingest",
owner_id=user_id,
payload=payload,
)
Comment thread
massy-o marked this conversation as resolved.
Outdated
elapsed = round((time.perf_counter() - start) * 1000, 2)
return _job_enqueued_response(request, job, elapsed)
except RuntimeError:
logger.warning("Job store unavailable; falling back to synchronous ingest")

try:
async with _ingest_semaphore:
result = await asyncio.wait_for(
pipeline.run(
user_query=req.user_query,
agent_response=req.agent_response or "Acknowledged.",
user_id=user_id,
session_datetime=req.session_datetime,
image_url=req.image_url,
effort_level=req.effort_level,
),
pipeline.run(**payload),
timeout=120.0
)
data = IngestResponse(
Expand Down Expand Up @@ -606,6 +663,23 @@ async def batch_ingest_memory(req: BatchIngestRequest, request: Request, user: d
start = time.perf_counter()
pipeline = get_ingest_pipeline()
user_id = user.get("username") or user.get("name") or user["id"]
payload = {
"user_id": user_id,
"items": [item.model_dump() for item in req.items],
}

try:
store = get_job_store()
job = store.enqueue(
job_type="memory.batch_ingest",
owner_id=user_id,
payload=payload,
timeout_seconds=120.0 * max(len(req.items), 1),
)
Comment thread
massy-o marked this conversation as resolved.
Outdated
elapsed = round((time.perf_counter() - start) * 1000, 2)
return _job_enqueued_response(request, job, elapsed)
except RuntimeError:
logger.warning("Job store unavailable; falling back to synchronous batch ingest")

results = []

Expand Down
Loading
Loading