From f2a08240692a3348656de7931ac7c155d40733fc Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Wed, 5 Feb 2025 16:59:17 +0100 Subject: [PATCH] perf(ingestion): make max event and batch size configurable (#1103) --- langfuse/_task_manager/ingestion_consumer.py | 6 +++--- tests/test_langchain.py | 4 +++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/langfuse/_task_manager/ingestion_consumer.py b/langfuse/_task_manager/ingestion_consumer.py index a29ba0c2..9900654c 100644 --- a/langfuse/_task_manager/ingestion_consumer.py +++ b/langfuse/_task_manager/ingestion_consumer.py @@ -1,8 +1,8 @@ import json import logging +import os import threading import time - from queue import Empty, Queue from typing import Any, List, Optional @@ -21,8 +21,8 @@ from .media_manager import MediaManager -MAX_EVENT_SIZE_BYTES = 1_000_000 -MAX_BATCH_SIZE_BYTES = 2_500_000 +MAX_EVENT_SIZE_BYTES = int(os.environ.get("LANGFUSE_MAX_EVENT_SIZE_BYTES", 1_000_000)) +MAX_BATCH_SIZE_BYTES = int(os.environ.get("LANGFUSE_MAX_BATCH_SIZE_BYTES", 2_500_000)) class IngestionMetadata(pydantic.BaseModel): diff --git a/tests/test_langchain.py b/tests/test_langchain.py index c8153537..86e49b97 100644 --- a/tests/test_langchain.py +++ b/tests/test_langchain.py @@ -2184,7 +2184,9 @@ def _generate_random_dict(n: int, key_length: int = 8) -> Dict[str, Any]: overhead = duration_with_langfuse - duration_without_langfuse print(f"Langfuse overhead: {overhead}ms") - assert overhead < 50, f"Langfuse tracing overhead of {overhead}ms exceeds threshold" + assert ( + overhead < 100 + ), f"Langfuse tracing overhead of {overhead}ms exceeds threshold" handler.flush()