diff --git a/packages/nvidia_nat_langchain/src/nat/plugins/langchain/langsmith/langsmith_evaluation_callback.py b/packages/nvidia_nat_langchain/src/nat/plugins/langchain/langsmith/langsmith_evaluation_callback.py index 0730a4ce80..b732f18a81 100644 --- a/packages/nvidia_nat_langchain/src/nat/plugins/langchain/langsmith/langsmith_evaluation_callback.py +++ b/packages/nvidia_nat_langchain/src/nat/plugins/langchain/langsmith/langsmith_evaluation_callback.py @@ -15,6 +15,7 @@ import logging import time +import uuid from typing import Any import langsmith @@ -53,11 +54,9 @@ def _humanize_dataset_name(name: str) -> str: def _span_id_to_langsmith_run_id(span_id: int) -> str: """Derive LangSmith run_id from OTEL span_id. - LangSmith deterministically maps OTEL span_ids to run UUIDs: - the first 8 bytes are zeroed, the last 8 bytes are the span_id. + LangSmith deterministically maps OTEL span_ids to run UUIDs. """ - hex_str = format(span_id, "016x") - return f"00000000-0000-0000-{hex_str[:4]}-{hex_str[4:]}" + return str(uuid.UUID(int=span_id)) def _eager_link_run_to_item( @@ -535,9 +534,9 @@ def get_eval_project_name(self) -> str: def on_dataset_loaded(self, *, dataset_name: str, items: list) -> None: self._dataset_name = dataset_name pretty_name = _humanize_dataset_name(dataset_name) - ls_dataset_name = f"Benchmark Dataset ({pretty_name})" + ls_dataset_name = dataset_name try: - ds = self._client.create_dataset(dataset_name=ls_dataset_name, description="NAT eval dataset") + ds = self._client.create_dataset(dataset_name=ls_dataset_name, description=pretty_name) self._dataset_id = str(ds.id) except langsmith.utils.LangSmithConflictError: existing = self._client.read_dataset(dataset_name=ls_dataset_name) diff --git a/packages/nvidia_nat_langchain/src/nat/plugins/langchain/langsmith/langsmith_optimization_callback.py b/packages/nvidia_nat_langchain/src/nat/plugins/langchain/langsmith/langsmith_optimization_callback.py index 35054049da..89b5681463 100644 --- a/packages/nvidia_nat_langchain/src/nat/plugins/langchain/langsmith/langsmith_optimization_callback.py +++ b/packages/nvidia_nat_langchain/src/nat/plugins/langchain/langsmith/langsmith_optimization_callback.py @@ -350,6 +350,17 @@ def _humanize_param_name(param_name: str) -> str: break return _humanize_dataset_name(name) + @staticmethod + def _clean_handle_part(value: str, fallback: str) -> str: + """Clean a prompt handle component for LangSmith prompt repos.""" + slug = re.sub(r"[^a-z0-9_-]+", "-", value.lower()) + slug = re.sub(r"-+", "-", slug).strip("-_") + if not slug: + return fallback + if not slug[0].isalpha(): + return f"{fallback}-{slug}" + return slug + def _get_prompt_repo_name(self, param_name: str) -> str: """Get or create a unique prompt repo name for this optimization run. @@ -365,10 +376,10 @@ def _get_prompt_repo_name(self, param_name: str) -> str: if param_slug.startswith(prefix): param_slug = param_slug[len(prefix):] break - param_slug = param_slug.lower().replace(".", "-").replace("_", "-") + param_slug = self._clean_handle_part(param_slug, fallback="prompt") # Prefix with project name - project_slug = (self._project.lower().replace(" ", "-").replace("_", "-")) + project_slug = self._clean_handle_part(self._project, fallback="project") base = f"{project_slug}-{param_slug}" pattern = re.compile(re.escape(base) + r"-run-(\d+)$") diff --git a/packages/nvidia_nat_langchain/tests/langsmith/test_langsmith_callback.py b/packages/nvidia_nat_langchain/tests/langsmith/test_langsmith_callback.py index a37c8a5ab2..0fce6615de 100644 --- a/packages/nvidia_nat_langchain/tests/langsmith/test_langsmith_callback.py +++ b/packages/nvidia_nat_langchain/tests/langsmith/test_langsmith_callback.py @@ -64,6 +64,8 @@ def test_on_dataset_loaded_stores_example_ids(self, eval_cb): def test_on_dataset_loaded_reuses_existing_dataset_and_loads_examples(self, eval_cb): from nat.plugins.langchain.langsmith.langsmith_evaluation_callback import langsmith + + dataset_name = "existing" self.mock_client.create_dataset.side_effect = langsmith.utils.LangSmithConflictError("exists") mock_existing = MagicMock() mock_existing.id = "ds-existing" @@ -74,9 +76,9 @@ def test_on_dataset_loaded_reuses_existing_dataset_and_loads_examples(self, eval mock_ex.inputs = {"nat_item_id": "1", "question": "q"} self.mock_client.list_examples.return_value = [mock_ex] eval_cb.on_dataset_loaded( - dataset_name="existing", + dataset_name=dataset_name, items=[EvalInputItem(id=1, input_obj="q", expected_output_obj="a", full_dataset_entry={})]) - self.mock_client.read_dataset.assert_called_once_with(dataset_name="Benchmark Dataset (Existing)") + self.mock_client.read_dataset.assert_called_once_with(dataset_name=dataset_name) self.mock_client.create_example.assert_not_called() # Should have loaded the existing example ID keyed by nat_item_id assert eval_cb._example_ids["1"] == "ex-existing" @@ -427,6 +429,35 @@ def opt_cb(self): from nat.plugins.langchain.langsmith.langsmith_optimization_callback import LangSmithOptimizationCallback return LangSmithOptimizationCallback(project="test-proj") + @pytest.mark.parametrize( + ("param_name", "expected_repo_name"), + [ + ( + "functions.Agent.prompt.value", + "project-123-project-name_with-spaces-agent-prompt-value-run-1", + ), + ( + "llms.9-NIM.temperature", + "project-123-project-name_with-spaces-prompt-9-nim-temperature-run-1", + ), + ( + "workflow.__", + "project-123-project-name_with-spaces-prompt-run-1", + ), + ( + "custom/path with spaces", + "project-123-project-name_with-spaces-custom-path-with-spaces-run-1", + ), + ], + ) + def test_get_prompt_repo_name_cleans_langsmith_handle(self, opt_cb, param_name, expected_repo_name): + opt_cb._project = "123 Project.Name_With Spaces" + self.mock_client.list_prompts.return_value.repos = [] + + repo_name = opt_cb._get_prompt_repo_name(param_name) + + assert repo_name == expected_repo_name + @patch("nat.plugins.langchain.langsmith.langsmith_evaluation_callback.time.sleep") def test_on_trial_end_links_otel_runs(self, _mock_sleep, opt_cb): # Simulate dataset already created diff --git a/packages/nvidia_nat_langchain/tests/langsmith/test_langsmith_integration.py b/packages/nvidia_nat_langchain/tests/langsmith/test_langsmith_integration.py index e0ce637df5..daaacec146 100644 --- a/packages/nvidia_nat_langchain/tests/langsmith/test_langsmith_integration.py +++ b/packages/nvidia_nat_langchain/tests/langsmith/test_langsmith_integration.py @@ -30,23 +30,88 @@ Tests are skipped by default. Use --run_integration and --run_slow to enable. """ +from __future__ import annotations + import asyncio +import logging +import os import time +import typing +from collections.abc import Generator import pytest +if typing.TYPE_CHECKING: + import uuid + + import langsmith.client + +logger = logging.getLogger(__name__) + + +async def _wait_for_runs( + langsmith_client: langsmith.client.Client, + project_name: str, + expected_count: int, + timeout_s: float = 30.0, +) -> list: + runs = [] + deadline = time.time() + timeout_s + while len(runs) < expected_count and time.time() < deadline: + runs = list(langsmith_client.list_runs(project_name=project_name)) + if len(runs) < expected_count: + await asyncio.sleep(0.1) + return runs + + +async def _wait_for_feedback(langsmith_client: langsmith.client.Client, run_ids, timeout_s: float = 15.0) -> list: + feedback = [] + deadline = time.time() + timeout_s + while not feedback and time.time() < deadline: + for run_id in run_ids: + feedback.extend(langsmith_client.list_feedback(run_ids=[run_id])) + if not feedback: + await asyncio.sleep(0.1) + return feedback + + +@pytest.fixture(name="cleanup_prompts") +def cleanup_prompts_fixture(langsmith_client: langsmith.client.Client) -> Generator[list[str], None, None]: + prompts: list[str] = [] + yield prompts + if os.environ.get("NAT_CI_KEEP_LANGSMITH_PROJECTS") != "1": + for prompt_name in prompts: + try: + langsmith_client.delete_prompt(prompt_name) + except Exception: + logger.exception("Failed to delete prompt %s", prompt_name) + + +@pytest.fixture(name="cleanup_datasets") +def cleanup_datasets_fixture(langsmith_client: langsmith.client.Client) -> Generator[list[uuid.UUID | str], None, None]: + dataset_ids: list[uuid.UUID | str] = [] + yield dataset_ids + if os.environ.get("NAT_CI_KEEP_LANGSMITH_PROJECTS") != "1": + for dataset_id in dataset_ids: + try: + langsmith_client.delete_dataset(dataset_id=dataset_id) + except Exception: + logger.exception("Failed to delete dataset %s", dataset_id) + @pytest.mark.slow @pytest.mark.integration @pytest.mark.usefixtures("langsmith_api_key") async def test_eval_callback_creates_dataset_runs_and_feedback( - langsmith_client, + langsmith_client: langsmith.client.Client, langsmith_project_name: str, + cleanup_datasets: list[uuid.UUID | str], ): """Simulate a nat eval run: dataset + per-item runs + feedback.""" from nat.eval.eval_callbacks import EvalCallbackManager from nat.eval.eval_callbacks import EvalResult from nat.eval.eval_callbacks import EvalResultItem + from nat.eval.evaluator.evaluator_model import EvalInputItem from nat.plugins.langchain.langsmith.langsmith_evaluation_callback import LangSmithEvaluationCallback cb = LangSmithEvaluationCallback( @@ -58,87 +123,115 @@ async def test_eval_callback_creates_dataset_runs_and_feedback( # 1. Load dataset dataset_name = f"integ-test-ds-{time.time()}" + + langsmith_client.create_run('r1', inputs={}, run_type='chain', project_name=langsmith_project_name) + langsmith_client.create_run('r2', inputs={}, run_type='chain', project_name=langsmith_project_name) + + # Wait for runs to appear in LangSmith + runs = await _wait_for_runs(langsmith_client, langsmith_project_name, expected_count=2) + + assert len(runs) >= 2, (f"Expected >= 2 per-item runs, got {len(runs)}") + run_map = {run.name: run for run in runs} + mgr.on_dataset_loaded( dataset_name=dataset_name, items=[ - { - "id": "q1", - "question": "What is 2+2?", - "expected_output": "4", - }, - { - "id": "q2", - "question": "What is 3*3?", - "expected_output": "9", - }, + EvalInputItem( + id="q1", + input_obj="What is 2+2?", + expected_output_obj="4", + full_dataset_entry={}, + ), + EvalInputItem( + id="q2", + input_obj="What is 3*3?", + expected_output_obj="9", + full_dataset_entry={}, + ), ], ) # Verify dataset was created with correct examples - ds = langsmith_client.read_dataset(dataset_name=dataset_name) - assert ds is not None + ds = None + attempts = 0 + last_exception = None + while ds is None and attempts < 5: + try: + ds = langsmith_client.read_dataset(dataset_name=dataset_name) + except Exception as e: + last_exception = e + await asyncio.sleep(0.1) + attempts += 1 + + assert ds is not None, f"Failed to read dataset {dataset_name}: {last_exception}" + cleanup_datasets.append(ds.id) examples = list(langsmith_client.list_examples(dataset_id=ds.id)) assert len(examples) == 2 + assert {example.inputs["nat_item_id"] for example in examples} == {"q1", "q2"} # 2. Complete eval with per-item results - mgr.on_eval_complete( - EvalResult( - metric_scores={"accuracy": 0.9}, - items=[ - EvalResultItem( - item_id="q1", - input_obj="What is 2+2?", - expected_output="4", - actual_output="4", - scores={"accuracy": 1.0}, - reasoning={"accuracy": "Exact match"}, - ), - EvalResultItem( - item_id="q2", - input_obj="What is 3*3?", - expected_output="9", - actual_output="8", - scores={"accuracy": 0.8}, - reasoning={"accuracy": "Close but wrong"}, - ), - ], - )) - - # 3. Wait for runs to appear in LangSmith - runs = [] - deadline = time.time() + 15 - while len(runs) < 2 and time.time() < deadline: - await asyncio.sleep(1) - runs = list(langsmith_client.list_runs(project_name=langsmith_project_name, )) - - assert len(runs) >= 2, (f"Expected >= 2 per-item runs, got {len(runs)}") + eval_result_items = [ + EvalResultItem( + item_id="q1", + input_obj="What is 2+2?", + expected_output="4", + actual_output="4", + scores={"accuracy": 1.0}, + reasoning={"accuracy": "Exact match"}, + root_span_id=run_map["r1"].id.int, + ), + EvalResultItem( + item_id="q2", + input_obj="What is 3*3?", + expected_output="9", + actual_output="8", + scores={"accuracy": 0.8}, + reasoning={"accuracy": "Close but wrong"}, + root_span_id=run_map["r2"].id.int, + ), + ] + mgr.on_eval_complete(EvalResult( + metric_scores={"accuracy": 0.9}, + items=eval_result_items, + )) - # 4. Verify feedback was attached to at least one run - feedback_found = False + # 3. Verify each eval item is linked to the run + result_items_by_run_id = {item.root_span_id: item for item in eval_result_items} for run in runs: - fb = list(langsmith_client.list_feedback(run_ids=[run.id])) - if fb: - feedback_found = True - assert any(f.key == "accuracy" for f in fb) - break - assert feedback_found, "No feedback found on any run" + found_accuracy_feedback = False + feedback = await _wait_for_feedback(langsmith_client, [run.id]) + for feedback_item in feedback: + if feedback_item.key == "accuracy": + result_item = result_items_by_run_id[run.id.int] + expected_score = result_item.scores["accuracy"] + expected_comment = result_item.reasoning["accuracy"] + assert feedback_item.score == expected_score + assert feedback_item.comment == expected_comment - # Cleanup: delete the dataset we created - langsmith_client.delete_dataset(dataset_id=ds.id) + found_accuracy_feedback = True + assert found_accuracy_feedback, f"No accuracy feedback found for run {run.id}" @pytest.mark.slow @pytest.mark.integration @pytest.mark.usefixtures("langsmith_api_key") -async def test_optimizer_callback_creates_trial_runs_and_summary( - langsmith_client, +async def test_optimizer_callback_links_trial_runs_and_feedback( + langsmith_client: langsmith.client.Client, langsmith_project_name: str, + monkeypatch, + cleanup_datasets: list[uuid.UUID | str], ): - """Simulate optimizer trials: trial runs + study summary + feedback.""" + """Simulate optimizer OTEL runs: dataset + per-trial runs + feedback.""" + from nat.eval.eval_callbacks import EvalResult + from nat.eval.eval_callbacks import EvalResultItem + from nat.eval.evaluator.evaluator_model import EvalInputItem + from nat.plugins.langchain.langsmith import langsmith_evaluation_callback from nat.plugins.langchain.langsmith.langsmith_optimization_callback import LangSmithOptimizationCallback from nat.profiler.parameter_optimization.optimizer_callbacks import OptimizerCallbackManager from nat.profiler.parameter_optimization.optimizer_callbacks import TrialResult + monkeypatch.setattr(langsmith_evaluation_callback, "_LS_RETRY_DELAY_S", 0.5) + cb = LangSmithOptimizationCallback( project=langsmith_project_name, experiment_prefix="opt-integ", @@ -146,55 +239,107 @@ async def test_optimizer_callback_creates_trial_runs_and_summary( mgr = OptimizerCallbackManager() mgr.register(cb) - for i in range(2): + trial_project = None + try: + mgr.pre_create_experiment([ + EvalInputItem( + id="q1", + input_obj="What is 2+2?", + expected_output_obj="4", + full_dataset_entry={}, + ), + EvalInputItem( + id="q2", + input_obj="What is 3*3?", + expected_output_obj="9", + full_dataset_entry={}, + ), + ]) + + if cb._dataset_id: + cleanup_datasets.append(cb._dataset_id) + + trial_project = mgr.get_trial_project_name(0) + assert trial_project is not None + + langsmith_client.create_run( + "", + inputs={"input": "What is 2+2?"}, + run_type="chain", + project_name=trial_project, + ) + langsmith_client.create_run( + "", + inputs={"input": "What is 3*3?"}, + run_type="chain", + project_name=trial_project, + ) + + runs = await _wait_for_runs(langsmith_client, trial_project, expected_count=2) + assert len(runs) >= 2, (f"Expected >= 2 trial runs, got {len(runs)}") + run_map = {run.inputs.get("input"): run for run in runs} + mgr.on_trial_end( TrialResult( - trial_number=i, - parameters={"llms.nim.temperature": 0.5 + i * 0.2}, - metric_scores={"accuracy": 0.8 + i * 0.05}, - is_best=(i == 1), + trial_number=0, + parameters={"llms.nim.temperature": 0.7}, + metric_scores={"accuracy": 0.9}, + is_best=True, + eval_result=EvalResult( + metric_scores={"accuracy": 0.9}, + items=[ + EvalResultItem( + item_id="q1", + input_obj="What is 2+2?", + expected_output="4", + actual_output="4", + scores={"accuracy": 1.0}, + reasoning={"accuracy": "Exact match"}, + root_span_id=run_map["What is 2+2?"].id.int, + ), + EvalResultItem( + item_id="q2", + input_obj="What is 3*3?", + expected_output="9", + actual_output="8", + scores={"accuracy": 0.8}, + reasoning={"accuracy": "Close but wrong"}, + root_span_id=run_map["What is 3*3?"].id.int, + ), + ], + ), )) - mgr.on_study_end( - best_trial=TrialResult( - trial_number=1, - parameters={"llms.nim.temperature": 0.7}, - metric_scores={"accuracy": 0.85}, - is_best=True, - ), - total_trials=2, - ) + feedback = await _wait_for_feedback(langsmith_client, [run.id for run in runs]) + assert feedback, "No feedback found on trial runs" + assert any(f.key == "accuracy" for f in feedback) - try: - # Wait for runs to appear: 2 trial runs + 1 summary = 3 - runs = [] - deadline = time.time() + 15 - while len(runs) < 3 and time.time() < deadline: - await asyncio.sleep(1) - runs = list(langsmith_client.list_runs(project_name=langsmith_project_name, )) - - assert len(runs) >= 3, (f"Expected >= 3 runs (2 trials + 1 summary), got {len(runs)}") - - # Verify the summary run exists and has the correct outputs - summary_runs = [r for r in runs if "summary" in (r.name or "")] - assert len(summary_runs) >= 1, ("Expected at least 1 summary run") - assert summary_runs[0].outputs.get("best_trial_number") == 1 + mgr.on_study_end( + best_trial=TrialResult( + trial_number=0, + parameters={"llms.nim.temperature": 0.7}, + metric_scores={"accuracy": 0.9}, + is_best=True, + ), + total_trials=1, + ) finally: - try: - langsmith_client.delete_project(project_name=langsmith_project_name) - except Exception: - pass + if trial_project: + try: + langsmith_client.delete_project(project_name=trial_project) + except Exception: + logger.exception("Failed to delete trial project %s", trial_project) @pytest.mark.slow @pytest.mark.integration @pytest.mark.usefixtures("langsmith_api_key") async def test_optimizer_callback_pushes_prompts( - langsmith_client, + langsmith_client: langsmith.client.Client, langsmith_project_name: str, + cleanup_prompts: list[str], ): - """Simulate a prompt GA trial: prompts in run inputs + pushed to - prompt management.""" + """Simulate a prompt GA trial: prompts are pushed to prompt management.""" from nat.plugins.langchain.langsmith.langsmith_optimization_callback import LangSmithOptimizationCallback from nat.profiler.parameter_optimization.optimizer_callbacks import OptimizerCallbackManager from nat.profiler.parameter_optimization.optimizer_callbacks import TrialResult @@ -217,21 +362,16 @@ async def test_optimizer_callback_pushes_prompts( }, )) - try: - # Wait for the run to appear in LangSmith - runs = [] - deadline = time.time() + 15 - while len(runs) < 1 and time.time() < deadline: - await asyncio.sleep(1) - runs = list(langsmith_client.list_runs(project_name=langsmith_project_name, )) - - assert len(runs) >= 1, (f"Expected >= 1 run, got {len(runs)}") - - # Verify prompts are included in the run inputs - assert "prompts" in runs[0].inputs, ("Expected 'prompts' key in run inputs") - assert "functions.agent.prompt" in runs[0].inputs["prompts"], ("Expected prompt param name in run inputs") - finally: - try: - langsmith_client.delete_project(project_name=langsmith_project_name) - except Exception: - pass + repo_name = cb._prompt_repo_names["functions.agent.prompt"] + cleanup_prompts.append(repo_name) + assert "." not in repo_name + + prompts = [] + deadline = time.time() + 5 + while not prompts and time.time() < deadline: + response = langsmith_client.list_prompts(query=repo_name) + prompts = [p for p in response.repos if p.repo_handle == repo_name] + if not prompts: + await asyncio.sleep(0.1) + + assert prompts, f"Expected prompt repo {repo_name} to exist" diff --git a/packages/nvidia_nat_test/src/nat/test/plugin.py b/packages/nvidia_nat_test/src/nat/test/plugin.py index 75b0699e75..1ed17a3403 100644 --- a/packages/nvidia_nat_test/src/nat/test/plugin.py +++ b/packages/nvidia_nat_test/src/nat/test/plugin.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import os import random import subprocess @@ -31,10 +32,10 @@ import galileo.log_streams import galileo.projects import langsmith.client - -if typing.TYPE_CHECKING: from docker.client import DockerClient +logger = logging.getLogger(__name__) + def pytest_addoption(parser: pytest.Parser): """ @@ -335,7 +336,11 @@ def langsmith_project_name_fixture(langsmith_client: "langsmith.client.Client", langsmith_client.create_project(project_name) yield project_name - langsmith_client.delete_project(project_name=project_name) + if os.environ.get("NAT_CI_KEEP_LANGSMITH_PROJECTS") != "1": + try: + langsmith_client.delete_project(project_name=project_name) + except Exception: + logger.exception("Failed to delete project %s", project_name) @pytest.fixture(name="galileo_api_key", scope='session')