Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 57 additions & 1 deletion nemo_retriever/src/nemo_retriever/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from __future__ import annotations

import os
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
Expand Down Expand Up @@ -59,6 +60,48 @@ def is_vl_rerank_model(model_name: str | None) -> bool:
return (model_name or "") in _VL_RERANK_MODEL_IDS


# Name of the environment variable that ``_resolve_local_embed_arch`` reads
# when no explicit ``model_arch`` argument is supplied.
LOCAL_EMBED_ARCH_ENV = "NRL_LOCAL_EMBED_ARCH"
# Architecture values accepted after stripping and lower-casing; any other
# value makes ``_resolve_local_embed_arch`` raise ``ValueError``.
_VALID_LOCAL_EMBED_ARCHS = frozenset({"vl", "text"})

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this still here? Shouldn't this go away if we accept universal?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These look like model architectures. Please elaborate on why this distinction would still be required.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this implementation I wasn't planning to accept universal embedding models yet; the focus is on the VL and text embedding models that NVIDIA already publishes on Hugging Face.

We need this explicit distinction to avoid routing a VL checkpoint through the text embedder, or vice versa.



def _is_local_checkpoint_dir(model_name: str | None) -> bool:
    """Tell whether *model_name* names an existing directory on disk.

    A falsy name (``None`` or the empty string) is never treated as a
    checkpoint directory; otherwise the answer is whatever
    ``os.path.isdir`` reports for the stringified name.
    """
    if not model_name:
        return False
    return os.path.isdir(str(model_name))


def _resolve_local_embed_arch(model_arch: str | None) -> bool:
    """Translate a declared local-checkpoint architecture into the VL flag.

    The architecture is taken from *model_arch* when it is not ``None``;
    otherwise it is read from the ``NRL_LOCAL_EMBED_ARCH`` environment
    variable. Nothing is inferred from the checkpoint itself, so a local
    directory cannot be silently routed to the wrong embedder.

    Returns:
        ``True`` for ``vl`` and ``False`` for ``text`` (compared after
        stripping whitespace and lower-casing).

    Raises:
        ValueError: when no architecture is declared or the declared value
            is not one of ``vl``/``text``.
    """
    # An explicit argument (even an empty string) takes precedence over the env var.
    declared = os.getenv(LOCAL_EMBED_ARCH_ENV) if model_arch is None else model_arch
    normalized = declared.strip().lower() if declared else ""

    if normalized in _VALID_LOCAL_EMBED_ARCHS:
        return normalized == "vl"

    raise ValueError(
        "A local embedding checkpoint directory requires its architecture to be "
        f"declared explicitly: set {LOCAL_EMBED_ARCH_ENV}='vl'|'text' (or pass "
        f"model_arch) so it routes to the correct embedder. Got {declared!r}."
    )
Comment thread
KyleZheng1284 marked this conversation as resolved.


def resolve_embed_model_use_vl(model_name: str | None, *, model_arch: str | None = None) -> bool:
    """Decide whether *model_name* should be served by the VL embedder.

    Names that are not an on-disk directory are checked with
    ``is_vl_embed_model``. A local checkpoint directory is instead resolved
    from its declared architecture (*model_arch* or the
    ``NRL_LOCAL_EMBED_ARCH`` environment variable), which raises
    ``ValueError`` when the declaration is missing or invalid.
    """
    if not _is_local_checkpoint_dir(model_name):
        return is_vl_embed_model(model_name)
    return _resolve_local_embed_arch(model_arch)


