From fe954a090e0baccd26afb9fd9e1a06b4b2544d0d Mon Sep 17 00:00:00 2001 From: bryan davis Date: Thu, 18 Jun 2026 16:35:48 -0500 Subject: [PATCH 1/5] move cpu bound actions off of main loop --- .../haiku/rag/chunkers/docling_serve.py | 11 +++-- .../haiku/rag/converters/docling_serve.py | 6 ++- .../haiku/rag/converters/pdf_split.py | 5 ++- .../haiku/rag/ingester/sources/fs.py | 23 +++++++--- tests/ingester/test_fs_source.py | 29 +++++++++++++ tests/test_chunker.py | 38 +++++++++++++++++ tests/test_converters.py | 34 +++++++++++++++ tests/test_pdf_split.py | 42 +++++++++++++++++++ 8 files changed, 177 insertions(+), 11 deletions(-) diff --git a/haiku_rag_slim/haiku/rag/chunkers/docling_serve.py b/haiku_rag_slim/haiku/rag/chunkers/docling_serve.py index 3cf4dc035..c03801342 100644 --- a/haiku_rag_slim/haiku/rag/chunkers/docling_serve.py +++ b/haiku_rag_slim/haiku/rag/chunkers/docling_serve.py @@ -1,3 +1,4 @@ +import asyncio import re from io import BytesIO from typing import TYPE_CHECKING @@ -99,9 +100,13 @@ async def _call_chunk_api(self, document: "DoclingDocument") -> list[dict]: else: endpoint = "/v1/chunk/hybrid/file/async" - # Export document to JSON - doc_json = document.model_dump_json() - doc_bytes = doc_json.encode("utf-8") + # Export document to JSON off the event loop. model_dump_json over a + # document carrying inlined base64 page/picture images is CPU-heavy and + # proportional to document size; running it inline would block every + # other worker's coroutine for the duration of the serialization. + doc_bytes = await asyncio.to_thread( + lambda: document.model_dump_json().encode("utf-8") + ) # Prepare multipart request with DoclingDocument JSON files = {"files": ("document.json", BytesIO(doc_bytes), "application/json")} diff --git a/haiku_rag_slim/haiku/rag/converters/docling_serve.py b/haiku_rag_slim/haiku/rag/converters/docling_serve.py index b07366f45..0a0e26687 100644 --- a/haiku_rag_slim/haiku/rag/converters/docling_serve.py +++ b/haiku_rag_slim/haiku/rag/converters/docling_serve.py @@ -225,7 +225,11 @@ async def _make_request(self, files: dict, name: str) -> "DoclingDocument": data=data, name=name, ) - return self._parse_zip_to_docling(zip_bytes, name) + # Parse off the event loop: the zip decompress, per-image base64 + # re-encoding, and DoclingDocument.model_validate are all synchronous + # and CPU-heavy (full-resolution page rasters when generate_page_images + # is on), so running inline would stall every other worker's coroutine. + return await asyncio.to_thread(self._parse_zip_to_docling, zip_bytes, name) async def convert_file( self, path: Path, source_uri: str | None = None diff --git a/haiku_rag_slim/haiku/rag/converters/pdf_split.py b/haiku_rag_slim/haiku/rag/converters/pdf_split.py index 4057b60e4..02d6a04b4 100644 --- a/haiku_rag_slim/haiku/rag/converters/pdf_split.py +++ b/haiku_rag_slim/haiku/rag/converters/pdf_split.py @@ -136,4 +136,7 @@ def _next(it): # Off the event loop because the close path acquires the lock. await asyncio.to_thread(it.close) - return DoclingDocument.concatenate(converted) + # Merge off the event loop: concatenating slice documents that carry + # inlined base64 page/picture images is CPU-heavy and proportional to the + # total document size, so running it inline would block other coroutines. + return await asyncio.to_thread(DoclingDocument.concatenate, converted) diff --git a/haiku_rag_slim/haiku/rag/ingester/sources/fs.py b/haiku_rag_slim/haiku/rag/ingester/sources/fs.py index 1cf6a6c49..57b2b23d3 100644 --- a/haiku_rag_slim/haiku/rag/ingester/sources/fs.py +++ b/haiku_rag_slim/haiku/rag/ingester/sources/fs.py @@ -1,3 +1,4 @@ +import asyncio import hashlib import mimetypes import os @@ -87,23 +88,33 @@ async def head(self, uri: str) -> str | None: return None return str(path.stat().st_mtime_ns) + def _read_body(self, path: Path, uri: str) -> tuple[bytes, str, str]: + """Size-check, read, and hash the file. Runs in a worker thread (see + ``fetch``) because the read and the md5 are both proportional to file + size and would otherwise block the event loop for the whole read.""" + check_file_size(path.stat().st_size, self._max_file_size, uri) + body = path.read_bytes() + content_hash = hashlib.md5(body, usedforsecurity=False).hexdigest() + # mtime_ns rather than st_mtime: nanosecond integer avoids float + # precision collisions on rapid edits. + revision = str(path.stat().st_mtime_ns) + return body, content_hash, revision + async def fetch(self, uri: str) -> FetchResult: path = self._resolve_within_root(uri) if path is None: raise UnsupportedSourceError(f"Path escapes FS root ({self.root}): {uri}") - check_file_size(path.stat().st_size, self._max_file_size, uri) - body = path.read_bytes() + body, content_hash, revision = await asyncio.to_thread( + self._read_body, path, uri + ) content_type, _ = mimetypes.guess_type(path.name) if content_type is None: content_type = "application/octet-stream" - # mtime_ns rather than st_mtime: nanosecond integer avoids float - # precision collisions on rapid edits. - revision = str(path.stat().st_mtime_ns) return FetchResult( uri=path.as_uri(), body=body, content_type=content_type, - content_hash=hashlib.md5(body, usedforsecurity=False).hexdigest(), + content_hash=content_hash, revision=revision, disk_path=path, ) diff --git a/tests/ingester/test_fs_source.py b/tests/ingester/test_fs_source.py index 10f578297..73c7cf349 100644 --- a/tests/ingester/test_fs_source.py +++ b/tests/ingester/test_fs_source.py @@ -300,3 +300,32 @@ async def test_fs_source_fetch_no_limit_when_max_size_is_none(fs_root: Path): src = FSSource(root=fs_root, max_file_size=None) result = await src.fetch((fs_root / "a.md").as_uri()) assert result.body == b"alpha" + + +@pytest.mark.asyncio +async def test_fs_source_fetch_reads_off_event_loop_thread(fs_root: Path): + """The file read and md5 are both proportional to file size and must run + off the event-loop thread, or a large file would freeze every other + worker's coroutine for the duration of the read. Capture the thread the + read+hash runs on and assert it is not the main thread.""" + import threading + + src = FSSource(root=fs_root) + target = fs_root / "a.md" + + called_from: list[threading.Thread] = [] + original = src._read_body + + def spy(path, uri): + called_from.append(threading.current_thread()) + return original(path, uri) + + src._read_body = spy # type: ignore[method-assign] + + result = await src.fetch(target.as_uri()) + assert result.body == b"alpha" + assert called_from, "_read_body was never called" + assert called_from[0] is not threading.main_thread(), ( + "FSSource._read_body ran on the event-loop thread; the read+hash must " + "be dispatched via asyncio.to_thread" + ) diff --git a/tests/test_chunker.py b/tests/test_chunker.py index 1ad0b7709..9b897fc28 100644 --- a/tests/test_chunker.py +++ b/tests/test_chunker.py @@ -609,6 +609,44 @@ async def test_chunk_metadata_extraction(self, mock_client_class, chunker): assert meta1.headings == ["Chapter 1", "Section 1.1"] assert meta1.page_numbers == [1, 2] + @pytest.mark.asyncio + @patch("haiku.rag.providers.docling_serve.httpx.AsyncClient") + async def test_chunk_serializes_document_off_event_loop_thread( + self, mock_client_class, chunker + ): + """model_dump_json over a document carrying inlined base64 page/picture + images is CPU-heavy and proportional to document size; it must run off + the event-loop thread or it stalls every other worker's coroutine. + + A minimal fake document records the thread its model_dump_json runs on; + the API response carries no doc_items so the document is touched only + for serialization.""" + import threading + + result_data = {"chunks": [{"text": "Chunk 1", "chunk_index": 0}]} + submit_resp, poll_resp, result_resp = create_async_workflow_mocks(result_data) + + mock_client = AsyncMock() + mock_client.post = AsyncMock(return_value=submit_resp) + mock_client.get = AsyncMock(side_effect=[poll_resp, result_resp]) + mock_client_class.return_value.__aenter__.return_value = mock_client + + called_from: list[threading.Thread] = [] + + class FakeDoc: + def model_dump_json(self): + called_from.append(threading.current_thread()) + return "{}" + + chunks = await chunker.chunk(FakeDoc()) + + assert len(chunks) == 1 + assert called_from, "model_dump_json was never called" + assert called_from[0] is not threading.main_thread(), ( + "DoclingDocument.model_dump_json ran on the event-loop thread; it " + "must be dispatched via asyncio.to_thread" + ) + @pytest.mark.vcr() @pytest.mark.asyncio diff --git a/tests/test_converters.py b/tests/test_converters.py index 8081edd3c..916307e0e 100644 --- a/tests/test_converters.py +++ b/tests/test_converters.py @@ -94,6 +94,40 @@ def create_async_workflow_zip_mocks( return submit_response, poll_response, result_response +@pytest.mark.asyncio +async def test_parse_zip_runs_off_event_loop_thread(): + """_parse_zip_to_docling does zip decompress, per-image base64 re-encoding, + and DoclingDocument.model_validate — all synchronous and CPU-heavy (full- + resolution page rasters when generate_page_images is on). It must run off + the event-loop thread, or it stalls every other worker's coroutine. Capture + the thread it runs on and assert it is not the main thread.""" + import threading + + config = AppConfig() + config.processing.converter = "docling-serve" + converter = get_converter(config) + assert isinstance(converter, DoclingServeConverter) + + converter.client.submit_and_poll_zip = AsyncMock(return_value=b"zip-bytes") + + called_from: list[threading.Thread] = [] + + def spy(zip_bytes, name): + called_from.append(threading.current_thread()) + return Mock() + + converter._parse_zip_to_docling = spy # type: ignore[method-assign] + + files = {"files": ("doc.pdf", b"pdf", "application/octet-stream")} + await converter._make_request(files, "doc.pdf") + + assert called_from, "_parse_zip_to_docling was never called" + assert called_from[0] is not threading.main_thread(), ( + "_parse_zip_to_docling ran on the event-loop thread; it must be " + "dispatched via asyncio.to_thread" + ) + + class TestTextFileHandler: """Tests for TextFileHandler utility class.""" diff --git a/tests/test_pdf_split.py b/tests/test_pdf_split.py index accb1fd95..7c253f9ae 100644 --- a/tests/test_pdf_split.py +++ b/tests/test_pdf_split.py @@ -157,6 +157,48 @@ async def convert_file(self, path: Path, *, source_uri): assert len(calls) == 2 +@pytest.mark.asyncio +async def test_concatenate_runs_off_event_loop_thread(tmp_path, monkeypatch): + """DoclingDocument.concatenate merges slice documents that carry inlined + base64 page/picture images — CPU-heavy and proportional to total document + size. It must run off the event-loop thread so it doesn't stall other + workers' coroutines. Capture the thread it runs on and assert it is not the + main thread.""" + import threading + + from docling_core.types.doc.document import DoclingDocument + + src = _make_pdf(4, tmp_path) + + class _Converter: + async def convert_file(self, path: Path, *, source_uri): + return DoclingDocument(name="slice") + + called_from: list[threading.Thread] = [] + + def spy(docs): + called_from.append(threading.current_thread()) + # Return a slice doc rather than exercising the real concatenate — + # this test only asserts the dispatch thread, not merge correctness + # (covered by test_concatenate_shifts_page_nos_and_unique_self_refs). + return docs[0] + + monkeypatch.setattr(DoclingDocument, "concatenate", staticmethod(spy)) + + await convert_pdf_with_splitting( + _Converter(), # ty: ignore[invalid-argument-type] + src, + source_uri=None, + slice_size=2, + ) + + assert called_from, "concatenate was never called" + assert called_from[0] is not threading.main_thread(), ( + "DoclingDocument.concatenate ran on the event-loop thread; it must be " + "dispatched via asyncio.to_thread" + ) + + def test_concatenate_shifts_page_nos_and_unique_self_refs(): """Pins the docling-core contract we rely on: when two docs (each with items on page 1) are concatenated, the second doc's items move to page 2 From c6514c9df4bdf8b7c4b446727c0c9fd7685543bc Mon Sep 17 00:00:00 2001 From: Yiorgis Gozadinos Date: Mon, 22 Jun 2026 10:17:02 +0300 Subject: [PATCH 2/5] compare off-loop work against actual event-loop thread, fix ty --- tests/ingester/test_fs_source.py | 7 ++++--- tests/test_chunker.py | 3 ++- tests/test_converters.py | 9 ++++++--- tests/test_pdf_attachments.py | 5 +++-- tests/test_pdf_split.py | 3 ++- 5 files changed, 17 insertions(+), 10 deletions(-) diff --git a/tests/ingester/test_fs_source.py b/tests/ingester/test_fs_source.py index 73c7cf349..31c7b8386 100644 --- a/tests/ingester/test_fs_source.py +++ b/tests/ingester/test_fs_source.py @@ -307,12 +307,13 @@ async def test_fs_source_fetch_reads_off_event_loop_thread(fs_root: Path): """The file read and md5 are both proportional to file size and must run off the event-loop thread, or a large file would freeze every other worker's coroutine for the duration of the read. Capture the thread the - read+hash runs on and assert it is not the main thread.""" + read+hash runs on and assert it is not the event-loop thread.""" import threading src = FSSource(root=fs_root) target = fs_root / "a.md" + event_loop_thread = threading.current_thread() called_from: list[threading.Thread] = [] original = src._read_body @@ -320,12 +321,12 @@ def spy(path, uri): called_from.append(threading.current_thread()) return original(path, uri) - src._read_body = spy # type: ignore[method-assign] + src._read_body = spy # type: ignore[method-assign] # ty: ignore[invalid-assignment] result = await src.fetch(target.as_uri()) assert result.body == b"alpha" assert called_from, "_read_body was never called" - assert called_from[0] is not threading.main_thread(), ( + assert called_from[0] is not event_loop_thread, ( "FSSource._read_body ran on the event-loop thread; the read+hash must " "be dispatched via asyncio.to_thread" ) diff --git a/tests/test_chunker.py b/tests/test_chunker.py index 9b897fc28..e59681a04 100644 --- a/tests/test_chunker.py +++ b/tests/test_chunker.py @@ -631,6 +631,7 @@ async def test_chunk_serializes_document_off_event_loop_thread( mock_client.get = AsyncMock(side_effect=[poll_resp, result_resp]) mock_client_class.return_value.__aenter__.return_value = mock_client + event_loop_thread = threading.current_thread() called_from: list[threading.Thread] = [] class FakeDoc: @@ -642,7 +643,7 @@ def model_dump_json(self): assert len(chunks) == 1 assert called_from, "model_dump_json was never called" - assert called_from[0] is not threading.main_thread(), ( + assert called_from[0] is not event_loop_thread, ( "DoclingDocument.model_dump_json ran on the event-loop thread; it " "must be dispatched via asyncio.to_thread" ) diff --git a/tests/test_converters.py b/tests/test_converters.py index 916307e0e..7447586d4 100644 --- a/tests/test_converters.py +++ b/tests/test_converters.py @@ -108,21 +108,24 @@ async def test_parse_zip_runs_off_event_loop_thread(): converter = get_converter(config) assert isinstance(converter, DoclingServeConverter) - converter.client.submit_and_poll_zip = AsyncMock(return_value=b"zip-bytes") + converter.client.submit_and_poll_zip = AsyncMock( # ty: ignore[invalid-assignment] + return_value=b"zip-bytes" + ) + event_loop_thread = threading.current_thread() called_from: list[threading.Thread] = [] def spy(zip_bytes, name): called_from.append(threading.current_thread()) return Mock() - converter._parse_zip_to_docling = spy # type: ignore[method-assign] + converter._parse_zip_to_docling = spy # type: ignore[method-assign] # ty: ignore[invalid-assignment] files = {"files": ("doc.pdf", b"pdf", "application/octet-stream")} await converter._make_request(files, "doc.pdf") assert called_from, "_parse_zip_to_docling was never called" - assert called_from[0] is not threading.main_thread(), ( + assert called_from[0] is not event_loop_thread, ( "_parse_zip_to_docling ran on the event-loop thread; it must be " "dispatched via asyncio.to_thread" ) diff --git a/tests/test_pdf_attachments.py b/tests/test_pdf_attachments.py index 702f855b4..69851570a 100644 --- a/tests/test_pdf_attachments.py +++ b/tests/test_pdf_attachments.py @@ -494,7 +494,8 @@ async def test_extract_pdf_attachments_called_off_event_loop_thread( duration of pdfium I/O, stalling every other concurrent worker. We verify this by capturing the thread identity inside a spy wrapper: if - asyncio.to_thread is used correctly the spy runs on a non-main thread.""" + asyncio.to_thread is used correctly the spy runs off the event-loop thread.""" + event_loop_thread = threading.current_thread() called_from: list[threading.Thread] = [] def spy(body, uri, *, depth): @@ -514,7 +515,7 @@ def spy(body, uri, *, depth): await _reconcile_pdf_attachments(client, parent, pdf_bytes, depth=0) assert called_from, "_extract_pdf_attachments was never called" - assert called_from[0] is not threading.main_thread(), ( + assert called_from[0] is not event_loop_thread, ( "_extract_pdf_attachments ran on the event-loop thread; " "it must be dispatched via asyncio.to_thread to avoid blocking the loop" ) diff --git a/tests/test_pdf_split.py b/tests/test_pdf_split.py index 7c253f9ae..9ea00a8cc 100644 --- a/tests/test_pdf_split.py +++ b/tests/test_pdf_split.py @@ -174,6 +174,7 @@ class _Converter: async def convert_file(self, path: Path, *, source_uri): return DoclingDocument(name="slice") + event_loop_thread = threading.current_thread() called_from: list[threading.Thread] = [] def spy(docs): @@ -193,7 +194,7 @@ def spy(docs): ) assert called_from, "concatenate was never called" - assert called_from[0] is not threading.main_thread(), ( + assert called_from[0] is not event_loop_thread, ( "DoclingDocument.concatenate ran on the event-loop thread; it must be " "dispatched via asyncio.to_thread" ) From 183d595494f3bcc01d4f963639568eb594a4ede4 Mon Sep 17 00:00:00 2001 From: Yiorgis Gozadinos Date: Mon, 22 Jun 2026 10:30:02 +0300 Subject: [PATCH 3/5] prepare stored Docling blobs off the event loop --- haiku_rag_slim/haiku/rag/client/documents.py | 87 ++++++++++++-------- 1 file changed, 54 insertions(+), 33 deletions(-) diff --git a/haiku_rag_slim/haiku/rag/client/documents.py b/haiku_rag_slim/haiku/rag/client/documents.py index cfb6b7087..76aa27574 100644 --- a/haiku_rag_slim/haiku/rag/client/documents.py +++ b/haiku_rag_slim/haiku/rag/client/documents.py @@ -60,6 +60,30 @@ class DocumentImport: _RESERVED_METADATA_KEYS = frozenset({"content_type", "md5", "source_revision"}) +def _prepare_document_from_docling_sync( + document: Document, docling_document: "DoclingDocument" +) -> str: + """Populate content/docling blobs from a DoclingDocument. + + This performs size-proportional serialization, JSON splitting, and + compression via ``Document.set_docling``. Async ingestion paths should call + it through ``_prepare_document_from_docling`` so large image-bearing + documents do not block the event loop. + """ + content = docling_document.export_to_markdown() + document.content = content + document.set_docling(docling_document) + return content + + +async def _prepare_document_from_docling( + document: Document, docling_document: "DoclingDocument" +) -> str: + return await asyncio.to_thread( + _prepare_document_from_docling_sync, document, docling_document + ) + + def parent_uri_filter(parent_uri: str) -> str: """SQL `WHERE` clause matching documents whose ``metadata.parent_uri`` equals ``parent_uri``. ``metadata`` is stored as a JSON string produced by @@ -184,18 +208,18 @@ async def create_document( chunks = await client.chunk(docling_document) embedded_chunks = await embed_chunks(chunks, client.embedder, client._config) - stored_content = docling_document.export_to_markdown() - - if title is None: - title = await resolve_title(client._config, docling_document, stored_content) - document = Document( - content=stored_content, + content="", uri=uri, title=title, metadata=metadata or {}, ) - document.set_docling(docling_document) + stored_content = await _prepare_document_from_docling(document, docling_document) + + if title is None: + document.title = await resolve_title( + client._config, docling_document, stored_content + ) return await _store_document_with_chunks( client, document, embedded_chunks, docling_document @@ -215,17 +239,15 @@ async def import_document( Use this when conversion, chunking, and embedding were done externally. Chunks without embeddings will be automatically embedded. """ - content = docling_document.export_to_markdown() - if title is None: - title = await resolve_title(client._config, docling_document, content) - document = Document( - content=content, + content="", uri=uri, title=title, metadata=metadata or {}, ) - document.set_docling(docling_document) + content = await _prepare_document_from_docling(document, docling_document) + if title is None: + document.title = await resolve_title(client._config, docling_document, content) return await _store_document_with_chunks(client, document, chunks, docling_document) @@ -291,18 +313,17 @@ async def import_documents( prepared: list[tuple[Document, list[Chunk], DoclingDocument]] = [] for item in imports: - content = item.docling_document.export_to_markdown() - title = item.title - if title is None: - title = await resolve_title(client._config, item.docling_document, content) - document = Document( - content=content, + content="", uri=item.uri, - title=title, + title=item.title, metadata=item.metadata or {}, ) - document.set_docling(item.docling_document) + content = await _prepare_document_from_docling(document, item.docling_document) + if document.title is None: + document.title = await resolve_title( + client._config, item.docling_document, content + ) prepared.append((document, item.chunks, item.docling_document)) return await _store_documents_with_chunks(client, prepared) @@ -427,13 +448,13 @@ async def _ingest_fetch_result( if cleanup_path is not None: cleanup_path.unlink(missing_ok=True) - stored_content = docling_document.export_to_markdown() final_metadata = {**user_metadata, **source_metadata} if existing_doc: - existing_doc.content = stored_content existing_doc.metadata = final_metadata - existing_doc.set_docling(docling_document) + stored_content = await _prepare_document_from_docling( + existing_doc, docling_document + ) if title is not None: existing_doc.title = title elif existing_doc.title is None: @@ -448,15 +469,17 @@ async def _ingest_fetch_result( await _reconcile_pdf_attachments(client, updated, result.body, depth=depth) return updated - if title is None: - title = await resolve_title(client._config, docling_document, stored_content) document = Document( - content=stored_content, + content="", uri=stored_uri, title=title, metadata=final_metadata, ) - document.set_docling(docling_document) + stored_content = await _prepare_document_from_docling(document, docling_document) + if document.title is None: + document.title = await resolve_title( + client._config, docling_document, stored_content + ) with logfire.span("document.store", uri=result.uri, op="create") as store_span: created = await _store_document_with_chunks( client, document, embedded_chunks, docling_document @@ -807,8 +830,7 @@ async def update_document( if chunks is not None: if docling_document is not None: - existing_doc.content = docling_document.export_to_markdown() - existing_doc.set_docling(docling_document) + await _prepare_document_from_docling(existing_doc, docling_document) elif content is not None: existing_doc.content = content @@ -817,8 +839,7 @@ async def update_document( ) if docling_document is not None: - existing_doc.content = docling_document.export_to_markdown() - existing_doc.set_docling(docling_document) + await _prepare_document_from_docling(existing_doc, docling_document) new_chunks = await client.chunk(docling_document) embedded_chunks = await embed_chunks( @@ -832,7 +853,7 @@ async def update_document( existing_doc.content = content converter = get_converter(client._config) converted_docling = await converter.convert_text(existing_doc.content, format="md") - existing_doc.set_docling(converted_docling) + await _prepare_document_from_docling(existing_doc, converted_docling) new_chunks = await client.chunk(converted_docling) embedded_chunks = await embed_chunks(new_chunks, client.embedder, client._config) From d3a1011baf032fca6379c6c6b10d5d9b55162c56 Mon Sep 17 00:00:00 2001 From: Yiorgis Gozadinos Date: Mon, 22 Jun 2026 10:37:48 +0300 Subject: [PATCH 4/5] write fetched bodies off the event loop --- haiku_rag_slim/haiku/rag/client/documents.py | 22 ++++--- tests/test_client.py | 66 +++++++++++++++++++- 2 files changed, 80 insertions(+), 8 deletions(-) diff --git a/haiku_rag_slim/haiku/rag/client/documents.py b/haiku_rag_slim/haiku/rag/client/documents.py index 76aa27574..7e20181f3 100644 --- a/haiku_rag_slim/haiku/rag/client/documents.py +++ b/haiku_rag_slim/haiku/rag/client/documents.py @@ -84,6 +84,19 @@ async def _prepare_document_from_docling( ) +def _write_fetch_body_sync(body: bytes, suffix: str) -> Path: + with tempfile.NamedTemporaryFile( + mode="wb", suffix=suffix, delete=False + ) as temp_file: + temp_file.write(body) + temp_file.flush() + return Path(temp_file.name) + + +async def _write_fetch_body(body: bytes, suffix: str) -> Path: + return await asyncio.to_thread(_write_fetch_body_sync, body, suffix) + + def parent_uri_filter(parent_uri: str) -> str: """SQL `WHERE` clause matching documents whose ``metadata.parent_uri`` equals ``parent_uri``. ``metadata`` is stored as a JSON string produced by @@ -426,13 +439,8 @@ async def _ingest_fetch_result( target_path = result.disk_path cleanup_path: Path | None = None else: - with tempfile.NamedTemporaryFile( - mode="wb", suffix=file_extension, delete=False - ) as temp_file: - temp_file.write(result.body) - temp_file.flush() - target_path = Path(temp_file.name) - cleanup_path = target_path + target_path = await _write_fetch_body(result.body, file_extension) + cleanup_path = target_path try: with logfire.span("document.convert", uri=result.uri): diff --git a/tests/test_client.py b/tests/test_client.py index b2ebc8a2d..857a1bff0 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,13 +1,20 @@ import json import tempfile +import threading from pathlib import Path from unittest.mock import AsyncMock, patch import httpx import pytest +from docling_core.types.doc.document import DoclingDocument +from docling_core.types.doc.labels import DocItemLabel from haiku.rag.client import HaikuRAG -from haiku.rag.client.documents import DocumentImport +from haiku.rag.client.documents import ( + DocumentImport, + _prepare_document_from_docling, + _write_fetch_body, +) from haiku.rag.config import Config from haiku.rag.store.compression import decompress_json from haiku.rag.store.models.chunk import Chunk @@ -19,6 +26,63 @@ def vcr_cassette_dir(): return str(Path(__file__).parent / "cassettes" / "test_client") +@pytest.mark.asyncio +async def test_prepare_document_from_docling_runs_off_event_loop_thread(monkeypatch): + import haiku.rag.client.documents as documents + + event_loop_thread = threading.current_thread() + called_from: list[threading.Thread] = [] + + docling_doc = DoclingDocument(name="thread-check") + docling_doc.add_text(label=DocItemLabel.TEXT, text="Threaded content") + document = Document(content="") + original = documents._prepare_document_from_docling_sync + + def spy(doc, docling): + called_from.append(threading.current_thread()) + return original(doc, docling) + + monkeypatch.setattr(documents, "_prepare_document_from_docling_sync", spy) + + content = await _prepare_document_from_docling(document, docling_doc) + + assert content == "Threaded content" + assert document.content == "Threaded content" + assert document.docling_document is not None + assert called_from, "Document.set_docling was never called" + assert called_from[0] is not event_loop_thread, ( + "Document.set_docling ran on the event-loop thread; document prep must " + "be dispatched via asyncio.to_thread" + ) + + +@pytest.mark.asyncio +async def test_write_fetch_body_runs_off_event_loop_thread(monkeypatch): + import haiku.rag.client.documents as documents + + event_loop_thread = threading.current_thread() + called_from: list[threading.Thread] = [] + original = documents._write_fetch_body_sync + + def spy(body, suffix): + called_from.append(threading.current_thread()) + return original(body, suffix) + + monkeypatch.setattr(documents, "_write_fetch_body_sync", spy) + + path = await _write_fetch_body(b"payload", ".bin") + try: + assert path.read_bytes() == b"payload" + finally: + path.unlink(missing_ok=True) + + assert called_from, "_write_fetch_body_sync was never called" + assert called_from[0] is not event_loop_thread, ( + "_write_fetch_body_sync ran on the event-loop thread; fetched body " + "writes must be dispatched via asyncio.to_thread" + ) + + @pytest.mark.vcr() async def test_client_document_crud(qa_corpus: list[dict[str, str]], temp_db_path): """Test HaikuRAG CRUD operations for documents.""" From ad3b111cf16d4572164464582651204af17ff334 Mon Sep 17 00:00:00 2001 From: Yiorgis Gozadinos Date: Mon, 22 Jun 2026 10:51:35 +0300 Subject: [PATCH 5/5] test: clarify off-loop thread assertions --- tests/test_converters.py | 2 +- tests/test_pdf_split.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_converters.py b/tests/test_converters.py index 7447586d4..f110605c3 100644 --- a/tests/test_converters.py +++ b/tests/test_converters.py @@ -100,7 +100,7 @@ async def test_parse_zip_runs_off_event_loop_thread(): and DoclingDocument.model_validate — all synchronous and CPU-heavy (full- resolution page rasters when generate_page_images is on). It must run off the event-loop thread, or it stalls every other worker's coroutine. Capture - the thread it runs on and assert it is not the main thread.""" + the thread it runs on and assert it is not the event-loop thread.""" import threading config = AppConfig() diff --git a/tests/test_pdf_split.py b/tests/test_pdf_split.py index 9ea00a8cc..09b3f2740 100644 --- a/tests/test_pdf_split.py +++ b/tests/test_pdf_split.py @@ -163,7 +163,7 @@ async def test_concatenate_runs_off_event_loop_thread(tmp_path, monkeypatch): base64 page/picture images — CPU-heavy and proportional to total document size. It must run off the event-loop thread so it doesn't stall other workers' coroutines. Capture the thread it runs on and assert it is not the - main thread.""" + event-loop thread.""" import threading from docling_core.types.doc.document import DoclingDocument