Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions docling_jobkit/convert/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,31 @@ class DoclingConverterManagerConfig(BaseModel):
],
)

# Pipeline Pre-loading
# Names are matched case-insensitively against InputFormat values (see
# preload_additional_formats, which lowercases each entry before lookup).
preload_formats: list[str] = Field(
default_factory=list,
description=(
"List of InputFormat names (e.g. 'pdf', 'audio') whose ML "
"pipelines should be eagerly initialized at startup. "
"Pre-loading keeps models resident in memory, eliminating "
"cold-start latency on the first request for each configured "
"format. Configured formats are required: if a listed format "
"cannot be initialized, startup will fail."
),
examples=[["pdf", "audio"]],
)
# Stored as a plain dict and re-validated into ConvertDocumentsOptions at
# use time, so the config object itself stays JSON-serializable.
preload_default_options: Optional[dict[str, Any]] = Field(
default=None,
description=(
"JSON-serialized ConvertDocumentsOptions whose PDF pipeline "
"hash must match real requests. When set, warm_up_caches() "
"and preload_additional_formats() use these options to select "
"the cached converter, ensuring the preloaded pipelines are "
"on the same converter that serves traffic. If None, bare "
"ConvertDocumentsOptions() defaults are used."
),
)


# Custom serializer for PdfFormatOption
# (model_dump_json does not work with some classes)
Expand Down Expand Up @@ -1179,3 +1204,78 @@ def convert_documents(
)

return results

def preload_additional_formats(self) -> None:
    """Eagerly initialize pipelines for formats in *config.preload_formats*.

    This is a no-op when *preload_formats* is empty.

    All configured format names are resolved and checked against the
    converter's format options *before* any pipeline is initialized, so a
    typo in a later entry cannot waste time (and accelerator memory)
    loading earlier models only to fail startup anyway.

    Raises
    ------
    RuntimeError
        If a configured format name is not a valid ``InputFormat`` or
        has no format options in the converter.
    Exception
        If ``initialize_pipeline`` fails for a configured format.
        Configured formats are treated as required: if the operator
        listed a format in *preload_formats*, failure to initialize it
        is a startup error, not a soft degradation.
    """
    if not self.config.preload_formats:
        return

    # Resolve default options from config so the pre-warmed converter's
    # cache key matches the one real requests will hit.
    if self.config.preload_default_options is not None:
        options = ConvertDocumentsOptions.model_validate(
            self.config.preload_default_options
        )
    else:
        options = ConvertDocumentsOptions()
    pdf_opts = self.get_pdf_pipeline_opts(options)
    converter = self.get_converter(pdf_opts)

    # Phase 1: validate every configured name (fail fast, load nothing).
    formats: list[InputFormat] = []
    for fmt_name in self.config.preload_formats:
        fmt_lower = fmt_name.lower()

        try:
            fmt = InputFormat(fmt_lower)
        except ValueError as err:
            raise RuntimeError(
                f"Unknown format '{fmt_name}' in preload_formats. "
                f"Valid formats: {[f.value for f in InputFormat]}"
            ) from err

        if fmt not in converter.format_to_options:
            raise RuntimeError(
                f"Format '{fmt_name}' has no format options in "
                f"converter and cannot be pre-loaded."
            )
        formats.append(fmt)

    # Phase 2: eagerly initialize each validated pipeline.
    for fmt in formats:
        _log.info("Pre-warming %s pipeline...", fmt.value)
        converter.initialize_pipeline(fmt)
        _log.info("Pipeline for %s is ready.", fmt.value)

def validate_preload_formats(self) -> None:
    """Check that all *config.preload_formats* entries are valid.

    Unlike :meth:`preload_additional_formats`, this does **not** create
    a converter or load any ML models. Use this for lightweight config
    validation when the actual loading will happen elsewhere (e.g. in
    worker processes).

    Raises
    ------
    RuntimeError
        If a configured format name is not a valid ``InputFormat``.
    """
    # Iterating an empty list is already a no-op; no guard needed.
    for fmt_name in self.config.preload_formats:
        try:
            InputFormat(fmt_name.lower())
        except ValueError as err:
            # Chain explicitly so the original ValueError is preserved
            # as the cause (not just "during handling" context).
            raise RuntimeError(
                f"Unknown format '{fmt_name}' in preload_formats. "
                f"Valid formats: {[f.value for f in InputFormat]}"
            ) from err
20 changes: 18 additions & 2 deletions docling_jobkit/orchestrators/local/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,26 @@ async def process_queue(self):
_log.debug("All workers completed.")

