Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions haiku_rag_slim/haiku/rag/chunkers/docling_serve.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import re
from io import BytesIO
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -99,9 +100,13 @@ async def _call_chunk_api(self, document: "DoclingDocument") -> list[dict]:
else:
endpoint = "/v1/chunk/hybrid/file/async"

# Export document to JSON
doc_json = document.model_dump_json()
doc_bytes = doc_json.encode("utf-8")
# Export document to JSON off the event loop. model_dump_json over a
# document carrying inlined base64 page/picture images is CPU-heavy and
# proportional to document size; running it inline would block every
# other worker's coroutine for the duration of the serialization.
doc_bytes = await asyncio.to_thread(
lambda: document.model_dump_json().encode("utf-8")
)

# Prepare multipart request with DoclingDocument JSON
files = {"files": ("document.json", BytesIO(doc_bytes), "application/json")}
Expand Down
109 changes: 69 additions & 40 deletions haiku_rag_slim/haiku/rag/client/documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,43 @@ class DocumentImport:
_RESERVED_METADATA_KEYS = frozenset({"content_type", "md5", "source_revision"})


def _prepare_document_from_docling_sync(
document: Document, docling_document: "DoclingDocument"
) -> str:
"""Populate content/docling blobs from a DoclingDocument.

This performs size-proportional serialization, JSON splitting, and
compression via ``Document.set_docling``. Async ingestion paths should call
it through ``_prepare_document_from_docling`` so large image-bearing
documents do not block the event loop.
"""
content = docling_document.export_to_markdown()
document.content = content
document.set_docling(docling_document)
return content


async def _prepare_document_from_docling(
document: Document, docling_document: "DoclingDocument"
) -> str:
return await asyncio.to_thread(
_prepare_document_from_docling_sync, document, docling_document
)


def _write_fetch_body_sync(body: bytes, suffix: str) -> Path:
with tempfile.NamedTemporaryFile(
mode="wb", suffix=suffix, delete=False
) as temp_file:
temp_file.write(body)
temp_file.flush()
return Path(temp_file.name)


async def _write_fetch_body(body: bytes, suffix: str) -> Path:
return await asyncio.to_thread(_write_fetch_body_sync, body, suffix)


def parent_uri_filter(parent_uri: str) -> str:
"""SQL `WHERE` clause matching documents whose ``metadata.parent_uri``
equals ``parent_uri``. ``metadata`` is stored as a JSON string produced by
Expand Down Expand Up @@ -184,18 +221,18 @@ async def create_document(
chunks = await client.chunk(docling_document)
embedded_chunks = await embed_chunks(chunks, client.embedder, client._config)

stored_content = docling_document.export_to_markdown()

if title is None:
title = await resolve_title(client._config, docling_document, stored_content)

document = Document(
content=stored_content,
content="",
uri=uri,
title=title,
metadata=metadata or {},
)
document.set_docling(docling_document)
stored_content = await _prepare_document_from_docling(document, docling_document)

if title is None:
document.title = await resolve_title(
client._config, docling_document, stored_content
)

return await _store_document_with_chunks(
client, document, embedded_chunks, docling_document
Expand All @@ -215,17 +252,15 @@ async def import_document(
Use this when conversion, chunking, and embedding were done externally.
Chunks without embeddings will be automatically embedded.
"""
content = docling_document.export_to_markdown()
if title is None:
title = await resolve_title(client._config, docling_document, content)

document = Document(
content=content,
content="",
uri=uri,
title=title,
metadata=metadata or {},
)
document.set_docling(docling_document)
content = await _prepare_document_from_docling(document, docling_document)
if title is None:
document.title = await resolve_title(client._config, docling_document, content)

return await _store_document_with_chunks(client, document, chunks, docling_document)

