Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions .hydra_config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ llm:
vlm:
<<: *llm_params
base_url: ${oc.env:VLM_BASE_URL}
model: ${oc.env:VLM_MODEL}
model: ${oc.env:VLM_MODEL}
api_key: ${oc.env:VLM_API_KEY}

semaphore:
Expand All @@ -31,12 +31,12 @@ embedder:
model_name: ${oc.env:EMBEDDER_MODEL_NAME, jinaai/jina-embeddings-v3}
base_url: ${oc.env:EMBEDDER_BASE_URL, http://vllm:8000/v1}
api_key: ${oc.env:EMBEDDER_API_KEY, EMPTY}

vectordb:
host: ${oc.env:VDB_HOST, milvus}
port: ${oc.env:VDB_iPORT, 19530}
connector_name: ${oc.env:VDB_CONNECTOR_NAME, milvus}
collection_name: ${oc.env:VDB_COLLECTION_NAME, vdb_test}
collection_name: ${oc.env:VDB_COLLECTION_NAME, vdb_test}
hybrid_search: true
enable: true

Expand Down Expand Up @@ -102,7 +102,7 @@ loader:
audio/mpeg: .mp3
file_loaders:
txt: TextLoader
pdf: ${oc.env:PDFLoader, MarkerLoader} # DoclingLoader # MarkerLoader # PyMuPDFLoader # Custompymupdf4llm
pdf: ${oc.env:PDFLoader, MarkerLoader} # DoclingLoader # MarkerLoader # PyMuPDFLoader # Custompymupdf4llm
eml: EmlLoader
docx: DocxLoader
pptx: PPTXLoader
Expand All @@ -125,6 +125,15 @@ loader:
marker_min_processes: ${oc.decode:${oc.env:MARKER_MIN_PROCESSES, 1}}
marker_num_gpus: ${oc.decode:${oc.env:MARKER_NUM_GPUS, 0.01}}
marker_timeout: ${oc.decode:${oc.env:MARKER_TIMEOUT, 3600}}
openai:
base_url: ${oc.env:OPENAI_LOADER_BASE_URL, http://openai:8000/v1}
api_key: ${oc.env:OPENAI_LOADER_API_KEY, EMPTY}
model: ${oc.env:OPENAI_LOADER_MODEL, dotsocr-model}
temperature: ${oc.decode:${oc.env:OPENAI_LOADER_TEMPERATURE, 0.2}}
timeout: ${oc.decode:${oc.env:OPENAI_LOADER_TIMEOUT, 180}}
max_retries: ${oc.decode:${oc.env:OPENAI_LOADER_MAX_RETRIES, 2}}
top_p: ${oc.decode:${oc.env:OPENAI_LOADER_TOP_P, 0.9}}
concurrency_limit: ${oc.decode:${oc.env:OPENAI_LOADER_CONCURRENCY_LIMIT, 20}}

ray:
num_gpus: ${oc.decode:${oc.env:RAY_NUM_GPUS, 0.01}}
Expand Down
62 changes: 62 additions & 0 deletions openrag/components/indexer/loaders/pdf_loaders/dotsocr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from PIL import Image
from tqdm.asyncio import tqdm
from utils.logger import logger # assuming you have a shared logger instance

from .openai import OpenAILoader


class DotsOCRLoader(OpenAILoader):
"""PDF loader using DotsOCR"""

PROMPT = """Please output the layout information from the PDF image, including each layout element's bbox, its category, and the corresponding text content within the bbox.

1. Bbox format: [x1, y1, x2, y2]

2. Layout Categories: The possible categories are ['Caption', 'Footnote', 'Formula', 'List-item', 'Page-footer', 'Page-header', 'Picture', 'Section-header', 'Table', 'Text', 'Title'].

3. Text Extraction & Formatting Rules:
- Picture: For the 'Picture' category, the text field should be omitted.
- Formula: Format its text as LaTeX.
- Table: Format its text as HTML.
- All Others (Text, Title, etc.): Format their text as Markdown.

4. Constraints:
- The output text must be the original text from the image, with no translation.
- All layout elements must be sorted according to human reading order.

5. Final Output: The entire output must be a single JSON object.
"""

def __init__(self, **kwargs):
super().__init__(**kwargs)

async def _caption_images(self, page_img: Image.Image, page_res: list):
"""Extract picture elements and caption them."""
picture_items = [item for item in page_res if item.get("category") == "Picture"]
if not picture_items:
return

picture_crops = []
for item in picture_items:
bbox = item.get("bbox")
if bbox and len(bbox) == 4:
try:
cropped = page_img.crop(bbox)
picture_crops.append((item, cropped))
except Exception as e:
logger.warning(f"Failed to crop image bbox {bbox}: {e}")

if picture_crops:
desc_tasks = [self._get_caption(crop) for _, crop in picture_crops]
desc_results = await tqdm.gather(
*desc_tasks,
desc="Captioning images",
total=len(desc_tasks),
)
for (item, _), desc in zip(picture_crops, desc_results):
item["text"] = desc.strip() if isinstance(desc, str) else ""

def _result_to_md(self, result: list[dict]) -> str:
return "\n".join(
item.get("text", "").strip() for item in result if item.get("text")
)
138 changes: 138 additions & 0 deletions openrag/components/indexer/loaders/pdf_loaders/openai.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import asyncio
import base64
import io
import json
import time
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, List, Optional, Union

import pypdfium2 as pdfium
from langchain.schema import Document
from langchain_openai import ChatOpenAI
from PIL import Image
from utils.logger import logger

from ..base import BaseLoader


class OpenAILoader(BaseLoader, ABC):
"""Generic OpenAI-compatible loader for multimodal OCR-style models."""

PROMPT: str = "" # To be defined by subclasses

def __init__(self, **kwargs):
super().__init__(**kwargs)

self.llm = ChatOpenAI(
base_url=self.config.loader["openai"]["base_url"],
api_key=self.config.loader["openai"]["api_key"],
model=self.config.loader["openai"]["model"],
temperature=self.config.loader["openai"].get("temperature", 0.2),
timeout=self.config.loader["openai"].get("timeout", 180),
max_retries=self.config.loader["openai"].get("max_retries", 2),
top_p=self.config.loader["openai"].get("top_p", 0.9),
)
self.llm_semaphore = asyncio.Semaphore(
self.config.loader["openai"].get("concurrency_limit", 20)
)

async def aload_document(
self,
file_path: Union[str, Path],
metadata: Optional[Dict] = None,
save_markdown: bool = False,
) -> Document:
"""Main pipeline: PDF → OCR → Caption → Markdown."""
if metadata is None:
metadata = {}

start_time = time.time()
file_path = str(file_path)

try:
pages = self._pdf_to_images(file_path)
ocr_results = await self._run_ocr_on_pages(pages)
markdown = await self._assemble_markdown(pages, ocr_results)

if save_markdown:
self.save_content(markdown, file_path)

duration = time.time() - start_time
logger.info(f"Processed {file_path} in {duration:.2f}s")
return Document(page_content=markdown, metadata=metadata)

except Exception:
logger.exception("Error in OpenAILoader.aload_document", path=file_path)
raise

def _pdf_to_images(self, pdf_path: str, scale: float = 1.0) -> List[Image.Image]:
pdf = pdfium.PdfDocument(pdf_path)
return [p.render(scale=scale).to_pil() for p in pdf]

async def _run_ocr_on_pages(self, pages: List[Image.Image]) -> List[dict]:
tasks = [self._img2result(img) for img in pages]
return await asyncio.gather(*tasks)

async def _assemble_markdown(
self, pages: List[Image.Image], results: List[dict]
) -> str:
markdown_parts = []
for page_img, page_res in zip(pages, results):
if not page_res:
continue
if self.config["loader"]["image_captioning"]:
await self._caption_images(page_img, page_res)
markdown_parts.append(self._result_to_md(page_res))
return "\n\n".join(markdown_parts).strip()

async def _get_caption(self, img: Image.Image) -> str:
try:
return await self.get_image_description(image_data=img)
except Exception as e:
logger.warning(f"Captioning failed: {e}")
return ""

async def _img2result(self, img: Image.Image, format: str = "PNG") -> dict:
"""Send an image to the OpenAI-compatible OCR model."""
async with self.llm_semaphore:
try:
buffer = io.BytesIO()
img.save(buffer, format=format)
img_b64 = base64.b64encode(buffer.getvalue()).decode("utf-8")

messages = [
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {
"url": f"data:image/{format.lower()};base64,{img_b64}"
},
},
{
"type": "text",
"text": f"<|img|><|imgpad|><|endofimg|>{self.PROMPT}",
},
],
}
]

response = await self.llm.ainvoke(messages)
data = json.loads(response.content)
return data

except Exception as e:
logger.error("Error in _img2result", error=str(e))
return {}

@abstractmethod
def _result_to_md(self, result: list[dict]) -> str:
"""Convert structured OCR + caption results to markdown format."""
pass

@abstractmethod
async def _caption_images(self, page_img: Image.Image, page_res: list):
"""Extract picture elements and caption them."""
pass