-
Notifications
You must be signed in to change notification settings - Fork 327
docs: add async service ingestor examples #2182
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,126 @@ | ||
| # NeMo Retriever API Reference | ||
|
|
||
| ## Service-mode async ingest jobs | ||
|
|
||
| Use `create_ingestor(run_mode="service")` when you want the Python client to submit | ||
| documents to a running NeMo Retriever service instead of executing the pipeline in | ||
| the local process or Ray cluster. Service mode exposes three ingest surfaces: | ||
|
|
||
| - `ingest()` blocks until all submitted documents finish and returns a | ||
| `ServiceIngestResult` with per-document completion events, `job_id`, | ||
| `document_ids`, `failures`, `job_status`, `elapsed_s`, and, by default, a | ||
| combined `dataframe` of completed results. | ||
| - `ingest_stream()` is a synchronous generator for callers that want progress | ||
| events without managing an event loop. | ||
| - `aingest_stream()` is an asynchronous generator for applications that already | ||
| run inside an event loop. | ||
|
|
||
| ### Synchronous streaming example | ||
|
|
||
| ```python | ||
| from nemo_retriever import create_ingestor | ||
|
|
||
|
|
||
| ingestor = ( | ||
| create_ingestor( | ||
| run_mode="service", | ||
| base_url="http://localhost:7670", | ||
| documents=["docs/alpha.pdf", "docs/beta.pdf"], | ||
| ) | ||
| .extract() | ||
| .embed() | ||
| ) | ||
|
|
||
| for event in ingestor.ingest_stream(): | ||
| match event["event"]: | ||
| case "job_created": | ||
| print( | ||
| f"created job {event['job_id']} " | ||
| f"for {event['expected_documents']} documents" | ||
| ) | ||
| case "upload_complete": | ||
| print(f"uploaded {event['filename']} as {event['document_id']}") | ||
| case "upload_failed": | ||
| print(f"could not upload {event['filename']}: {event['error']}") | ||
| case "document_complete": | ||
| if event["status"] == "completed": | ||
| print(f"{event['document_id']} completed with {event['result_rows']} rows") | ||
| else: | ||
| print(f"{event['document_id']} failed: {event.get('error')}") | ||
| case "job_progress": | ||
| print(f"{event['completed']} completed, {event['failed']} failed") | ||
| case "job_finalized" | "job_partial" | "job_failed": | ||
| print(f"job {event['job_id']} finished with status {event['status']}") | ||
| ``` | ||
|
|
||
| Use this form from scripts, notebooks, CLI commands, or worker processes that are | ||
| otherwise synchronous but still need live job and document progress. | ||
|
|
||
| ### Async streaming example | ||
|
|
||
| ```python | ||
| import asyncio | ||
|
|
||
| from nemo_retriever import create_ingestor | ||
|
|
||
|
|
||
| async def main() -> None: | ||
| ingestor = ( | ||
| create_ingestor( | ||
| run_mode="service", | ||
| base_url="http://localhost:7670", | ||
| documents=["docs/alpha.pdf", "docs/beta.pdf"], | ||
| ) | ||
| .extract() | ||
| .embed() | ||
| ) | ||
|
|
||
| async for event in ingestor.aingest_stream(): | ||
| if event["event"] == "job_created": | ||
| print(f"created job {event['job_id']}") | ||
| elif event["event"] == "upload_complete": | ||
| print(f"uploaded {event['filename']}") | ||
| elif event["event"] == "upload_failed": | ||
| print(f"could not upload {event['filename']}: {event['error']}") | ||
| elif event["event"] == "document_complete": | ||
| print(f"{event['document_id']}: {event['status']}") | ||
| elif event["event"] in {"job_finalized", "job_partial", "job_failed"}: | ||
| print(f"job {event['job_id']}: {event['status']}") | ||
|
|
||
|
|
||
| asyncio.run(main()) | ||
| ``` | ||
|
|
||
| Use this form from async web services, task runners, or notebooks that need to | ||
| keep other async work moving while ingestion is in flight. | ||
|
|
||
| ### Event shapes | ||
|
|
||
| The streaming APIs yield dictionaries. Check the `event` key first, then read the | ||
| fields that apply to that event type: | ||
|
|
||
| | Event | Meaning | Key fields | | ||
| | --- | --- | --- | | ||
| | `job_created` | The service created one aggregate job for the submitted document set. | `job_id`, `expected_documents` | | ||
| | `upload_complete` | One local file uploaded and was assigned a service document ID. | `job_id`, `filename`, `document_id` | | ||
| | `document_complete` | One document reached a terminal document state. | `job_id`, `document_id`, `status`, `result_rows`, `elapsed_s`, `error` | | ||
| | `upload_failed` | One local file could not be uploaded. | `job_id`, `filename`, `error` | | ||
| | `job_started` | At least one document in the job started processing. | `job_id`, `status`, `expected_documents`, `counts`, `completed`, `failed`, `remaining`, `progress_pct`, `started_at` | | ||
| | `job_progress` | The job reached a progress reporting milestone. | `job_id`, `status`, `expected_documents`, `counts`, `completed`, `failed`, `remaining`, `progress_pct`, `elapsed_s` | | ||
| | `job_finalized` | All documents completed successfully. | `job_id`, `status`, `expected_documents`, `counts`, `completed`, `failed`, `remaining`, `progress_pct`, `elapsed_s`, `finalized_at` | | ||
| | `job_partial` | Some documents completed and some failed. | `job_id`, `status`, `expected_documents`, `counts`, `completed`, `failed`, `remaining`, `progress_pct`, `elapsed_s`, `finalized_at` | | ||
| | `job_failed` | Every document in the job failed. | `job_id`, `status`, `expected_documents`, `counts`, `completed`, `failed`, `remaining`, `progress_pct`, `elapsed_s`, `finalized_at` | | ||
|
|
||
|
Comment on lines
+101
to
+119
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The event shapes table lists Prompt To Fix With AIThis is a comment left during a code review.
Path: docs/docs/extraction/nemo-retriever-api-reference.md
Line: 97-113
Comment:
**`job_started` event documented in the table but absent from both code examples**
The event shapes table lists `job_started` with its full set of key fields (`job_id`, `status`, `expected_documents`, `counts`, `completed`, `failed`, `remaining`, `progress_pct`, `started_at`), but neither the synchronous nor the asynchronous code example handles it. Readers who copy the examples verbatim will silently drop `job_started` events. A catch-all (`case _: ...` / `else: ...`) branch or a brief note clarifying that `job_started` does not require a handler in typical use would remove the ambiguity.
How can I resolve this? If you propose a fix, please make it concise.Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time! |
||
| `document_complete` uses `status="completed"` or `status="failed"`. Job terminal | ||
| events use aggregate statuses: `job_finalized` reports `status="completed"`, | ||
| `job_partial` reports `status="partial_success"`, and `job_failed` reports | ||
| `status="failed"`. | ||
|
|
||
| Use `ingest(return_results=False)` when you only need the final job metadata and | ||
| document IDs. The default `ingest()` behavior fetches result rows for each | ||
| completed document so it can populate `result.dataframe`; streaming callers can | ||
| avoid that materialization and handle each event as it arrives. | ||
|
|
||
| ## PDF pre-splitting for parallel ingest | ||
|
|
||
| Large PDFs are split into page batches before Ray processing so extraction can run in parallel. This happens on the default ingest path; you do not need extra configuration for typical workloads. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
asyncio.run()incompatible with the stated use cases (notebooks, web services)The closing
asyncio.run(main())only works in a standalone script. Jupyter notebooks already have a running event loop, so this call raisesRuntimeError: This event loop is already running. FastAPI and other async web frameworks have the same restriction — callers there wouldawait main()directly or inline theasync forloop. The guidance text says "Use this form from async web services, task runners, or notebooks", which is exactly whereasyncio.run()cannot be called.Prompt To Fix With AI