Expand Down Expand Up @@ -291,18 +326,17 @@ async def import_documents(

prepared: list[tuple[Document, list[Chunk], DoclingDocument]] = []
for item in imports:
content = item.docling_document.export_to_markdown()
title = item.title
if title is None:
title = await resolve_title(client._config, item.docling_document, content)

document = Document(
content=content,
content="",
uri=item.uri,
title=title,
title=item.title,
metadata=item.metadata or {},
)
document.set_docling(item.docling_document)
content = await _prepare_document_from_docling(document, item.docling_document)
if document.title is None:
document.title = await resolve_title(
client._config, item.docling_document, content
)
prepared.append((document, item.chunks, item.docling_document))

return await _store_documents_with_chunks(client, prepared)
Expand Down Expand Up @@ -405,13 +439,8 @@ async def _ingest_fetch_result(
target_path = result.disk_path
cleanup_path: Path | None = None
else:
with tempfile.NamedTemporaryFile(
mode="wb", suffix=file_extension, delete=False
) as temp_file:
temp_file.write(result.body)
temp_file.flush()
target_path = Path(temp_file.name)
cleanup_path = target_path
target_path = await _write_fetch_body(result.body, file_extension)
cleanup_path = target_path

try:
with logfire.span("document.convert", uri=result.uri):
Expand All @@ -427,13 +456,13 @@ async def _ingest_fetch_result(
if cleanup_path is not None:
cleanup_path.unlink(missing_ok=True)

stored_content = docling_document.export_to_markdown()
final_metadata = {**user_metadata, **source_metadata}

if existing_doc:
existing_doc.content = stored_content
existing_doc.metadata = final_metadata
existing_doc.set_docling(docling_document)
stored_content = await _prepare_document_from_docling(
existing_doc, docling_document
)
if title is not None:
existing_doc.title = title
elif existing_doc.title is None:
Expand All @@ -448,15 +477,17 @@ async def _ingest_fetch_result(
await _reconcile_pdf_attachments(client, updated, result.body, depth=depth)
return updated

if title is None:
title = await resolve_title(client._config, docling_document, stored_content)
document = Document(
content=stored_content,
content="",
uri=stored_uri,
title=title,
metadata=final_metadata,
)
document.set_docling(docling_document)
stored_content = await _prepare_document_from_docling(document, docling_document)
if document.title is None:
document.title = await resolve_title(
client._config, docling_document, stored_content
)
with logfire.span("document.store", uri=result.uri, op="create") as store_span:
created = await _store_document_with_chunks(
client, document, embedded_chunks, docling_document
Expand Down Expand Up @@ -807,8 +838,7 @@ async def update_document(

if chunks is not None:
if docling_document is not None:
existing_doc.content = docling_document.export_to_markdown()
existing_doc.set_docling(docling_document)
await _prepare_document_from_docling(existing_doc, docling_document)
elif content is not None:
existing_doc.content = content

Expand All @@ -817,8 +847,7 @@ async def update_document(
)

if docling_document is not None:
existing_doc.content = docling_document.export_to_markdown()
existing_doc.set_docling(docling_document)
await _prepare_document_from_docling(existing_doc, docling_document)

new_chunks = await client.chunk(docling_document)
embedded_chunks = await embed_chunks(
Expand All @@ -832,7 +861,7 @@ async def update_document(
existing_doc.content = content
converter = get_converter(client._config)
converted_docling = await converter.convert_text(existing_doc.content, format="md")
existing_doc.set_docling(converted_docling)
await _prepare_document_from_docling(existing_doc, converted_docling)

new_chunks = await client.chunk(converted_docling)
embedded_chunks = await embed_chunks(new_chunks, client.embedder, client._config)
Expand Down
6 changes: 5 additions & 1 deletion haiku_rag_slim/haiku/rag/converters/docling_serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,11 @@ async def _make_request(self, files: dict, name: str) -> "DoclingDocument":
data=data,
name=name,
)
return self._parse_zip_to_docling(zip_bytes, name)
# Parse off the event loop: the zip decompress, per-image base64
# re-encoding, and DoclingDocument.model_validate are all synchronous
# and CPU-heavy (full-resolution page rasters when generate_page_images
# is on), so running inline would stall every other worker's coroutine.
return await asyncio.to_thread(self._parse_zip_to_docling, zip_bytes, name)

async def convert_file(
self, path: Path, source_uri: str | None = None
Expand Down
5 changes: 4 additions & 1 deletion haiku_rag_slim/haiku/rag/converters/pdf_split.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,7 @@ def _next(it):
# Off the event loop because the close path acquires the lock.
await asyncio.to_thread(it.close)

return DoclingDocument.concatenate(converted)
# Merge off the event loop: concatenating slice documents that carry
# inlined base64 page/picture images is CPU-heavy and proportional to the
# total document size, so running it inline would block other coroutines.
return await asyncio.to_thread(DoclingDocument.concatenate, converted)
23 changes: 17 additions & 6 deletions haiku_rag_slim/haiku/rag/ingester/sources/fs.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import hashlib
import mimetypes
import os
Expand Down Expand Up @@ -87,23 +88,33 @@ async def head(self, uri: str) -> str | None:
return None
return str(path.stat().st_mtime_ns)

def _read_body(self, path: Path, uri: str) -> tuple[bytes, str, str]:
"""Size-check, read, and hash the file. Runs in a worker thread (see
``fetch``) because the read and the md5 are both proportional to file
size and would otherwise block the event loop for the whole read."""
check_file_size(path.stat().st_size, self._max_file_size, uri)
body = path.read_bytes()
content_hash = hashlib.md5(body, usedforsecurity=False).hexdigest()
# mtime_ns rather than st_mtime: nanosecond integer avoids float
# precision collisions on rapid edits.
revision = str(path.stat().st_mtime_ns)
return body, content_hash, revision

async def fetch(self, uri: str) -> FetchResult:
path = self._resolve_within_root(uri)
if path is None:
raise UnsupportedSourceError(f"Path escapes FS root ({self.root}): {uri}")
check_file_size(path.stat().st_size, self._max_file_size, uri)
body = path.read_bytes()
body, content_hash, revision = await asyncio.to_thread(
self._read_body, path, uri
)
content_type, _ = mimetypes.guess_type(path.name)
if content_type is None:
content_type = "application/octet-stream"
# mtime_ns rather than st_mtime: nanosecond integer avoids float
# precision collisions on rapid edits.
revision = str(path.stat().st_mtime_ns)
return FetchResult(
uri=path.as_uri(),
body=body,
content_type=content_type,
content_hash=hashlib.md5(body, usedforsecurity=False).hexdigest(),
content_hash=content_hash,
revision=revision,
disk_path=path,
)
Expand Down
30 changes: 30 additions & 0 deletions tests/ingester/test_fs_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,3 +300,33 @@ async def test_fs_source_fetch_no_limit_when_max_size_is_none(fs_root: Path):
src = FSSource(root=fs_root, max_file_size=None)
result = await src.fetch((fs_root / "a.md").as_uri())
assert result.body == b"alpha"


@pytest.mark.asyncio
async def test_fs_source_fetch_reads_off_event_loop_thread(fs_root: Path):
"""The file read and md5 are both proportional to file size and must run
off the event-loop thread, or a large file would freeze every other
worker's coroutine for the duration of the read. Capture the thread the
read+hash runs on and assert it is not the event-loop thread."""
import threading

src = FSSource(root=fs_root)
target = fs_root / "a.md"

event_loop_thread = threading.current_thread()
called_from: list[threading.Thread] = []
original = src._read_body

def spy(path, uri):
called_from.append(threading.current_thread())
return original(path, uri)

src._read_body = spy # type: ignore[method-assign] # ty: ignore[invalid-assignment]

result = await src.fetch(target.as_uri())
assert result.body == b"alpha"
assert called_from, "_read_body was never called"
assert called_from[0] is not event_loop_thread, (
"FSSource._read_body ran on the event-loop thread; the read+hash must "
"be dispatched via asyncio.to_thread"
)
39 changes: 39 additions & 0 deletions tests/test_chunker.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,45 @@ async def test_chunk_metadata_extraction(self, mock_client_class, chunker):
assert meta1.headings == ["Chapter 1", "Section 1.1"]
assert meta1.page_numbers == [1, 2]

@pytest.mark.asyncio
@patch("haiku.rag.providers.docling_serve.httpx.AsyncClient")
async def test_chunk_serializes_document_off_event_loop_thread(
self, mock_client_class, chunker
):
"""model_dump_json over a document carrying inlined base64 page/picture
images is CPU-heavy and proportional to document size; it must run off
the event-loop thread or it stalls every other worker's coroutine.

A minimal fake document records the thread its model_dump_json runs on;
the API response carries no doc_items so the document is touched only
for serialization."""
import threading

result_data = {"chunks": [{"text": "Chunk 1", "chunk_index": 0}]}
submit_resp, poll_resp, result_resp = create_async_workflow_mocks(result_data)

mock_client = AsyncMock()
mock_client.post = AsyncMock(return_value=submit_resp)
mock_client.get = AsyncMock(side_effect=[poll_resp, result_resp])
mock_client_class.return_value.__aenter__.return_value = mock_client

event_loop_thread = threading.current_thread()
called_from: list[threading.Thread] = []

class FakeDoc:
def model_dump_json(self):
called_from.append(threading.current_thread())
return "{}"

chunks = await chunker.chunk(FakeDoc())

assert len(chunks) == 1
assert called_from, "model_dump_json was never called"
assert called_from[0] is not event_loop_thread, (
"DoclingDocument.model_dump_json ran on the event-loop thread; it "
"must be dispatched via asyncio.to_thread"
)


@pytest.mark.vcr()
@pytest.mark.asyncio
Expand Down
Loading
Loading