diff --git a/README.md b/README.md index cb1366d728..dbcf40616f 100644 --- a/README.md +++ b/README.md @@ -125,6 +125,8 @@ The easiest way to get started is to use one of our integrations. Opik supports: | OpenAI | Log traces for all OpenAI LLM calls | [Documentation](https://www.comet.com/docs/opik/tracing/integrations/openai/?utm_source=opik&utm_medium=github&utm_content=openai_link&utm_campaign=opik) | [![Open Quickstart In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/comet-ml/opik/blob/master/apps/opik-documentation/documentation/docs/cookbook/openai.ipynb) | | LiteLLM | Call any LLM model using the OpenAI format | [Documentation](/tracing/integrations/litellm.md) | [![Open Quickstart In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/comet-ml/opik/blob/master/apps/opik-documentation/documentation/docs/cookbook/litellm.ipynb) | | LangChain | Log traces for all LangChain LLM calls | [Documentation](https://www.comet.com/docs/opik/tracing/integrations/langchain/?utm_source=opik&utm_medium=github&utm_content=langchain_link&utm_campaign=opik) | [![Open Quickstart In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/comet-ml/opik/blob/master/apps/opik-documentation/documentation/docs/cookbook/langchain.ipynb) | +| Haystack | Log traces for all Haystack calls | [Documentation](https://www.comet.com/docs/opik/tracing/integrations/haystack/?utm_source=opik&utm_medium=github&utm_content=haystack_link&utm_campaign=opik) | [![Open Quickstart In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/comet-ml/opik/blob/master/apps/opik-documentation/documentation/docs/cookbook/haystack.ipynb) | + | Bedrock | Log traces for all Bedrock LLM calls | [Documentation](https://www.comet.com/docs/opik/tracing/integrations/bedrock?utm_source=opik&utm_medium=github&utm_content=bedrock_link&utm_campaign=opik) | [![Open Quickstart In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/comet-ml/opik/blob/master/apps/opik-documentation/documentation/docs/cookbook/bedrock.ipynb) | | Anthropic | Log traces for all Anthropic LLM calls | [Documentation](https://www.comet.com/docs/opik/tracing/integrations/anthropic?utm_source=opik&utm_medium=github&utm_content=anthropic_link&utm_campaign=opik) | [![Open Quickstart In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/comet-ml/opik/blob/master/apps/opik-documentation/documentation/docs/cookbook/anthropic.ipynb) | | Gemini | Log traces for all Gemini LLM calls | [Documentation](https://www.comet.com/docs/opik/tracing/integrations/gemini?utm_source=opik&utm_medium=github&utm_content=gemini_link&utm_campaign=opik) | [![Open Quickstart In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/comet-ml/opik/blob/master/apps/opik-documentation/documentation/docs/cookbook/gemini.ipynb) | diff --git a/apps/opik-documentation/documentation/docs/cookbook/haystack.ipynb b/apps/opik-documentation/documentation/docs/cookbook/haystack.ipynb new file mode 100644 index 0000000000..73cfd2fe8d --- /dev/null +++ b/apps/opik-documentation/documentation/docs/cookbook/haystack.ipynb @@ -0,0 +1,231 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Using Opik with 
Haystack\n", + "\n", + "[Haystack](https://docs.haystack.deepset.ai/docs/intro) is an open-source framework for building production-ready LLM applications, retrieval-augmented generative pipelines and state-of-the-art search systems that work intelligently over large document collections.\n", + "\n", + "In this guide, we will showcase how to integrate Opik with Haystack so that all the Haystack calls are logged as traces in Opik." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Creating an account on Comet.com\n", + "\n", + "[Comet](https://www.comet.com/site?from=llm&utm_source=opik&utm_medium=colab&utm_content=haystack&utm_campaign=opik) provides a hosted version of the Opik platform, [simply create an account](https://www.comet.com/signup?from=llm&utm_source=opik&utm_medium=colab&utm_content=haystack&utm_campaign=opik) and grab your API key.\n", + "\n", + "> You can also run the Opik platform locally; see the [installation guide](https://www.comet.com/docs/opik/self-host/overview/?from=llm&utm_source=opik&utm_medium=colab&utm_content=haystack&utm_campaign=opik) for more information." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%pip install --upgrade --quiet opik haystack-ai" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import opik\n", + "\n", + "opik.configure(use_local=False)" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import getpass\n", + "\n", + "if \"OPENAI_API_KEY\" not in os.environ:\n", + " os.environ[\"OPENAI_API_KEY\"] = getpass.getpass(\"Enter your OpenAI API key: \")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Creating the Haystack pipeline\n", + "\n", + "In this example, we will create a simple pipeline that uses a prompt template and instructs the model to respond in German.\n", + "\n", + "To enable Opik tracing, we will:\n", + "1. Enable content tracing in Haystack by setting the environment variable `HAYSTACK_CONTENT_TRACING_ENABLED=true`\n", + "2. Add the `OpikConnector` component to the pipeline\n", + "\n", + "Note: The `OpikConnector` component is a special component that will automatically log the traces of the pipeline as Opik traces; it should not be connected to any other component." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "OPIK: Traces will not be logged to Opik because Haystack tracing is disabled. To enable, set the HAYSTACK_CONTENT_TRACING_ENABLED environment variable to true before importing Haystack.\n", + "OPIK: Started logging traces to the \"Default Project\" project at https://www.comet.com/opik/jacques-comet/redirect/projects?name=Default%20Project.\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Trace ID: 0675969a-fd40-7fcb-8000-0287dec9559f\n", + "ChatMessage(content='Berlin ist die Hauptstadt Deutschlands und zugleich eine der aufregendsten und vielfältigsten Städte Europas. Die Metropole hat eine reiche Geschichte, die sich in ihren historischen Gebäuden, Museen und Denkmälern widerspiegelt. 
Berlin ist auch bekannt für seine lebendige Kunst- und Kulturszene, mit unzähligen Galerien, Theatern und Musikveranstaltungen.\\n\\nDie Stadt ist zudem ein Schmelztiegel der Kulturen, was sich in der vielfältigen Gastronomie, den lebhaften Märkten und den multikulturellen Vierteln widerspiegelt. Berlin bietet auch eine lebendige Nightlife-Szene, mit zahlreichen Bars, Clubs und Veranstaltungen für jeden Geschmack.\\n\\nNeben all dem kulturellen Reichtum hat Berlin auch eine grüne Seite, mit vielen Parks, Gärten und Seen, die zum Entspannen und Erholen einladen. Insgesamt ist Berlin eine Stadt, die für jeden etwas zu bieten hat und die Besucher mit ihrer Vielfalt und Offenheit begeistert.', role=, name=None, meta={'model': 'gpt-3.5-turbo-0125', 'index': 0, 'finish_reason': 'stop', 'usage': {'completion_tokens': 255, 'prompt_tokens': 29, 'total_tokens': 284, 'completion_tokens_details': CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), 'prompt_tokens_details': PromptTokensDetails(audio_tokens=0, cached_tokens=0)}})\n" + ] + } + ], + "source": [ + "import os\n", + "\n", + "os.environ[\"HAYSTACK_CONTENT_TRACING_ENABLED\"] = \"true\"\n", + "\n", + "from haystack import Pipeline\n", + "from haystack.components.builders import ChatPromptBuilder\n", + "from haystack.components.generators.chat import OpenAIChatGenerator\n", + "from haystack.dataclasses import ChatMessage\n", + "\n", + "from opik.integrations.haystack import OpikConnector\n", + "\n", + "\n", + "pipe = Pipeline()\n", + "\n", + "# Add the OpikConnector component to the pipeline\n", + "pipe.add_component(\"tracer\", OpikConnector(\"Chat example\"))\n", + "\n", + "# Continue building the pipeline\n", + "pipe.add_component(\"prompt_builder\", ChatPromptBuilder())\n", + "pipe.add_component(\"llm\", OpenAIChatGenerator(model=\"gpt-3.5-turbo\"))\n", + "\n", + "pipe.connect(\"prompt_builder.prompt\", \"llm.messages\")\n", + "\n", + "messages = [\n", + " ChatMessage.from_system(\n", + " \"Always respond in German even if some input data is in other languages.\"\n", + " ),\n", + " ChatMessage.from_user(\"Tell me about {{location}}\"),\n", + "]\n", + "\n", + "response = pipe.run(\n", + " data={\n", + " \"prompt_builder\": {\n", + " \"template_variables\": {\"location\": \"Berlin\"},\n", + " \"template\": messages,\n", + " }\n", + " }\n", + ")\n", + "\n", + "trace_id = response[\"tracer\"][\"trace_id\"]\n", + "print(f\"Trace ID: {trace_id}\")\n", + "print(response[\"llm\"][\"replies\"][0])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The trace is now logged to the Opik platform:\n", + "\n", + "![Haystack trace](https://raw.githubusercontent.com/comet-ml/opik/main/apps/opik-documentation/documentation/static/img/cookbook/haystack_trace_cookbook.png)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Advanced usage\n", + "\n", + "### Ensuring the trace is logged\n", + "\n", + "By default the `OpikConnector` will flush the trace to the Opik platform after each component in a thread blocking way. As a result, you may disable flushing the data after each component by setting the `HAYSTACK_OPIK_ENFORCE_FLUSH` environent variable to `false`.\n", + "\n", + "**Caution**: Disabling this feature may result in data loss if the program crashes before the data is sent to Opik. 
Make sure to call the `flush()` method explicitly before the program exits:" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "from haystack.tracing import tracer\n", + "\n", + "tracer.actual_tracer.flush()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Getting the trace ID\n", + "\n", + "If you would like to log additional information to the trace, you will need its trace ID. You can get it from the `tracer` key in the pipeline response:" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Trace ID: 067596ab-da00-7c1f-8000-53f7af5fc3de\n" + ] + } + ], + "source": [ + "response = pipe.run(\n", + " data={\n", + " \"prompt_builder\": {\n", + " \"template_variables\": {\"location\": \"Berlin\"},\n", + " \"template\": messages,\n", + " }\n", + " }\n", + ")\n", + "\n", + "trace_id = response[\"tracer\"][\"trace_id\"]\n", + "print(f\"Trace ID: {trace_id}\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "py312_llm_eval", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.4" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/apps/opik-documentation/documentation/docs/cookbook/langchain.ipynb b/apps/opik-documentation/documentation/docs/cookbook/langchain.ipynb index b993482d99..d87979ea2e 100644 --- a/apps/opik-documentation/documentation/docs/cookbook/langchain.ipynb +++ b/apps/opik-documentation/documentation/docs/cookbook/langchain.ipynb @@ -64,8 +64,6 @@ "import os\n", "import getpass\n", "\n", - "os.environ[\"OPIK_WORKSPACE\"] = \"opik-cookbooks\"\n", - "\n", "if \"OPENAI_API_KEY\" not in os.environ:\n", " os.environ[\"OPENAI_API_KEY\"] = getpass.getpass(\"Enter your OpenAI API key: \")" ] diff --git a/apps/opik-documentation/documentation/docs/tracing/integrations/haystack.md b/apps/opik-documentation/documentation/docs/tracing/integrations/haystack.md new file mode 100644 index 0000000000..f0808e34f2 --- /dev/null +++ b/apps/opik-documentation/documentation/docs/tracing/integrations/haystack.md @@ -0,0 +1,130 @@ +--- +sidebar_label: Haystack +--- + +# Haystack + +[Haystack](https://docs.haystack.deepset.ai/docs/intro) is an open-source framework for building production-ready LLM applications, retrieval-augmented generative pipelines and state-of-the-art search systems that work intelligently over large document collections. + +Opik integrates with Haystack to log traces for all Haystack pipelines. + +## Getting started + +First, ensure you have both `opik` and `haystack-ai` installed: + +```bash +pip install opik haystack-ai +``` + +In addition, you can configure Opik using the `opik configure` command, which will prompt you for your API key (if you are using the Cloud platform) or for your local server address (if you are self-hosting): + +```bash +opik configure +``` + +## Logging Haystack pipeline runs + +To log a Haystack pipeline run, you can use the [`OpikConnector`](https://www.comet.com/docs/opik/python-sdk-reference/integrations/haystack/OpikConnector.html). 
This connector will log the pipeline run to the Opik platform and add a `tracer` key to the pipeline run response with the trace ID: + +```python +import os + +os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true" + +from haystack import Pipeline +from haystack.components.builders import ChatPromptBuilder +from haystack.components.generators.chat import OpenAIChatGenerator +from haystack.dataclasses import ChatMessage + +from opik.integrations.haystack import OpikConnector + + +pipe = Pipeline() + +# Add the OpikConnector component to the pipeline +pipe.add_component( + "tracer", OpikConnector("Chat example") +) + +# Continue building the pipeline +pipe.add_component("prompt_builder", ChatPromptBuilder()) +pipe.add_component("llm", OpenAIChatGenerator(model="gpt-3.5-turbo")) + +pipe.connect("prompt_builder.prompt", "llm.messages") + +messages = [ + ChatMessage.from_system( + "Always respond in German even if some input data is in other languages." + ), + ChatMessage.from_user("Tell me about {{location}}"), +] + +response = pipe.run( + data={ + "prompt_builder": { + "template_variables": {"location": "Berlin"}, + "template": messages, + } + } +) + +print(response["llm"]["replies"][0]) +``` + +Each pipeline run will now be logged to the Opik platform: + +![Haystack](/img/cookbook/haystack_trace_cookbook.png) + +:::tip + +In order to ensure the traces are correctly logged, make sure you set the `HAYSTACK_CONTENT_TRACING_ENABLED` environment variable to `true` before importing Haystack and running the pipeline. + +::: + +## Advanced usage + +### Disabling automatic flushing of traces + +By default, the `OpikConnector` flushes the trace to the Opik platform after each component in a thread-blocking way. To avoid this overhead, you can disable flushing after each component by setting the `HAYSTACK_OPIK_ENFORCE_FLUSH` environment variable to `false`. + +In order to make sure that all traces are logged to the Opik platform before you exit a script, you can use the `flush` method: + +```python +from haystack.tracing import tracer + +# Pipeline definition + +tracer.actual_tracer.flush() +``` + +:::warning + +Disabling this feature may result in data loss if the program crashes before the data is sent to Opik. Make sure to call the `flush()` method explicitly before the program exits. + +::: + +### Updating logged traces + +The `OpikConnector` returns the logged trace ID in the pipeline run response. 
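+
+The `tracer` entry of the pipeline response also exposes the component name and the URL of the Opik project the trace was logged to (these are the output keys declared by `OpikConnector.run`). A minimal sketch of reading those keys from an existing `response`:
+
+```python
+# Output of the "tracer" component from a previous pipe.run(...) call
+tracer_output = response["tracer"]
+
+print(tracer_output["trace_id"])  # ID of the logged trace
+print(tracer_output["project_url"])  # URL of the Opik project containing the trace
+```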
You can use this ID to update the trace with feedback scores or other metadata: + +```python +import opik + +response = pipe.run( + data={ + "prompt_builder": { + "template_variables": {"location": "Berlin"}, + "template": messages, + } + } +) + +# Get the trace ID from the pipeline run response +trace_id = response["tracer"]["trace_id"] + +# Log the feedback score +opik_client = opik.Opik() +opik_client.log_traces_feedback_scores([ + {"id": trace_id, "name": "user-feedback", "value": 0.5} +]) +``` diff --git a/apps/opik-documentation/documentation/docs/tracing/integrations/overview.md b/apps/opik-documentation/documentation/docs/tracing/integrations/overview.md index 822b2d8516..f98376aba5 100644 --- a/apps/opik-documentation/documentation/docs/tracing/integrations/overview.md +++ b/apps/opik-documentation/documentation/docs/tracing/integrations/overview.md @@ -12,6 +12,7 @@ Opik aims to make it as easy as possible to log, view and evaluate your LLM trac | OpenAI | Log traces for all OpenAI LLM calls | [Documentation](/tracing/integrations/openai.md) | [![Open Quickstart In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/comet-ml/opik/blob/master/apps/opik-documentation/documentation/docs/cookbook/openai.ipynb) | | LiteLLM | Call any LLM model using the OpenAI format | [Documentation](/tracing/integrations/litellm.md) | [![Open Quickstart In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/comet-ml/opik/blob/master/apps/opik-documentation/documentation/docs/cookbook/litellm.ipynb) | | LangChain | Log traces for all LangChain LLM calls | [Documentation](/tracing/integrations/langchain.md) | [![Open Quickstart In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/comet-ml/opik/blob/master/apps/opik-documentation/documentation/docs/cookbook/langchain.ipynb) | +| Haystack | Log traces for all Haystack pipelines | [Documentation](/tracing/integrations/haystack.md) | [![Open Quickstart In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/comet-ml/opik/blob/master/apps/opik-documentation/documentation/docs/cookbook/haystack.ipynb) | | aisuite | Log traces for all aisuite LLM calls | [Documentation](/tracing/integrations/aisuite.md) | [![Open Quickstart In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/comet-ml/opik/blob/master/apps/opik-documentation/documentation/docs/cookbook/aisuite.ipynb) | | Anthropic | Log traces for all Anthropic LLM calls | [Documentation](/tracing/integrations/anthropic.md) | [![Open Quickstart In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/comet-ml/opik/blob/master/apps/opik-documentation/documentation/docs/cookbook/anthropic.ipynb) | | Bedrock | Log traces for all AWS Bedrock LLM calls | [Documentation](/tracing/integrations/bedrock.md) | [![Open Quickstart In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/comet-ml/opik/blob/master/apps/opik-documentation/documentation/docs/cookbook/bedrock.ipynb) | diff --git a/apps/opik-documentation/documentation/sidebars.ts b/apps/opik-documentation/documentation/sidebars.ts index a64d9396f6..08adbbb6ad 100644 --- a/apps/opik-documentation/documentation/sidebars.ts +++ b/apps/opik-documentation/documentation/sidebars.ts @@ 
-59,6 +59,7 @@ const sidebars: SidebarsConfig = { "tracing/integrations/dify", "tracing/integrations/gemini", "tracing/integrations/groq", + "tracing/integrations/haystack", "tracing/integrations/langgraph", "tracing/integrations/llama_index", "tracing/integrations/ollama", @@ -123,6 +124,7 @@ const sidebars: SidebarsConfig = { "cookbook/bedrock", "cookbook/gemini", "cookbook/groq", + "cookbook/haystack", "cookbook/langgraph", "cookbook/llama-index", "cookbook/ollama", diff --git a/apps/opik-documentation/documentation/static/img/cookbook/haystack_trace_cookbook.png b/apps/opik-documentation/documentation/static/img/cookbook/haystack_trace_cookbook.png new file mode 100644 index 0000000000..e9ee6c3c91 Binary files /dev/null and b/apps/opik-documentation/documentation/static/img/cookbook/haystack_trace_cookbook.png differ diff --git a/apps/opik-documentation/python-sdk-docs/source/index.rst b/apps/opik-documentation/python-sdk-docs/source/index.rst index df38d11a03..9c07d8d9ad 100644 --- a/apps/opik-documentation/python-sdk-docs/source/index.rst +++ b/apps/opik-documentation/python-sdk-docs/source/index.rst @@ -168,6 +168,7 @@ You can learn more about the `opik` python SDK in the following sections: integrations/openai/index integrations/anthropic/index integrations/langchain/index + integrations/haystack/index integrations/bedrock/index integrations/llama_index/index diff --git a/apps/opik-documentation/python-sdk-docs/source/integrations/haystack/OpikConnector.rst b/apps/opik-documentation/python-sdk-docs/source/integrations/haystack/OpikConnector.rst new file mode 100644 index 0000000000..3ee89410cd --- /dev/null +++ b/apps/opik-documentation/python-sdk-docs/source/integrations/haystack/OpikConnector.rst @@ -0,0 +1,5 @@ +OpikConnector +============= + +.. autoclass:: opik.integrations.haystack.OpikConnector + :members: diff --git a/apps/opik-documentation/python-sdk-docs/source/integrations/haystack/index.rst b/apps/opik-documentation/python-sdk-docs/source/integrations/haystack/index.rst new file mode 100644 index 0000000000..261da805cc --- /dev/null +++ b/apps/opik-documentation/python-sdk-docs/source/integrations/haystack/index.rst @@ -0,0 +1,53 @@ +Haystack +======== + +Opik integrates with Haystack to allow you to log your Haystack pipeline runs to the Opik platform, simply wrap the Haystack pipeline with `OpikConnector` to start logging:: + + import os + + os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true" + + from haystack import Pipeline + from haystack.components.builders import ChatPromptBuilder + from haystack.components.generators.chat import OpenAIChatGenerator + from haystack.dataclasses import ChatMessage + + from opik.integrations.haystack import OpikConnector + + + pipe = Pipeline() + + # Add the OpikConnector component to the pipeline + pipe.add_component( + "tracer", OpikConnector("Chat example") + ) + + # Continue building the pipeline + pipe.add_component("prompt_builder", ChatPromptBuilder()) + pipe.add_component("llm", OpenAIChatGenerator(model="gpt-3.5-turbo")) + + pipe.connect("prompt_builder.prompt", "llm.messages") + + messages = [ + ChatMessage.from_system( + "Always respond in German even if some input data is in other languages." + ), + ChatMessage.from_user("Tell me about {{location}}"), + ] + + response = pipe.run( + data={ + "prompt_builder": { + "template_variables": {"location": "Berlin"}, + "template": messages, + } + } + ) + +You can learn more about the `OpikConnector` in the following section: + +.. 
toctree:: + :maxdepth: 4 + :titlesonly: + + OpikConnector diff --git a/sdks/python/src/opik/integrations/haystack/__init__.py b/sdks/python/src/opik/integrations/haystack/__init__.py new file mode 100644 index 0000000000..e829d45f66 --- /dev/null +++ b/sdks/python/src/opik/integrations/haystack/__init__.py @@ -0,0 +1,3 @@ +from .opik_connector import OpikConnector + +__all__ = ["OpikConnector"] diff --git a/sdks/python/src/opik/integrations/haystack/opik_connector.py b/sdks/python/src/opik/integrations/haystack/opik_connector.py new file mode 100644 index 0000000000..cae6083dda --- /dev/null +++ b/sdks/python/src/opik/integrations/haystack/opik_connector.py @@ -0,0 +1,125 @@ +from typing import Any, Dict, Optional + +import haystack +from haystack import logging, tracing + +from . import opik_tracer +import opik + +LOGGER = logging.getLogger(__name__) + + +@haystack.component +class OpikConnector: + """ + OpikConnector connects the Haystack LLM framework with [Opik](https://github.com/comet-ml/opik) in order to enable the + tracing of operations and data flow within various components of a pipeline. + + Simply add this component to your pipeline, but *do not* connect it to any other component. The OpikConnector + will automatically trace the operations and data flow within the pipeline. + + In order to configure Opik, you will first need to install the Opik SDK using `pip install opik` and then + run `opik configure` from the command line. Alternatively, you can configure Opik using environment variables; + you can find more information about how to configure Opik [here](https://www.comet.com/docs/opik/tracing/sdk_configuration). + + In addition, you need to set the `HAYSTACK_CONTENT_TRACING_ENABLED` environment variable to `true` in order to + enable Haystack tracing in your pipeline. + + Example: + You can use the `OpikConnector` in the following way:: + + import os + + os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true" + + from haystack import Pipeline + from haystack.components.builders import ChatPromptBuilder + from haystack.components.generators.chat import OpenAIChatGenerator + from haystack.dataclasses import ChatMessage + from opik.integrations.haystack import ( + OpikConnector, + ) + + if __name__ == "__main__": + pipe = Pipeline() + pipe.add_component("tracer", OpikConnector("Chat example")) + pipe.add_component("prompt_builder", ChatPromptBuilder()) + pipe.add_component("llm", OpenAIChatGenerator(model="gpt-3.5-turbo")) + + pipe.connect("prompt_builder.prompt", "llm.messages") + + messages = [ + ChatMessage.from_system( + "Always respond in German even if some input data is in other languages." + ), + ChatMessage.from_user("Tell me about {{location}}"), + ] + + response = pipe.run( + data={ + "prompt_builder": { + "template_variables": {"location": "Berlin"}, + "template": messages, + } + } + ) + print(response["llm"]["replies"][0]) + print(response["tracer"]["project_url"]) + + Note: + + You may disable flushing the data after each component by setting the `HAYSTACK_OPIK_ENFORCE_FLUSH` + environment variable to `false`. By default, the data is flushed after each component and blocks the thread until + the data is sent to Opik. **Caution**: Disabling this feature may result in data loss if the program crashes + before the data is sent to Opik. Make sure to call the `flush()` method explicitly before the program exits, + e.g. 
by using tracer.actual_tracer.flush():: + + from haystack.tracing import tracer + + tracer.actual_tracer.flush() + """ + + def __init__(self, name: str, project_name: Optional[str] = None): + """ + Initialize the OpikConnector component. + + Args: + name: The name of the pipeline or component. This name will be used to identify the tracing run on the + Opik dashboard. + project_name: The name of the project to use for the tracing run. If not provided, the project name will be + set to the default project name. + """ + self.name = name + self.tracer = opik_tracer.OpikTracer( + opik_client=opik.Opik(project_name=project_name), name=name + ) + tracing.enable_tracing(self.tracer) + + @haystack.component.output_types(name=str, trace_id=Optional[str], project_url=str) + def run( + self, invocation_context: Optional[Dict[str, Any]] = None + ) -> Dict[str, Any]: + """ + Runs the OpikConnector component. + + Args: + invocation_context: A dictionary with additional context for the invocation. This parameter + is useful when users want to mark this particular invocation with additional information, e.g. + a run id from their own execution framework, user id, etc. These key-value pairs are then visible + in the Opik traces. + + Returns: + A dictionary with the following keys: + - `name`: The name of the tracing component. + - `trace_id`: The Opik trace id. + - `project_url`: The URL to the Opik project with tracing data. + """ + LOGGER.debug( + f"Opik tracer invoked with the following context: {invocation_context}" + ) + + return { + "name": self.name, + "trace_id": self.tracer.get_trace_id(), + "project_url": self.tracer.get_project_url(), + } diff --git a/sdks/python/src/opik/integrations/haystack/opik_tracer.py b/sdks/python/src/opik/integrations/haystack/opik_tracer.py new file mode 100644 index 0000000000..6743036e34 --- /dev/null +++ b/sdks/python/src/opik/integrations/haystack/opik_tracer.py @@ -0,0 +1,273 @@ +import contextlib +import os +import contextvars +from typing import Any, Dict, Iterator, List, Optional, Union + +from haystack import logging +from haystack.components.generators import openai_utils +from haystack import dataclasses as haystack_dataclasses +from haystack import tracing +from haystack.tracing import utils as tracing_utils + +import opik +from opik.api_objects import span as opik_span +from opik.api_objects import trace as opik_trace + +logger = logging.getLogger(__name__) + +HAYSTACK_OPIK_ENFORCE_FLUSH_ENV_VAR = "HAYSTACK_OPIK_ENFORCE_FLUSH" +_SUPPORTED_GENERATORS = [ + "AzureOpenAIGenerator", + "OpenAIGenerator", + "AnthropicGenerator", + "HuggingFaceAPIGenerator", + "HuggingFaceLocalGenerator", + "CohereGenerator", +] +_SUPPORTED_CHAT_GENERATORS = [ + "AzureOpenAIChatGenerator", + "OpenAIChatGenerator", + "AnthropicChatGenerator", + "HuggingFaceAPIChatGenerator", + "HuggingFaceLocalChatGenerator", + "CohereChatGenerator", +] +_ALL_SUPPORTED_GENERATORS = _SUPPORTED_GENERATORS + _SUPPORTED_CHAT_GENERATORS + + +_PIPELINE_RUN_KEY = "haystack.pipeline.run" +_PIPELINE_INPUT_DATA_KEY = "haystack.pipeline.input_data" +_PIPELINE_OUTPUT_DATA_KEY = "haystack.pipeline.output_data" +_COMPONENT_NAME_KEY = "haystack.component.name" +_COMPONENT_TYPE_KEY = "haystack.component.type" +_COMPONENT_OUTPUT_KEY = "haystack.component.output" + +# Context var used to keep track of tracing related info. +# This mainly useful for parents spans. 
+tracing_context_var: contextvars.ContextVar[Dict[Any, Any]] = contextvars.ContextVar( + "tracing_context" +) + + +class OpikSpanBridge(tracing.Span): + """ + Internal class representing a bridge between the Haystack span tracing API and Opik. + """ + + def __init__(self, span: Union[opik_span.Span, opik_trace.Trace]) -> None: + """ + Initialize a OpikSpan instance. + + Args: + span: The span instance managed by Opik. + """ + self._span = span + # locally cache tags + self._data: Dict[str, Any] = {} + + def set_tag(self, key: str, value: Any) -> None: + """ + Set a generic tag for this span. + + Args: + key: The tag key. + value: The tag value. + """ + coerced_value = tracing_utils.coerce_tag_value(value) + self._span.update(metadata={key: coerced_value}) + self._data[key] = value + + def set_content_tag(self, key: str, value: Any) -> None: + """ + Set a content-specific tag for this span. + + Args: + key: The content tag key. + value: The content tag value. + """ + if not tracing.tracer.is_content_tracing_enabled: + return + if key.endswith(".input"): + if "messages" in value: + messages = [ + openai_utils._convert_message_to_openai_format(m) + for m in value["messages"] + ] + self._span.update(input={"input": messages}) + else: + self._span.update(input={"input": value}) + elif key.endswith(".output"): + if "replies" in value: + if all( + isinstance(r, haystack_dataclasses.ChatMessage) + for r in value["replies"] + ): + replies = [ + openai_utils._convert_message_to_openai_format(m) + for m in value["replies"] + ] + else: + replies = value["replies"] + self._span.update(output={"replies": replies}) + else: + self._span.update(output={"output": value}) + + self._data[key] = value + + def raw_span(self) -> Union[opik_span.Span, opik_trace.Trace]: + """ + Return the underlying span instance. + + :return: The Opik span instance. + """ + return self._span + + def get_correlation_data_for_logs(self) -> Dict[str, Any]: + return {} + + +class OpikTracer(tracing.Tracer): + """ + Internal class representing a bridge between the Haystack tracer and Opik. + """ + + def __init__(self, opik_client: opik.Opik, name: str = "Haystack") -> None: + """ + Initialize a OpikTracer instance. + + Args: + tracer: The Opik tracer instance. + name: The name of the pipeline or component. This name will be used to identify the tracing run on the + Opik dashboard. + """ + if not tracing.tracer.is_content_tracing_enabled: + logger.warning( + "Traces will not be logged to Opik because Haystack tracing is disabled. " + "To enable, set the HAYSTACK_CONTENT_TRACING_ENABLED environment variable to true " + "before importing Haystack." 
+ ) + self._opik_client = opik_client + self._context: List[OpikSpanBridge] = [] + self._name = name + self.enforce_flush = ( + os.getenv(HAYSTACK_OPIK_ENFORCE_FLUSH_ENV_VAR, "true").lower() == "true" + ) + + @contextlib.contextmanager + def trace( + self, + operation_name: str, + tags: Optional[Dict[str, Any]] = None, + parent_span: Optional[OpikSpanBridge] = None, + ) -> Iterator[tracing.Span]: + tags = tags or {} + span_name = tags.get(_COMPONENT_NAME_KEY, operation_name) + + # Create new span depending whether there's a parent span or not + if not parent_span: + if operation_name != _PIPELINE_RUN_KEY: + logger.warning( + "Creating a new trace without a parent span is not recommended for operation '{operation_name}'.", + operation_name=operation_name, + ) + # Create a new trace if no parent span is provided + context = tracing_context_var.get({}) + + trace = self._opik_client.trace( + id=context.get("trace_id"), + name=self._name, + tags=context.get("tags"), + ) + span = OpikSpanBridge(trace) + elif tags.get(_COMPONENT_TYPE_KEY) in _ALL_SUPPORTED_GENERATORS: + span = OpikSpanBridge( + parent_span.raw_span().span(name=span_name, type="llm") + ) + else: + span = OpikSpanBridge(parent_span.raw_span().span(name=span_name)) + + self._context.append(span) + span.set_tags(tags) + + yield span + + raw_span = span.raw_span() + # Update span metadata based on component type + if tags.get(_COMPONENT_TYPE_KEY) in _SUPPORTED_GENERATORS: + # Haystack returns one meta dict for each message, but the 'usage' value + # is always the same, let's just pick the first item + meta = span._data.get(_COMPONENT_OUTPUT_KEY, {}).get("meta") + if meta: + m = meta[0] + if isinstance(raw_span, opik.Span): + raw_span.update(usage=m.get("usage") or None, model=m.get("model")) + elif tags.get(_COMPONENT_TYPE_KEY) in _SUPPORTED_CHAT_GENERATORS: + replies = span._data.get(_COMPONENT_OUTPUT_KEY, {}).get("replies") + if replies: + meta = replies[0].meta + if isinstance(raw_span, opik.Span): + raw_span.update( + usage=meta.get("usage") or None, + model=meta.get("model"), + ) + + elif tags.get(_PIPELINE_INPUT_DATA_KEY) is not None: + input_data = tags.get("haystack.pipeline.input_data", {}) + output_data = tags.get("haystack.pipeline.output_data", {}) + + raw_span.update( + input=input_data, + output=output_data, + ) + + raw_span.end() + self._context.pop() + + if self.enforce_flush: + self.flush() + + def flush(self) -> None: + self._opik_client.flush() + + def current_span(self) -> Optional[OpikSpanBridge]: + """ + Return the current active span. + + Returns: + The current span if available, else None. + """ + return self._context[-1] if self._context else None + + def get_project_url(self) -> str: + """ + Return the URL to the tracing data. + + Returns: + The URL to the project that includes the tracing data. + """ + last_span = self.current_span() + + if last_span is None: + return "" + else: + project_name = last_span.raw_span()._project_name + return self._opik_client.get_project_url(project_name) + + def get_trace_id(self) -> Optional[str]: + """ + Return the trace id of the current trace. + + Returns: + The id of the current trace. 
+ """ + last_span = self.current_span() + + if last_span is None: + return None + + raw_span = last_span.raw_span() + + if isinstance(raw_span, opik_trace.Trace): + return raw_span.id + + return raw_span.trace_id diff --git a/sdks/python/tests/conftest.py b/sdks/python/tests/conftest.py index 8af9b7d173..8aeaeb64be 100644 --- a/sdks/python/tests/conftest.py +++ b/sdks/python/tests/conftest.py @@ -40,6 +40,23 @@ def patch_streamer(): streamer.close(timeout=5) +@pytest.fixture +def patch_streamer_without_batching(): + try: + fake_message_processor_ = ( + backend_emulator_message_processor.BackendEmulatorMessageProcessor() + ) + streamer = streamer_constructors.construct_streamer( + message_processor=fake_message_processor_, + n_consumers=1, + use_batching=False, + ) + + yield streamer, fake_message_processor_ + finally: + streamer.close(timeout=5) + + @pytest.fixture def fake_backend(patch_streamer): """ @@ -68,3 +85,27 @@ def fake_backend(patch_streamer): mock_construct_online_streamer, ): yield fake_message_processor_ + + +@pytest.fixture +def fake_backend_without_batching(patch_streamer_without_batching): + """ + Same as fake_backend but must be used when batching is not supported + (e.g. when there are Span/Trace update requests) + """ + streamer, fake_message_processor_ = patch_streamer_without_batching + + fake_message_processor_ = cast( + backend_emulator_message_processor.BackendEmulatorMessageProcessor, + fake_message_processor_, + ) + + mock_construct_online_streamer = mock.Mock() + mock_construct_online_streamer.return_value = streamer + + with mock.patch.object( + streamer_constructors, + "construct_online_streamer", + mock_construct_online_streamer, + ): + yield fake_message_processor_ diff --git a/sdks/python/tests/library_integration/haystack/__init__.py b/sdks/python/tests/library_integration/haystack/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdks/python/tests/library_integration/haystack/requirements.text b/sdks/python/tests/library_integration/haystack/requirements.text new file mode 100644 index 0000000000..493199bc80 --- /dev/null +++ b/sdks/python/tests/library_integration/haystack/requirements.text @@ -0,0 +1 @@ +haystack-ai diff --git a/sdks/python/tests/library_integration/haystack/test_haystack.py b/sdks/python/tests/library_integration/haystack/test_haystack.py new file mode 100644 index 0000000000..4890b4c971 --- /dev/null +++ b/sdks/python/tests/library_integration/haystack/test_haystack.py @@ -0,0 +1,134 @@ +import sys + +import pytest + +from opik.config import OPIK_PROJECT_DEFAULT_NAME +from ...testlib import ( + ANY, + ANY_DICT, + ANY_STRING, + SpanModel, + TraceModel, + ANY_BUT_NONE, + assert_equal, + patch_environ, +) + + +@pytest.fixture(autouse=True, scope="module") +def enable_haystack_content_tracing(): + assert ( + "haystack" not in sys.modules + ), "haystack must be imported only after content tracing env var is set" + with patch_environ({"HAYSTACK_CONTENT_TRACING_ENABLED": "true"}): + yield + + +@pytest.mark.parametrize( + "project_name, expected_project_name", + [ + (None, OPIK_PROJECT_DEFAULT_NAME), + ("haystack-integration-test", "haystack-integration-test"), + ], +) +def test_haystack__happyflow( + fake_backend_without_batching, + project_name, + expected_project_name, +): + from haystack import Pipeline + from haystack.components.builders import ChatPromptBuilder + from haystack.components.generators.chat import OpenAIChatGenerator + from haystack.dataclasses import ChatMessage + from opik.integrations.haystack 
import ( + OpikConnector, + ) + from haystack.tracing import tracer + + opik_connector = OpikConnector("Chat example", project_name=project_name) + pipe = Pipeline() + pipe.add_component("tracer", opik_connector) + pipe.add_component("prompt_builder", ChatPromptBuilder()) + pipe.add_component("llm", OpenAIChatGenerator(model="gpt-3.5-turbo")) + + pipe.connect("prompt_builder.prompt", "llm.messages") + + messages = [ + ChatMessage.from_system( + "Always respond in German even if some input data is in other languages." + ), + ChatMessage.from_user("Tell me about {{location}}"), + ] + + pipe.run( + data={ + "prompt_builder": { + "template_variables": {"location": "Berlin"}, + "template": messages, + } + } + ) + + tracer.actual_tracer.flush() + + EXPECTED_TRACE_TREE = TraceModel( + id=ANY_BUT_NONE, + name="Chat example", + input={ + "prompt_builder": { + "template_variables": {"location": "Berlin"}, + "template": messages, + } + }, + output=ANY_DICT, + tags=ANY, + metadata=ANY_DICT, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=expected_project_name, + spans=[ + SpanModel( + id=ANY_BUT_NONE, + name="tracer", + input=ANY_DICT, + output=ANY_DICT, + tags=ANY, + metadata=ANY_DICT, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=expected_project_name, + ), + SpanModel( + id=ANY_BUT_NONE, + name="prompt_builder", + input=ANY_DICT, + output=ANY_DICT, + tags=ANY, + metadata=ANY_DICT, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=expected_project_name, + ), + SpanModel( + id=ANY_BUT_NONE, + name="llm", + type="llm", + input=ANY_DICT, + output=ANY_DICT, + tags=ANY, + metadata=ANY_DICT, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + project_name=expected_project_name, + usage={ + "prompt_tokens": ANY_BUT_NONE, + "completion_tokens": ANY_BUT_NONE, + "total_tokens": ANY_BUT_NONE, + }, + model=ANY_STRING(startswith="gpt-3.5-turbo"), + ), + ], + ) + + assert len(fake_backend_without_batching.trace_trees) == 1 + assert_equal(EXPECTED_TRACE_TREE, fake_backend_without_batching.trace_trees[0]) diff --git a/sdks/python/tests/testlib/backend_emulator_message_processor.py b/sdks/python/tests/testlib/backend_emulator_message_processor.py index aa8c2c8b88..9240cd999e 100644 --- a/sdks/python/tests/testlib/backend_emulator_message_processor.py +++ b/sdks/python/tests/testlib/backend_emulator_message_processor.py @@ -2,8 +2,11 @@ from typing import List, Tuple, Type, Dict, Union, Optional from .models import TraceModel, SpanModel, FeedbackScoreModel - +from opik import dict_utils import collections +import logging + +LOGGER = logging.getLogger(__name__) class BackendEmulatorMessageProcessor(message_processors.BaseMessageProcessor): @@ -92,7 +95,7 @@ def span_trees(self): self._span_trees.sort(key=lambda x: x.start_time) return self._span_trees - def process(self, message: messages.BaseMessage) -> None: + def _dispatch_message(self, message: messages.BaseMessage) -> None: if isinstance(message, messages.CreateTraceMessage): trace = TraceModel( id=message.trace_id, @@ -143,13 +146,33 @@ def process(self, message: messages.BaseMessage) -> None: self.process(item) elif isinstance(message, messages.UpdateSpanMessage): span: SpanModel = self._observations[message.span_id] - span.output = message.output - span.usage = message.usage - span.end_time = message.end_time + update_payload = { + "output": message.output, + "usage": message.usage, + "provider": message.provider, + "model": message.model, + "end_time": message.end_time, + "metadata": message.metadata, 
+ "error_info": message.error_info, + "tags": message.tags, + "input": message.input, + } + cleaned_update_payload = dict_utils.remove_none_from_dict(update_payload) + span.__dict__.update(cleaned_update_payload) + elif isinstance(message, messages.UpdateTraceMessage): current_trace: TraceModel = self._observations[message.trace_id] - current_trace.output = message.output - current_trace.end_time = message.end_time + update_payload = { + "output": message.output, + "end_time": message.end_time, + "metadata": message.metadata, + "error_info": message.error_info, + "tags": message.tags, + "input": message.input, + } + cleaned_update_payload = dict_utils.remove_none_from_dict(update_payload) + current_trace.__dict__.update(cleaned_update_payload) + elif isinstance(message, messages.AddSpanFeedbackScoresBatchMessage): for feedback_score_message in message.batch: feedback_model = FeedbackScoreModel( @@ -177,6 +200,16 @@ def process(self, message: messages.BaseMessage) -> None: self.processed_messages.append(message) + def process(self, message: messages.BaseMessage) -> None: + try: + self._dispatch_message(message) + except Exception as exception: + LOGGER.error( + "Unexpected exception in BackendEmulatorMessageProcessor.process", + exc_info=True, + ) + print(exception) + def get_messages_of_type(self, allowed_types: Tuple[Type, ...]): """ Returns all messages instances of requested types