async def warm_up_caches(self):
    """Initialize the PDF pipeline and any configured preload formats.

    The PDF pipeline is always warmed. Additional formats listed in
    ``preload_formats`` are loaded here only in shared-models mode;
    otherwise they are merely validated and each worker loads its own.
    """
    # Resolve default options from config so the warmed converter's
    # cache key matches the one that real requests will hit.
    if self.cm.config.preload_default_options is not None:
        options = ConvertDocumentsOptions.model_validate(
            self.cm.config.preload_default_options
        )
    else:
        options = ConvertDocumentsOptions()
    pdf_format_option = self.cm.get_pdf_pipeline_opts(options)
    converter = self.cm.get_converter(pdf_format_option)
    converter.initialize_pipeline(InputFormat.PDF)
    if self.config.shared_models:
        # Shared mode: preload on orchestrator CM (workers reuse it)
        self.cm.preload_additional_formats()
    else:
        # Non-shared mode: validate config only. Workers load their
        # own models. Loading here would waste GPU memory on an extra
        # copy that never serves requests (N+1 problem), potentially
        # forcing a worker's model onto CPU.
        self.cm.validate_preload_formats()

async def delete_task(self, task_id: str):
_log.info(f"Deleting result of task {task_id=}")
Expand Down
31 changes: 30 additions & 1 deletion docling_jobkit/orchestrators/local/worker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import logging
import os
import shutil
from pathlib import Path
from typing import TYPE_CHECKING, Any, Optional, Union
Expand All @@ -20,6 +21,19 @@
_log = logging.getLogger(__name__)


def _apply_torch_num_threads() -> None:
target = os.environ.get("OMP_NUM_THREADS")
if target is None:
return

try:
import torch

torch.set_num_threads(int(target))
except Exception:
pass


