Skip to content

Commit

Permalink
feat(media): utility to resolve media references with media content (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
hassiebp authored Dec 11, 2024
1 parent db0b4f1 commit 7e1ce87
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ jobs:
rm -rf .env
echo "::group::Run server"
TELEMETRY_ENABLED=false LANGFUSE_SDK_CI_SYNC_PROCESSING_ENABLED=true LANGFUSE_READ_FROM_POSTGRES_ONLY=true LANGFUSE_READ_FROM_CLICKHOUSE_ONLY=false LANGFUSE_RETURN_FROM_CLICKHOUSE=false docker compose up -d
TELEMETRY_ENABLED=false LANGFUSE_S3_EVENT_UPLOAD_ENDPOINT=http://localhost:9090 LANGFUSE_SDK_CI_SYNC_PROCESSING_ENABLED=true LANGFUSE_READ_FROM_POSTGRES_ONLY=true LANGFUSE_READ_FROM_CLICKHOUSE_ONLY=false LANGFUSE_RETURN_FROM_CLICKHOUSE=false docker compose up -d
echo "::endgroup::"
# Add this step to check the health of the container
Expand Down
14 changes: 14 additions & 0 deletions langfuse/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,20 @@ def fetch_observation(
handle_fern_exception(e)
raise e

def fetch_media(self, id: str):
"""Get media content by ID.
Args:
id: The identifier of the media content to fetch.
Returns:
Media object
Raises:
Exception: If the media content could not be found or if an error occurred during the request.
"""
return self.client.media.get(id)

def get_observation(
self,
id: str,
Expand Down
119 changes: 118 additions & 1 deletion langfuse/media.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@
import hashlib
import logging
import os
from typing import Optional, cast, Tuple
import re
import requests
from typing import Optional, cast, Tuple, Any, TypeVar, Literal

from langfuse.api import MediaContentType
from langfuse.types import ParsedMediaReference

T = TypeVar("T")


class LangfuseMedia:
"""A class for wrapping media objects for upload to Langfuse.
Expand Down Expand Up @@ -201,3 +205,116 @@ def _parse_base64_data_uri(
self._log.error("Error parsing base64 data URI", exc_info=e)

return None, None

@staticmethod
def resolve_media_references(
*,
obj: T,
langfuse_client: Any,
resolve_with: Literal["base64_data_uri"],
max_depth: int = 10,
) -> T:
"""Replace media reference strings in an object with base64 data URIs.
This method recursively traverses an object (up to max_depth) looking for media reference strings
in the format "@@@langfuseMedia:...@@@". When found, it (synchronously) fetches the actual media content using
the provided Langfuse client and replaces the reference string with a base64 data URI.
If fetching media content fails for a reference string, a warning is logged and the reference
string is left unchanged.
Args:
obj: The object to process. Can be a primitive value, array, or nested object.
If the object has a __dict__ attribute, a dict will be returned instead of the original object type.
langfuse_client: Langfuse client instance used to fetch media content.
resolve_with: The representation of the media content to replace the media reference string with.
Currently only "base64_data_uri" is supported.
max_depth: Optional. Default is 10. The maximum depth to traverse the object.
Returns:
A deep copy of the input object with all media references replaced with base64 data URIs where possible.
If the input object has a __dict__ attribute, a dict will be returned instead of the original object type.
Example:
obj = {
"image": "@@@langfuseMedia:type=image/jpeg|id=123|source=bytes@@@",
"nested": {
"pdf": "@@@langfuseMedia:type=application/pdf|id=456|source=bytes@@@"
}
}
result = await LangfuseMedia.resolve_media_references(obj, langfuse_client)
# Result:
# {
# "image": "data:image/jpeg;base64,/9j/4AAQSkZJRg...",
# "nested": {
# "pdf": "data:application/pdf;base64,JVBERi0xLjcK..."
# }
# }
"""

def traverse(obj: Any, depth: int) -> Any:
if depth > max_depth:
return obj

# Handle string with potential media references
if isinstance(obj, str):
regex = r"@@@langfuseMedia:.+?@@@"
reference_string_matches = re.findall(regex, obj)
if len(reference_string_matches) == 0:
return obj

result = obj
reference_string_to_media_content = {}

for reference_string in reference_string_matches:
try:
parsed_media_reference = LangfuseMedia.parse_reference_string(
reference_string
)
media_data = langfuse_client.fetch_media(
parsed_media_reference["media_id"]
)
media_content = requests.get(media_data.url)
if not media_content.ok:
raise Exception("Failed to fetch media content")

base64_media_content = base64.b64encode(
media_content.content
).decode()
base64_data_uri = f"data:{media_data.content_type};base64,{base64_media_content}"

reference_string_to_media_content[reference_string] = (
base64_data_uri
)
except Exception as e:
logging.warning(
f"Error fetching media content for reference string {reference_string}: {e}"
)
# Do not replace the reference string if there's an error
continue

for ref_str, media_content in reference_string_to_media_content.items():
result = result.replace(ref_str, media_content)

return result

# Handle arrays
if isinstance(obj, list):
return [traverse(item, depth + 1) for item in obj]

# Handle dictionaries
if isinstance(obj, dict):
return {key: traverse(value, depth + 1) for key, value in obj.items()}

# Handle objects:
if hasattr(obj, "__dict__"):
return {
key: traverse(value, depth + 1)
for key, value in obj.__dict__.items()
}

return obj

return traverse(obj, 0)
65 changes: 65 additions & 0 deletions tests/test_media.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import base64
import pytest
from langfuse.media import LangfuseMedia
from langfuse.client import Langfuse
from uuid import uuid4
import re


# Test data
SAMPLE_JPEG_BYTES = b"\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01\x00H\x00H\x00\x00"
Expand Down Expand Up @@ -104,3 +108,64 @@ def test_nonexistent_file():
assert media._source is None
assert media._content_bytes is None
assert media._content_type is None


@pytest.mark.skip(reason="Docker networking issues. Enable once LFE-3159 is fixed.")
def test_replace_media_reference_string_in_object(tmp_path):
# Create test audio file
audio_file = "static/joke_prompt.wav"
with open(audio_file, "rb") as f:
mock_audio_bytes = f.read()

# Create Langfuse client and trace with media
langfuse = Langfuse()

mock_trace_name = f"test-trace-with-audio-{uuid4()}"
base64_audio = base64.b64encode(mock_audio_bytes).decode()

trace = langfuse.trace(
name=mock_trace_name,
metadata={
"context": {
"nested": LangfuseMedia(
base64_data_uri=f"data:audio/wav;base64,{base64_audio}"
)
}
},
)

langfuse.flush()

# Verify media reference string format
fetched_trace = langfuse.fetch_trace(trace.id).data
media_ref = fetched_trace.metadata["context"]["nested"]
assert re.match(
r"^@@@langfuseMedia:type=audio/wav\|id=.+\|source=base64_data_uri@@@$",
media_ref,
)

# Resolve media references back to base64
resolved_trace = LangfuseMedia.resolve_media_references(
obj=fetched_trace, langfuse_client=langfuse, resolve_with="base64_data_uri"
)

# Verify resolved base64 matches original
expected_base64 = f"data:audio/wav;base64,{base64_audio}"
assert resolved_trace["metadata"]["context"]["nested"] == expected_base64

# Create second trace reusing the media reference
trace2 = langfuse.trace(
name=f"2-{mock_trace_name}",
metadata={
"context": {"nested": resolved_trace["metadata"]["context"]["nested"]}
},
)

langfuse.flush()

# Verify second trace has same media reference
fetched_trace2 = langfuse.fetch_trace(trace2.id).data
assert (
fetched_trace2.metadata["context"]["nested"]
== fetched_trace.metadata["context"]["nested"]
)

0 comments on commit 7e1ce87

Please sign in to comment.