def create_local_embedder(
model_name: str | None = None,
*,
Expand All @@ -71,6 +114,7 @@ def create_local_embedder(
normalize: bool = True,
max_length: int = 8192,
query_max_length: int = 128,
model_arch: str | None = None,
) -> Any:
"""Create the appropriate local embedding model (VL or non-VL).

Expand All @@ -92,13 +136,20 @@ def create_local_embedder(

Note: ``gpu_memory_utilization``, ``enforce_eager``, ``dimensions``,
``normalize``, and ``max_length`` apply to vLLM paths only; the HF VL path ignores them.

A local checkpoint *directory* (e.g. a fine-tuned drop-in or proxy model)
is supported on both the text and VL paths. Because a directory carries no
registry entry, its architecture (``vl``/``text``) must be declared via
*model_arch* or ``NRL_LOCAL_EMBED_ARCH``; it is never inferred.
"""
b = (backend or "vllm").strip().lower()
if b not in ("vllm", "hf"):
raise ValueError(f"backend must be 'vllm' or 'hf', got {backend!r}")
model_id = resolve_embed_model(model_name)

if is_vl_embed_model(model_name):
use_vl = resolve_embed_model_use_vl(model_name, model_arch=model_arch)

if use_vl:
if b == "hf":
from nemo_retriever.models.local.llama_nemotron_embed_vl_1b_v2_embedder import (
LlamaNemotronEmbedVL1BV2Embedder,
Expand Down Expand Up @@ -181,13 +232,17 @@ def create_local_query_embedder(
normalize: bool = True,
max_length: int = 8192,
query_max_length: int = 128,
model_arch: str | None = None,
) -> Any:
"""Create a local embedder for *query* vectors in retrieval (Retriever / recall).

*backend* must be ``"hf"`` (default) or ``"vllm"``.

- ``backend="hf"``: HuggingFace for both VL and non-VL models.
- ``backend="vllm"``: vLLM for both VL and non-VL models.

*model_arch* (``vl``/``text``) declares the architecture of a local
checkpoint directory; see :func:`create_local_embedder`.
"""
b = normalize_backend(backend, _LOCAL_QUERY_BACKENDS, field_name="backend", default="hf")

Expand All @@ -202,6 +257,7 @@ def create_local_query_embedder(
normalize=normalize,
max_length=int(max_length),
query_max_length=int(query_max_length),
model_arch=model_arch,
)


Expand Down
15 changes: 14 additions & 1 deletion nemo_retriever/src/nemo_retriever/models/hf_model_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,14 @@
}


def get_hf_revision(model_id: str, *, strict: bool = True) -> str | None:
def _is_local_model_dir(model_id: str) -> bool:
    """Report whether *model_id* is a directory that contains ``config.json``.

    A falsy *model_id*, a path that is not a directory, or a directory
    without a ``config.json`` file all yield ``False``.
    """
    if not model_id:
        return False
    candidate = str(model_id)
    if not os.path.isdir(candidate):
        return False
    config_path = os.path.join(candidate, "config.json")
    return os.path.isfile(config_path)


def get_hf_revision(model_id: str, *, strict: bool = True, allow_local_path: bool = False) -> str | None:
"""Return the pinned commit SHA for *model_id*.

Parameters
Expand All @@ -59,11 +66,17 @@ def get_hf_revision(model_id: str, *, strict: bool = True) -> str | None:
no pinned revision. When ``False``, log a warning and return
``None`` so that ``from_pretrained`` falls back to the ``main``
branch.
allow_local_path:
When ``True``, a local model directory containing ``config.json`` is
allowed to return ``None`` because it has no Hub commit SHA to pin.
"""
revision = HF_MODEL_REVISIONS.get(model_id)
if revision is not None:
return revision

if allow_local_path and _is_local_model_dir(model_id):
return None

msg = (
f"No pinned HuggingFace revision for model '{model_id}'. "
"Add an entry to HF_MODEL_REVISIONS in hf_model_registry.py to pin it."
Expand Down
12 changes: 10 additions & 2 deletions nemo_retriever/src/nemo_retriever/models/inference/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ def maybe_inject_local_hf_embedder(task_config: Dict[str, Any], transform_config
if has_endpoint or not use_local:
return

from nemo_retriever.models import create_local_embedder, resolve_embed_model, is_vl_embed_model
from nemo_retriever.model import (
create_local_embedder,
resolve_embed_model,
resolve_embed_model_use_vl,
)
Comment on lines +93 to +97

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P0 The import uses nemo_retriever.model (singular), but the package is nemo_retriever.models (plural). There is no nemo_retriever/model.py module in the repo, so this raises ModuleNotFoundError at runtime on every call to maybe_inject_local_hf_embedder when no remote endpoint is configured — i.e., the exact case this function is meant to handle.

Suggested change
from nemo_retriever.model import (
create_local_embedder,
resolve_embed_model,
resolve_embed_model_use_vl,
)
from nemo_retriever.models import (
create_local_embedder,
resolve_embed_model,
resolve_embed_model_use_vl,
)
Prompt To Fix With AI
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/models/inference/processor.py
Line: 93-97

Comment:
The import uses `nemo_retriever.model` (singular), but the package is `nemo_retriever.models` (plural). There is no `nemo_retriever/model.py` module in the repo, so this raises `ModuleNotFoundError` at runtime on every call to `maybe_inject_local_hf_embedder` when no remote endpoint is configured — i.e., the exact case this function is meant to handle.

```suggestion
    from nemo_retriever.models import (
        create_local_embedder,
        resolve_embed_model,
        resolve_embed_model_use_vl,
    )
```

How can I resolve this? If you propose a fix, please make it concise.


embed_model = resolve_embed_model(
task_config.get("embed_model_name")
Expand All @@ -103,6 +107,7 @@ def maybe_inject_local_hf_embedder(task_config: Dict[str, Any], transform_config

ingest_backend = (task_config.get("local_ingest_embed_backend") or "vllm").strip().lower()

model_arch = task_config.get("embed_model_arch")
embedder_instance = create_local_embedder(
embed_model,
backend=ingest_backend,
Expand All @@ -112,11 +117,14 @@ def maybe_inject_local_hf_embedder(task_config: Dict[str, Any], transform_config
enforce_eager=_to_bool(task_config.get("enforce_eager"), default=False),
dimensions=task_config.get("dimensions"),
query_max_length=int(task_config.get("query_max_length", 128)),
model_arch=model_arch,
)

prefix = f"{transform_config.input_type}: " if getattr(transform_config, "input_type", None) else ""

if is_vl_embed_model(embed_model):
use_vl = resolve_embed_model_use_vl(embed_model, model_arch=model_arch)

if use_vl:

def _embed(texts):
vecs = embedder_instance.embed(texts, batch_size=local_batch_size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def _ensure_loaded(self) -> None:
max_model_len = int(self.max_length) if int(self.max_length) > 0 else None
self._llm = create_vllm_llm(
str(model_id),
revision=get_hf_revision(model_id),
revision=get_hf_revision(model_id, allow_local_path=True),
dimensions=self.dimensions,
gpu_memory_utilization=self.gpu_memory_utilization,
enforce_eager=self.enforce_eager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def _ensure_loaded(self) -> None:
model_id = self.model_id or _DEFAULT_EMBED_MODEL
dev = torch.device(self.device or ("cuda" if torch.cuda.is_available() else "cpu"))
hf_cache_dir = configure_global_hf_cache_base(self.hf_cache_dir)
_revision = get_hf_revision(model_id)
_revision = get_hf_revision(model_id, allow_local_path=True)
self._tokenizer = AutoTokenizer.from_pretrained(
model_id,
revision=_revision,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def _ensure_loaded(self) -> None:
# device_map when requesting it. Fall back to sdpa/eager on CPU or
# when flash-attn is not installed.
use_gpu = dev.type == "cuda"
_revision = get_hf_revision(model_id)
_revision = get_hf_revision(model_id, allow_local_path=True)
for attn_impl in ("flash_attention_2", "sdpa", "eager"):
try:
kwargs: dict[str, Any] = {
Expand Down Expand Up @@ -234,7 +234,7 @@ def _ensure_loaded(self) -> None:
model_id = self.model_id or "nvidia/llama-nemotron-embed-vl-1b-v2"
self._llm = create_vllm_llm(
str(model_id),
revision=get_hf_revision(model_id),
revision=get_hf_revision(model_id, allow_local_path=True),
gpu_memory_utilization=self.gpu_memory_utilization,
enforce_eager=self.enforce_eager,
limit_mm_per_prompt={"image": 1},
Expand Down
Loading
Loading