class AsyncLocalWorker:
def __init__(
self,
Expand All @@ -40,6 +54,18 @@ async def loop(self):
else:
cm = DoclingConverterManager(self.orchestrator.cm.config)
self.orchestrator.worker_cms.append(cm)

def preload_formats() -> None:
# PyTorch thread settings can drift on reused executor threads.
# Re-apply the configured thread budget before heavy model warmup.
_apply_torch_num_threads()
cm.preload_additional_formats()

# Pre-warm additional pipelines on this worker's own manager.
# Shared managers are pre-warmed centrally in warm_up_caches().
# Run off the event loop to avoid blocking health checks and
# other workers during heavy model loading.
await asyncio.to_thread(preload_formats)
while True:
task_id: str = await self.orchestrator.task_queue.get()
self.orchestrator.queue_list.remove(task_id)
Expand All @@ -51,7 +77,6 @@ async def loop(self):

try:
task.set_status(TaskStatus.STARTED)
_log.info(f"Worker {self.worker_id} processing task {task_id}")

if self.orchestrator.notifier:
# Notify clients about task updates
Expand All @@ -71,6 +96,10 @@ async def loop(self):

# Define a callback function to send progress updates to the client.
def run_task() -> DoclingTaskResult:
# Re-apply the configured thread budget before each
# conversion. ASR inference can otherwise fall back to the
# host CPU-count default on long-lived worker threads.
_apply_torch_num_threads()
convert_sources: list[Union[str, DocumentStream]] = []
headers: Optional[dict[str, Any]] = None
for source in task.sources:
Expand Down
1 change: 1 addition & 0 deletions docling_jobkit/orchestrators/rq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(
):
self.orchestrator_config = orchestrator_config
self.conversion_manager = DoclingConverterManager(cm_config)
self.conversion_manager.preload_additional_formats()
self.scratch_dir = scratch_dir

if "default_result_ttl" not in kwargs:
Expand Down
152 changes: 152 additions & 0 deletions tests/test_local_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,16 @@ async def _wait_task_complete(
return False


async def _wait_until(predicate, max_wait: float = 10.0, interval: float = 0.1) -> bool:
start_time = time.monotonic()
while True:
if predicate():
return True
if time.monotonic() - start_time > max_wait:
return False
await asyncio.sleep(interval)


@pytest.mark.asyncio
async def test_convert_warmup():
cm_config = DoclingConverterManagerConfig()
Expand All @@ -172,6 +182,148 @@ async def test_convert_warmup():
assert len(converter.initialized_pipelines) > 0


@pytest.mark.asyncio
async def test_preload_shared_mode_calls_preload_on_orchestrator_cm():
    """With shared_models=True, warm_up_caches calls preload_additional_formats on self.cm."""
    from unittest.mock import patch

    manager = DoclingConverterManager(
        config=DoclingConverterManagerConfig(preload_formats=["pdf"])
    )
    orchestrator = LocalOrchestrator(
        config=LocalOrchestratorConfig(shared_models=True),
        converter_manager=manager,
    )

    with patch.object(
        manager, "preload_additional_formats", wraps=manager.preload_additional_formats
    ) as preload_spy:
        await orchestrator.warm_up_caches()

    preload_spy.assert_called_once()


@pytest.mark.asyncio
async def test_preload_non_shared_mode_validates_without_loading():
    """With shared_models=False, warm_up_caches validates format names only.

    The orchestrator CM should NOT load extra models (to avoid the N+1
    GPU memory problem). Workers load their own models. Bad format
    names should still fail startup.
    """
    from unittest.mock import patch

    manager = DoclingConverterManager(
        config=DoclingConverterManagerConfig(preload_formats=["pdf"])
    )
    orchestrator = LocalOrchestrator(
        config=LocalOrchestratorConfig(shared_models=False),
        converter_manager=manager,
    )

    with (
        patch.object(manager, "preload_additional_formats") as preload_spy,
        patch.object(
            manager, "validate_preload_formats", wraps=manager.validate_preload_formats
        ) as validate_spy,
    ):
        await orchestrator.warm_up_caches()

    preload_spy.assert_not_called()
    validate_spy.assert_called_once()


@pytest.mark.asyncio
async def test_non_shared_workers_preload_own_managers():
    """Non-shared workers call preload_additional_formats on their own CMs."""
    manager = DoclingConverterManager(
        config=DoclingConverterManagerConfig(preload_formats=["pdf"])
    )
    orchestrator = LocalOrchestrator(
        config=LocalOrchestratorConfig(num_workers=1, shared_models=False),
        converter_manager=manager,
    )

    queue_task = asyncio.create_task(orchestrator.process_queue())

    assert await _wait_until(lambda: len(orchestrator.worker_cms) == 1)

    # Worker should have created its own CM
    assert len(orchestrator.worker_cms) == 1
    worker_cm = orchestrator.worker_cms[0]

    def worker_pipelines_ready() -> bool:
        # Same default options as the worker, so we observe its converter.
        pdf_opts = worker_cm.get_pdf_pipeline_opts(ConvertDocumentsOptions())
        return len(worker_cm.get_converter(pdf_opts).initialized_pipelines) > 0

    # The worker CM should have pre-warmed its converter
    assert await _wait_until(worker_pipelines_ready)

    queue_task.cancel()
    try:
        await queue_task
    except asyncio.CancelledError:
        pass


@pytest.mark.asyncio
async def test_preload_empty_is_noop():
    """Empty preload_formats means preload_additional_formats is not called."""
    from unittest.mock import patch

    manager = DoclingConverterManager(
        config=DoclingConverterManagerConfig(preload_formats=[])
    )
    orchestrator = LocalOrchestrator(
        config=LocalOrchestratorConfig(shared_models=True),
        converter_manager=manager,
    )

    with patch.object(manager, "preload_additional_formats") as preload_spy:
        await orchestrator.warm_up_caches()

    # preload_additional_formats is called but is a no-op internally
    # (returns early when preload_formats is empty)
    preload_spy.assert_called_once()

    # PDF should still be initialized by the explicit init in warm_up_caches
    pdf_opts = manager.get_pdf_pipeline_opts(ConvertDocumentsOptions())
    assert len(manager.get_converter(pdf_opts).initialized_pipelines) > 0


def test_preload_additional_formats_unknown_format_raises():
    """Unknown format names in preload_formats raise RuntimeError at startup."""
    manager = DoclingConverterManager(
        config=DoclingConverterManagerConfig(preload_formats=["nonexistent_format"])
    )

    with pytest.raises(RuntimeError, match="Unknown format"):
        manager.preload_additional_formats()


def test_validate_preload_formats_unknown_format_raises():
    """validate_preload_formats also catches unknown format names."""
    manager = DoclingConverterManager(
        config=DoclingConverterManagerConfig(preload_formats=["nonexistent_format"])
    )

    with pytest.raises(RuntimeError, match="Unknown format"):
        manager.validate_preload_formats()


def test_preload_additional_formats_idempotent():
    """Calling preload_additional_formats twice is harmless."""
    cm_config = DoclingConverterManagerConfig(preload_formats=["pdf"])
    cm = DoclingConverterManager(config=cm_config)

    def initialized_pipeline_count() -> int:
        # Same default options each time -> same converter cache key,
        # so this observes the one converter that preloading touches.
        converter = cm.get_converter(
            cm.get_pdf_pipeline_opts(ConvertDocumentsOptions())
        )
        return len(converter.initialized_pipelines)

    cm.preload_additional_formats()
    count_after_first = initialized_pipeline_count()

    cm.preload_additional_formats()
    assert initialized_pipeline_count() == count_after_first


@dataclass
class TestOption:
options: ConvertDocumentsOptions
Expand Down
Loading
Loading