diff --git a/examples/aop_examples/discovery/example_agent_communication.py b/examples/aop_examples/discovery/example_agent_communication.py index c4fb28ec7..6122920e2 100644 --- a/examples/aop_examples/discovery/example_agent_communication.py +++ b/examples/aop_examples/discovery/example_agent_communication.py @@ -11,15 +11,6 @@ def simulate_agent_discovery(): """Simulate how an agent would use the discovery tool.""" - # Create a sample agent that will use the discovery tool - Agent( - agent_name="ProjectCoordinator", - agent_description="Coordinates projects and assigns tasks to other agents", - system_prompt="You are a project coordinator who helps organize work and delegate tasks to the most appropriate team members. You can discover information about other agents to make better decisions.", - model_name="gpt-4o-mini", - temperature=0.4, - ) - # Create the AOP cluster aop = AOP( server_name="Project Team", diff --git a/examples/aop_examples/example_new_agent_tools.py b/examples/aop_examples/example_new_agent_tools.py index 4806fa8e5..4e46157de 100644 --- a/examples/aop_examples/example_new_agent_tools.py +++ b/examples/aop_examples/example_new_agent_tools.py @@ -7,19 +7,12 @@ import json import asyncio -from swarms.structs.aop import AOPCluster from swarms.tools.mcp_client_tools import execute_tool_call_simple async def demonstrate_new_agent_tools(): """Demonstrate the new agent information tools.""" - # Create AOP cluster connection - AOPCluster( - urls=["http://localhost:5932/mcp"], - transport="streamable-http", - ) - print("๐Ÿ”ง New AOP Agent Information Tools Demo") print("=" * 50) print() @@ -77,7 +70,6 @@ async def demonstrate_new_agent_tools(): if isinstance(result, list) and len(result) > 0: data = result[0] if data.get("success"): - data.get("agent_info", {}) discovery_info = data.get("discovery_info", {}) print( f" Agent: {discovery_info.get('agent_name', 'Unknown')}" diff --git a/examples/demos/legal/legal_swarm.py b/examples/demos/legal/legal_swarm.py index 6eefd7165..6b8386bae 100644 --- a/examples/demos/legal/legal_swarm.py +++ b/examples/demos/legal/legal_swarm.py @@ -303,7 +303,6 @@ def create_pdf_from_string( """ try: from reportlab.lib.pagesizes import letter - from reportlab.pdfgen import canvas from reportlab.lib.styles import ( getSampleStyleSheet, ParagraphStyle, diff --git a/examples/guides/graphworkflow_guide/quick_start_guide.py b/examples/guides/graphworkflow_guide/quick_start_guide.py index 32fd274af..8888da614 100644 --- a/examples/guides/graphworkflow_guide/quick_start_guide.py +++ b/examples/guides/graphworkflow_guide/quick_start_guide.py @@ -337,7 +337,7 @@ def step_4_advanced_patterns(): print("\n๐Ÿ“Š Workflow structure:") try: advanced_workflow.visualize_simple() - except: + except Exception: print(" (Text visualization not available)") # Execute advanced workflow diff --git a/examples/guides/graphworkflow_guide/setup_and_test.py b/examples/guides/graphworkflow_guide/setup_and_test.py index e8641d798..deaf51d7e 100644 --- a/examples/guides/graphworkflow_guide/setup_and_test.py +++ b/examples/guides/graphworkflow_guide/setup_and_test.py @@ -109,7 +109,7 @@ def test_basic_import() -> bool: print("\n๐Ÿงช Testing basic GraphWorkflow import...") try: - from swarms.structs.graph_workflow import GraphWorkflow + from swarms.structs.graph_workflow import GraphWorkflow # noqa: F401 print("โœ… GraphWorkflow imported successfully") return True @@ -123,7 +123,7 @@ def test_agent_import() -> bool: print("\n๐Ÿงช Testing Agent import...") try: - from swarms import 
Agent + from swarms import Agent # noqa: F401 print("โœ… Agent imported successfully") return True diff --git a/examples/multi_agent/graphworkflow_examples/test_enhanced_json_export.py b/examples/multi_agent/graphworkflow_examples/test_enhanced_json_export.py index 45a8c72fa..52149b98f 100644 --- a/examples/multi_agent/graphworkflow_examples/test_enhanced_json_export.py +++ b/examples/multi_agent/graphworkflow_examples/test_enhanced_json_export.py @@ -213,7 +213,7 @@ def test_file_save_load(): try: os.remove("test_workflow.json") print("\n๐Ÿงน Cleaned up test file") - except: + except Exception: pass diff --git a/examples/multi_agent/simulations/agent_map/agent_map_simulation.py b/examples/multi_agent/simulations/agent_map/agent_map_simulation.py index 9053f3d2b..4befb47c4 100644 --- a/examples/multi_agent/simulations/agent_map/agent_map_simulation.py +++ b/examples/multi_agent/simulations/agent_map/agent_map_simulation.py @@ -1202,7 +1202,7 @@ def animate(frame): self.fig.canvas.manager.window.wm_attributes( "-topmost", 0 ) - except: + except Exception: pass # Not all backends support this plt.show(block=False) @@ -1535,7 +1535,7 @@ def run( if with_visualization and self.fig is not None: try: self.update_visualization() - except: + except Exception: pass # Ignore visualization errors # Print status every 10 seconds diff --git a/examples/multi_agent/simulations/agent_map/v0/demo_simulation.py b/examples/multi_agent/simulations/agent_map/v0/demo_simulation.py index 2a39ec766..da71ceca8 100644 --- a/examples/multi_agent/simulations/agent_map/v0/demo_simulation.py +++ b/examples/multi_agent/simulations/agent_map/v0/demo_simulation.py @@ -250,7 +250,7 @@ def main(): if simulation.fig is not None: try: simulation.update_visualization() - except: + except Exception: pass # Ignore visualization errors # Check if we have enough conversations to make it interesting diff --git a/examples/multi_agent/simulations/map_generation/game_map.py b/examples/multi_agent/simulations/map_generation/game_map.py index aaff56d43..2d5e17b1c 100644 --- a/examples/multi_agent/simulations/map_generation/game_map.py +++ b/examples/multi_agent/simulations/map_generation/game_map.py @@ -25,7 +25,6 @@ # Third-party model imports try: - import timm from segment_anything import ( SamAutomaticMaskGenerator, sam_model_registry, diff --git a/examples/utils/agent_loader/multi_agents_loader_demo.py b/examples/utils/agent_loader/multi_agents_loader_demo.py index 10a6e6b67..b0ff1fa8c 100644 --- a/examples/utils/agent_loader/multi_agents_loader_demo.py +++ b/examples/utils/agent_loader/multi_agents_loader_demo.py @@ -1,4 +1,5 @@ from swarms.utils import load_agents_from_markdown +from swarms.structs.sequential_workflow import SequentialWorkflow agents = load_agents_from_markdown( [ @@ -9,7 +10,6 @@ ) # Example 3: Use agents in a workflow -from swarms.structs.sequential_workflow import SequentialWorkflow workflow = SequentialWorkflow(agents=agents, max_loops=1) diff --git a/scripts/docker/test_docker.py b/scripts/docker/test_docker.py index a50a17b9e..b4414b33f 100644 --- a/scripts/docker/test_docker.py +++ b/scripts/docker/test_docker.py @@ -22,7 +22,7 @@ def test_swarms_import() -> Dict[str, Any]: ) # Test basic functionality - from swarms import Agent + from swarms import Agent # noqa: F401 print(" Agent class imported successfully") diff --git a/swarms/structs/multi_model_gpu_manager.py b/swarms/structs/multi_model_gpu_manager.py index 8a945e821..a13f1ba34 100644 --- a/swarms/structs/multi_model_gpu_manager.py +++ 
b/swarms/structs/multi_model_gpu_manager.py @@ -34,7 +34,7 @@ # Try to import transformers, but don't fail if not available try: import transformers - from transformers import AutoModel, AutoTokenizer + from transformers import AutoModel TRANSFORMERS_AVAILABLE = True except ImportError: diff --git a/swarms/tools/py_func_to_openai_func_str.py b/swarms/tools/py_func_to_openai_func_str.py index a1232ed0c..a87c33207 100644 --- a/swarms/tools/py_func_to_openai_func_str.py +++ b/swarms/tools/py_func_to_openai_func_str.py @@ -16,7 +16,6 @@ Type, TypeVar, Union, - get_args, ) from pydantic import BaseModel, Field diff --git a/swarms/utils/audio_processing.py b/swarms/utils/audio_processing.py index 1f746923f..b0fe6a990 100644 --- a/swarms/utils/audio_processing.py +++ b/swarms/utils/audio_processing.py @@ -117,7 +117,6 @@ def process_audio_with_model( from litellm import ( completion, supports_audio_input, - supports_audio_output, ) if not supports_audio_input(model): diff --git a/tests/agent/agents/test_agent_logging.py b/tests/agent/agents/test_agent_logging.py index 1439935ea..bd7cff6af 100644 --- a/tests/agent/agents/test_agent_logging.py +++ b/tests/agent/agents/test_agent_logging.py @@ -1,11 +1,10 @@ from unittest.mock import MagicMock import unittest from swarms.structs.agent import Agent -from swarms.tools.tool_parse_exec import parse_and_execute_json # Mock parse_and_execute_json for testing -parse_and_execute_json = MagicMock() -parse_and_execute_json.return_value = { +mock_parse_and_execute_json = MagicMock() +mock_parse_and_execute_json.return_value = { "tool_name": "calculator", "args": {"numbers": [2, 2]}, "output": "4", diff --git a/tests/agent/benchmark_agent/test_auto_test_eval.py b/tests/agent/benchmark_agent/test_auto_test_eval.py index 0b4d799af..22a9e8d42 100644 --- a/tests/agent/benchmark_agent/test_auto_test_eval.py +++ b/tests/agent/benchmark_agent/test_auto_test_eval.py @@ -92,7 +92,7 @@ def _get_swarms_version(self) -> str: import swarms return swarms.__version__ - except: + except Exception: return "Unknown" def _get_system_info(self) -> SwarmSystemInfo: @@ -185,7 +185,7 @@ def _get_dependencies_info(self) -> str: for dist in pkg_resources.working_set: deps.append(f"- {dist.key} {dist.version}") return "\n".join(deps) - except: + except Exception: return "Unable to fetch dependency information" # First, add this method to your SwarmsIssueReporter class diff --git a/tests/aop/aop_benchmark.py b/tests/aop/aop_benchmark.py index 86cc9133f..e2d1d3e82 100644 --- a/tests/aop/aop_benchmark.py +++ b/tests/aop/aop_benchmark.py @@ -1,13 +1,47 @@ +#!/usr/bin/env python3 +""" +AOP Framework Benchmarking Suite + +This comprehensive benchmarking suite tests the scaling laws of the AOP (Agent Orchestration Platform) +framework by measuring latency, throughput, memory usage, and other performance metrics across different +agent counts and configurations. + +Features: +- Scaling law analysis (1 to 100+ agents) +- Latency and throughput measurements +- Memory usage profiling +- Concurrent execution testing +- Error rate analysis +- Performance visualization with charts +- Statistical analysis and reporting +- Real agent testing with actual LLM calls + +Usage: +1. Set your OpenAI API key: export OPENAI_API_KEY="your-key-here" +2. Install required dependencies: pip install swarms +3. Run the benchmark: python aop_benchmark.py +4. 
Check results in the generated charts and reports + +Configuration: +- Edit BENCHMARK_CONFIG at the top of the file to customize settings +- Adjust model_name, max_agents, and other parameters as needed +- This benchmark ONLY uses real agents with actual LLM calls + +Author: AI Assistant +Date: 2024 +""" + import gc import json import os +import psutil import random import statistics import time -import uuid import warnings +import uuid from concurrent.futures import ThreadPoolExecutor, as_completed -from dataclasses import asdict, dataclass +from dataclasses import dataclass, asdict from datetime import datetime, timedelta from typing import Any, Dict, List, Tuple @@ -15,22 +49,14 @@ import numpy as np import openpyxl import pandas as pd -import psutil import seaborn as sns from dotenv import load_dotenv from loguru import logger from openpyxl.styles import Font from openpyxl.utils.dataframe import dataframe_to_rows -from swarms.structs.agent import Agent from swarms.structs.aop import AOP -from swarms.utils.litellm_wrapper import LiteLLM - -# Suppress warnings for cleaner output -warnings.filterwarnings("ignore") -# Load environment variables -load_dotenv() # Configuration BENCHMARK_CONFIG = { "models": [ @@ -60,6 +86,21 @@ "detailed_logging": True, # Enable detailed logging } +# Suppress warnings for cleaner output +warnings.filterwarnings("ignore") + +# Load environment variables +load_dotenv() + +# Import swarms Agent directly to avoid uvloop dependency +try: + from swarms.structs.agent import Agent + from swarms.utils.litellm_wrapper import LiteLLM + + SWARMS_AVAILABLE = True +except ImportError: + SWARMS_AVAILABLE = False + @dataclass class BenchmarkResult: @@ -375,6 +416,12 @@ def create_real_agent( "SWARMS_API_KEY or OPENAI_API_KEY environment variable is required for real agent testing" ) + # Check if swarms is available + if not SWARMS_AVAILABLE: + raise ImportError( + "Swarms not available - install swarms: pip install swarms" + ) + # Create LiteLLM instance for the specific model llm = LiteLLM( model_name=model_name, @@ -877,7 +924,7 @@ def _create_summary_sheet( try: if len(str(cell.value)) > max_length: max_length = len(str(cell.value)) - except: + except Exception: pass adjusted_width = min(max_length + 2, 50) ws.column_dimensions[column_letter].width = adjusted_width @@ -1616,7 +1663,6 @@ def run_resource_management_test( initial_memory = ( psutil.Process().memory_info().rss / 1024 / 1024 ) - psutil.cpu_percent() # Execute some tasks available_agents = aop.list_agents() @@ -2439,13 +2485,13 @@ def _create_tool_performance_chart( ax2.grid(True, alpha=0.3) # Add value labels on bars - for i, (bar, time) in enumerate( + for i, (bar, exec_time) in enumerate( zip(bars2, df["avg_tool_execution_time"]) ): ax2.text( bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.01, - f"{time:.2f}s", + f"{exec_time:.2f}s", ha="center", va="bottom", fontsize=8, @@ -2905,6 +2951,33 @@ def main(): print(f" Context Length: {BENCHMARK_CONFIG['context_length']}") print() + # Check for required environment variables + api_key = os.getenv("SWARMS_API_KEY") or os.getenv( + "OPENAI_API_KEY" + ) + if not api_key: + print( + "โŒ Error: SWARMS_API_KEY or OPENAI_API_KEY not found in environment variables" + ) + print( + " This benchmark requires real LLM calls for accurate performance testing" + ) + print( + " Set your API key: export SWARMS_API_KEY='your-key-here' or export OPENAI_API_KEY='your-key-here'" + ) + return 1 + + # Check for required imports + if not SWARMS_AVAILABLE: + print("โŒ Error: 
swarms not available") + print( + " Install required dependencies: pip install swarms openpyxl" + ) + print( + " This benchmark requires swarms framework and Excel support" + ) + return 1 + # Initialize benchmark suite benchmark = AOPBenchmarkSuite( output_dir="aop_benchmark_results", diff --git a/tests/structs/test_conversation.py b/tests/structs/test_conversation.py index 283fb1cbf..c86462480 100644 --- a/tests/structs/test_conversation.py +++ b/tests/structs/test_conversation.py @@ -1,699 +1,567 @@ -import shutil -from datetime import datetime -from pathlib import Path +import os +import json +import time +import datetime +import yaml +from swarms.structs.conversation import ( + Conversation, + generate_conversation_id, +) -from loguru import logger - -from swarms.structs.conversation import Conversation +def run_all_tests(): + """Run all tests for the Conversation class""" + test_results = [] -def setup_temp_conversations_dir(): - """Create a temporary directory for conversation cache files.""" - temp_dir = Path("temp_test_conversations") - if temp_dir.exists(): - shutil.rmtree(temp_dir) - temp_dir.mkdir() - logger.info(f"Created temporary test directory: {temp_dir}") - return temp_dir + def run_test(test_func): + try: + test_func() + test_results.append(f"โœ… {test_func.__name__} passed") + except Exception as e: + test_results.append( + f"โŒ {test_func.__name__} failed: {str(e)}" + ) + def test_basic_initialization(): + """Test basic initialization of Conversation""" + conv = Conversation() + assert conv.id is not None + assert conv.conversation_history is not None + assert isinstance(conv.conversation_history, list) -def create_test_conversation(temp_dir): - """Create a basic conversation for testing.""" - conv = Conversation( - name="test_conversation", conversations_dir=str(temp_dir) - ) - conv.add("user", "Hello, world!") - conv.add("assistant", "Hello, user!") - logger.info("Created test conversation with basic messages") - return conv + # Test with custom ID + custom_id = generate_conversation_id() + conv_with_id = Conversation(id=custom_id) + assert conv_with_id.id == custom_id + # Test with custom name + conv_with_name = Conversation(name="Test Conversation") + assert conv_with_name.name == "Test Conversation" -def test_add_message(): - logger.info("Running test_add_message") - conv = Conversation() - conv.add("user", "Hello, world!") - try: - assert len(conv.conversation_history) == 1 - assert conv.conversation_history[0]["role"] == "user" - assert ( - conv.conversation_history[0]["content"] == "Hello, world!" + def test_initialization_with_settings(): + """Test initialization with various settings""" + conv = Conversation( + system_prompt="Test system prompt", + time_enabled=True, + autosave=True, + token_count=True, + provider="in-memory", + context_length=4096, + rules="Test rules", + custom_rules_prompt="Custom rules", + user="TestUser:", + save_as_yaml=True, + save_as_json_bool=True, ) - logger.success("test_add_message passed") - return True - except AssertionError as e: - logger.error(f"test_add_message failed: {str(e)}") - return False - - -def test_add_message_with_time(): - logger.info("Running test_add_message_with_time") - conv = Conversation(time_enabled=False) - conv.add("user", "Hello, world!") - try: - assert len(conv.conversation_history) == 1 - assert conv.conversation_history[0]["role"] == "user" - assert ( - conv.conversation_history[0]["content"] == "Hello, world!" 
+ + # Test all settings + assert conv.system_prompt == "Test system prompt" + assert conv.time_enabled is True + assert conv.autosave is True + assert conv.token_count is True + assert conv.provider == "in-memory" + assert conv.context_length == 4096 + assert conv.rules == "Test rules" + assert conv.custom_rules_prompt == "Custom rules" + assert conv.user == "TestUser:" + assert conv.save_as_yaml is True + assert conv.save_as_json_bool is True + + def test_message_manipulation(): + """Test adding, deleting, and updating messages""" + conv = Conversation() + + # Test adding messages with different content types + conv.add("user", "Hello") # String content + conv.add("assistant", {"response": "Hi"}) # Dict content + conv.add("system", ["Hello", "Hi"]) # List content + + assert len(conv.conversation_history) == 3 + assert isinstance( + conv.conversation_history[1]["content"], dict ) - assert "timestamp" in conv.conversation_history[0] - logger.success("test_add_message_with_time passed") - return True - except AssertionError as e: - logger.error(f"test_add_message_with_time failed: {str(e)}") - return False - - -def test_delete_message(): - logger.info("Running test_delete_message") - conv = Conversation() - conv.add("user", "Hello, world!") - conv.delete(0) - try: - assert len(conv.conversation_history) == 0 - logger.success("test_delete_message passed") - return True - except AssertionError as e: - logger.error(f"test_delete_message failed: {str(e)}") - return False - - -def test_delete_message_out_of_bounds(): - logger.info("Running test_delete_message_out_of_bounds") - conv = Conversation() - conv.add("user", "Hello, world!") - try: - conv.delete(1) - logger.error( - "test_delete_message_out_of_bounds failed: Expected IndexError" + assert isinstance( + conv.conversation_history[2]["content"], list ) - return False - except IndexError: - logger.success("test_delete_message_out_of_bounds passed") - return True - - -def test_update_message(): - logger.info("Running test_update_message") - conv = Conversation() - conv.add("user", "Hello, world!") - conv.update(0, "assistant", "Hello, user!") - try: - assert len(conv.conversation_history) == 1 - assert conv.conversation_history[0]["role"] == "assistant" - assert ( - conv.conversation_history[0]["content"] == "Hello, user!" 
+ + # Test adding multiple messages + conv.add_multiple( + ["user", "assistant", "system"], + ["Hi", "Hello there", "System message"], ) - logger.success("test_update_message passed") - return True - except AssertionError as e: - logger.error(f"test_update_message failed: {str(e)}") - return False - - -def test_update_message_out_of_bounds(): - logger.info("Running test_update_message_out_of_bounds") - conv = Conversation() - conv.add("user", "Hello, world!") - try: - conv.update(1, "assistant", "Hello, user!") - logger.error( - "test_update_message_out_of_bounds failed: Expected IndexError" + assert len(conv.conversation_history) == 6 + + # Test updating message with different content type + conv.update(0, "user", {"updated": "content"}) + assert isinstance( + conv.conversation_history[0]["content"], dict ) - return False - except IndexError: - logger.success("test_update_message_out_of_bounds passed") - return True - - -def test_return_history_as_string(): - logger.info("Running test_return_history_as_string") - conv = Conversation() - conv.add("user", "Hello, world!") - conv.add("assistant", "Hello, user!") - result = conv.return_history_as_string() - expected = "user: Hello, world!\n\nassistant: Hello, user!\n\n" - try: - assert result == expected - logger.success("test_return_history_as_string passed") - return True - except AssertionError as e: - logger.error( - f"test_return_history_as_string failed: {str(e)}" + + # Test deleting multiple messages + conv.delete(0) + conv.delete(0) + assert len(conv.conversation_history) == 4 + + def test_message_retrieval(): + """Test message retrieval methods""" + conv = Conversation() + + # Add messages in specific order for testing + conv.add("user", "Test message") + conv.add("assistant", "Test response") + conv.add("system", "System message") + + # Test query - note: messages might have system prompt prepended + message = conv.query(0) + assert "Test message" in message["content"] + + # Test search with multiple results + results = conv.search("Test") + assert ( + len(results) >= 2 + ) # At least two messages should contain "Test" + assert any( + "Test message" in str(msg["content"]) for msg in results ) - return False - - -def test_search(): - logger.info("Running test_search") - conv = Conversation() - conv.add("user", "Hello, world!") - conv.add("assistant", "Hello, user!") - results = conv.search("Hello") - try: - assert len(results) == 2 - assert results[0]["content"] == "Hello, world!" - assert results[1]["content"] == "Hello, user!" 
- logger.success("test_search passed") - return True - except AssertionError as e: - logger.error(f"test_search failed: {str(e)}") - return False - - -def test_conversation_cache_creation(): - logger.info("Running test_conversation_cache_creation") - temp_dir = setup_temp_conversations_dir() - try: - conv = Conversation( - name="cache_test", conversations_dir=str(temp_dir) + assert any( + "Test response" in str(msg["content"]) for msg in results ) - conv.add("user", "Test message") - cache_file = temp_dir / "cache_test.json" - result = cache_file.exists() - if result: - logger.success("test_conversation_cache_creation passed") - else: - logger.error( - "test_conversation_cache_creation failed: Cache file not created" - ) - return result - finally: - shutil.rmtree(temp_dir) + # Test get_last_message_as_string + last_message = conv.get_last_message_as_string() + assert "System message" in last_message + + # Test return_messages_as_list + messages_list = conv.return_messages_as_list() + assert ( + len(messages_list) >= 3 + ) # At least our 3 added messages + assert any("Test message" in msg for msg in messages_list) -def test_conversation_cache_loading(): - logger.info("Running test_conversation_cache_loading") - temp_dir = setup_temp_conversations_dir() - try: - conv1 = Conversation( - name="load_test", conversations_dir=str(temp_dir) + # Test return_messages_as_dictionary + messages_dict = conv.return_messages_as_dictionary() + assert ( + len(messages_dict) >= 3 + ) # At least our 3 added messages + assert all(isinstance(m, dict) for m in messages_dict) + assert all( + {"role", "content"} <= set(m.keys()) + for m in messages_dict ) - conv1.add("user", "Test message") - conv2 = Conversation.load_conversation( - name="load_test", conversations_dir=str(temp_dir) + # Test get_final_message and content + assert "System message" in conv.get_final_message() + assert "System message" in conv.get_final_message_content() + + # Test return_all_except_first + remaining_messages = conv.return_all_except_first() + assert ( + len(remaining_messages) >= 2 + ) # At least 2 messages after removing first + + # Test return_all_except_first_string + remaining_string = conv.return_all_except_first_string() + assert isinstance(remaining_string, str) + + def test_saving_loading(): + """Test saving and loading conversation""" + # Test with save_enabled + conv = Conversation( + save_enabled=True, + conversations_dir="./test_conversations", ) - result = ( - len(conv2.conversation_history) == 1 - and conv2.conversation_history[0]["content"] - == "Test message" + conv.add("user", "Test save message") + + # Test save_as_json + test_file = os.path.join( + "./test_conversations", "test_conversation.json" ) - if result: - logger.success("test_conversation_cache_loading passed") - else: - logger.error( - "test_conversation_cache_loading failed: Loaded conversation mismatch" - ) - return result - finally: - shutil.rmtree(temp_dir) - - -def test_add_multiple_messages(): - logger.info("Running test_add_multiple_messages") - conv = Conversation() - roles = ["user", "assistant", "system"] - contents = ["Hello", "Hi there", "System message"] - conv.add_multiple_messages(roles, contents) - try: - assert len(conv.conversation_history) == 3 - assert conv.conversation_history[0]["role"] == "user" - assert conv.conversation_history[1]["role"] == "assistant" - assert conv.conversation_history[2]["role"] == "system" - logger.success("test_add_multiple_messages passed") - return True - except AssertionError as e: - 
logger.error(f"test_add_multiple_messages failed: {str(e)}") - return False - - -def test_query(): - logger.info("Running test_query") - conv = Conversation() - conv.add("user", "Test message") - try: - result = conv.query(0) - assert result["role"] == "user" - assert result["content"] == "Test message" - logger.success("test_query passed") - return True - except AssertionError as e: - logger.error(f"test_query failed: {str(e)}") - return False - - -def test_display_conversation(): - logger.info("Running test_display_conversation") - conv = Conversation() - conv.add("user", "Hello") - conv.add("assistant", "Hi") - try: - conv.display_conversation() - logger.success("test_display_conversation passed") - return True - except Exception as e: - logger.error(f"test_display_conversation failed: {str(e)}") - return False - - -def test_count_messages_by_role(): - logger.info("Running test_count_messages_by_role") - conv = Conversation() - conv.add("user", "Hello") - conv.add("assistant", "Hi") - conv.add("system", "System message") - try: - counts = conv.count_messages_by_role() - assert counts["user"] == 1 - assert counts["assistant"] == 1 - assert counts["system"] == 1 - logger.success("test_count_messages_by_role passed") - return True - except AssertionError as e: - logger.error(f"test_count_messages_by_role failed: {str(e)}") - return False - - -def test_get_str(): - logger.info("Running test_get_str") - conv = Conversation() - conv.add("user", "Hello") - try: - result = conv.get_str() - assert "user: Hello" in result - logger.success("test_get_str passed") - return True - except AssertionError as e: - logger.error(f"test_get_str failed: {str(e)}") - return False - - -def test_to_json(): - logger.info("Running test_to_json") - conv = Conversation() - conv.add("user", "Hello") - try: - result = conv.to_json() - assert isinstance(result, str) - assert "Hello" in result - logger.success("test_to_json passed") - return True - except AssertionError as e: - logger.error(f"test_to_json failed: {str(e)}") - return False - - -def test_to_dict(): - logger.info("Running test_to_dict") - conv = Conversation() - conv.add("user", "Hello") - try: - result = conv.to_dict() - assert isinstance(result, list) - assert result[0]["content"] == "Hello" - logger.success("test_to_dict passed") - return True - except AssertionError as e: - logger.error(f"test_to_dict failed: {str(e)}") - return False - - -def test_to_yaml(): - logger.info("Running test_to_yaml") - conv = Conversation() - conv.add("user", "Hello") - try: - result = conv.to_yaml() - assert isinstance(result, str) - assert "Hello" in result - logger.success("test_to_yaml passed") - return True - except AssertionError as e: - logger.error(f"test_to_yaml failed: {str(e)}") - return False - - -def test_get_last_message_as_string(): - logger.info("Running test_get_last_message_as_string") - conv = Conversation() - conv.add("user", "First") - conv.add("assistant", "Last") - try: - result = conv.get_last_message_as_string() - assert result == "assistant: Last" - logger.success("test_get_last_message_as_string passed") - return True - except AssertionError as e: - logger.error( - f"test_get_last_message_as_string failed: {str(e)}" + conv.save_as_json(test_file) + assert os.path.exists(test_file) + + # Test load_from_json + new_conv = Conversation() + new_conv.load_from_json(test_file) + assert len(new_conv.conversation_history) == 1 + assert ( + new_conv.conversation_history[0]["content"] + == "Test save message" ) - return False - - -def 
test_return_messages_as_list(): - logger.info("Running test_return_messages_as_list") - conv = Conversation() - conv.add("user", "Hello") - conv.add("assistant", "Hi") - try: - result = conv.return_messages_as_list() - assert len(result) == 2 - assert result[0] == "user: Hello" - assert result[1] == "assistant: Hi" - logger.success("test_return_messages_as_list passed") - return True - except AssertionError as e: - logger.error(f"test_return_messages_as_list failed: {str(e)}") - return False - - -def test_return_messages_as_dictionary(): - logger.info("Running test_return_messages_as_dictionary") - conv = Conversation() - conv.add("user", "Hello") - try: - result = conv.return_messages_as_dictionary() - assert len(result) == 1 - assert result[0]["role"] == "user" - assert result[0]["content"] == "Hello" - logger.success("test_return_messages_as_dictionary passed") - return True - except AssertionError as e: - logger.error( - f"test_return_messages_as_dictionary failed: {str(e)}" + + # Test class method load_conversation + loaded_conv = Conversation.load_conversation( + name=conv.id, conversations_dir="./test_conversations" ) - return False + assert loaded_conv.id == conv.id + # Cleanup + os.remove(test_file) + os.rmdir("./test_conversations") -def test_add_tool_output_to_agent(): - logger.info("Running test_add_tool_output_to_agent") - conv = Conversation() - tool_output = {"name": "test_tool", "output": "test result"} - try: - conv.add_tool_output_to_agent("tool", tool_output) - assert len(conv.conversation_history) == 1 - assert conv.conversation_history[0]["role"] == "tool" - assert conv.conversation_history[0]["content"] == tool_output - logger.success("test_add_tool_output_to_agent passed") - return True - except AssertionError as e: - logger.error( - f"test_add_tool_output_to_agent failed: {str(e)}" - ) - return False - - -def test_get_final_message(): - logger.info("Running test_get_final_message") - conv = Conversation() - conv.add("user", "First") - conv.add("assistant", "Last") - try: - result = conv.get_final_message() - assert result == "assistant: Last" - logger.success("test_get_final_message passed") - return True - except AssertionError as e: - logger.error(f"test_get_final_message failed: {str(e)}") - return False - - -def test_get_final_message_content(): - logger.info("Running test_get_final_message_content") - conv = Conversation() - conv.add("user", "First") - conv.add("assistant", "Last") - try: - result = conv.get_final_message_content() - assert result == "Last" - logger.success("test_get_final_message_content passed") - return True - except AssertionError as e: - logger.error( - f"test_get_final_message_content failed: {str(e)}" - ) - return False - - -def test_return_all_except_first(): - logger.info("Running test_return_all_except_first") - conv = Conversation() - conv.add("system", "System") - conv.add("user", "Hello") - conv.add("assistant", "Hi") - try: - result = conv.return_all_except_first() - assert len(result) == 2 - assert result[0]["role"] == "user" - assert result[1]["role"] == "assistant" - logger.success("test_return_all_except_first passed") - return True - except AssertionError as e: - logger.error(f"test_return_all_except_first failed: {str(e)}") - return False - - -def test_return_all_except_first_string(): - logger.info("Running test_return_all_except_first_string") - conv = Conversation() - conv.add("system", "System") - conv.add("user", "Hello") - conv.add("assistant", "Hi") - try: - result = conv.return_all_except_first_string() - assert 
"Hello" in result - assert "Hi" in result - assert "System" not in result - logger.success("test_return_all_except_first_string passed") - return True - except AssertionError as e: - logger.error( - f"test_return_all_except_first_string failed: {str(e)}" - ) - return False + def test_output_formats(): + """Test different output formats""" + conv = Conversation() + conv.add("user", "Test message") + conv.add("assistant", {"response": "Test"}) + + # Test JSON output + json_output = conv.to_json() + assert isinstance(json_output, str) + parsed_json = json.loads(json_output) + assert len(parsed_json) == 2 + + # Test dict output + dict_output = conv.to_dict() + assert isinstance(dict_output, list) + assert len(dict_output) == 2 + + # Test YAML output + yaml_output = conv.to_yaml() + assert isinstance(yaml_output, str) + parsed_yaml = yaml.safe_load(yaml_output) + assert len(parsed_yaml) == 2 + + # Test return_json + json_str = conv.return_json() + assert isinstance(json_str, str) + assert len(json.loads(json_str)) == 2 + + def test_memory_management(): + """Test memory management functions""" + conv = Conversation() + # Test clear + conv.add("user", "Test message") + conv.clear() + assert len(conv.conversation_history) == 0 -def test_batch_add(): - logger.info("Running test_batch_add") - conv = Conversation() - messages = [ - {"role": "user", "content": "Hello"}, - {"role": "assistant", "content": "Hi"}, - ] - try: + # Test clear_memory + conv.add("user", "Test message") + conv.clear_memory() + assert len(conv.conversation_history) == 0 + + # Test batch operations + messages = [ + {"role": "user", "content": "Message 1"}, + {"role": "assistant", "content": "Response 1"}, + ] conv.batch_add(messages) assert len(conv.conversation_history) == 2 - assert conv.conversation_history[0]["role"] == "user" - assert conv.conversation_history[1]["role"] == "assistant" - logger.success("test_batch_add passed") - return True - except AssertionError as e: - logger.error(f"test_batch_add failed: {str(e)}") - return False - - -def test_get_cache_stats(): - logger.info("Running test_get_cache_stats") - conv = Conversation(cache_enabled=True) - conv.add("user", "Hello") - try: - stats = conv.get_cache_stats() - assert "hits" in stats - assert "misses" in stats - assert "cached_tokens" in stats - assert "total_tokens" in stats - assert "hit_rate" in stats - logger.success("test_get_cache_stats passed") - return True - except AssertionError as e: - logger.error(f"test_get_cache_stats failed: {str(e)}") - return False - - -def test_list_cached_conversations(): - logger.info("Running test_list_cached_conversations") - temp_dir = setup_temp_conversations_dir() - try: - conv = Conversation( - name="test_list", conversations_dir=str(temp_dir) - ) - conv.add("user", "Test message") - conversations = Conversation.list_cached_conversations( - str(temp_dir) - ) + # Test truncate_memory_with_tokenizer + if conv.tokenizer: # Only if tokenizer is available + conv.truncate_memory_with_tokenizer() + assert len(conv.conversation_history) > 0 + + def test_conversation_metadata(): + """Test conversation metadata and listing""" + test_dir = "./test_conversations_metadata" + os.makedirs(test_dir, exist_ok=True) + try: - assert "test_list" in conversations - logger.success("test_list_cached_conversations passed") - return True - except AssertionError as e: - logger.error( - f"test_list_cached_conversations failed: {str(e)}" + # Create a conversation with metadata + conv = Conversation( + name="Test Conv", + system_prompt="System", + 
rules="Rules", + custom_rules_prompt="Custom", + conversations_dir=test_dir, + save_enabled=True, + autosave=True, ) - return False - finally: - shutil.rmtree(temp_dir) + # Add a message to trigger save + conv.add("user", "Test message") + + # Give a small delay for autosave + time.sleep(0.1) + + # List conversations and verify + conversations = Conversation.list_conversations(test_dir) + assert len(conversations) >= 1 + found_conv = next( + ( + c + for c in conversations + if c["name"] == "Test Conv" + ), + None, + ) + assert found_conv is not None + assert found_conv["id"] == conv.id -def test_clear(): - logger.info("Running test_clear") - conv = Conversation() - conv.add("user", "Hello") - conv.add("assistant", "Hi") - try: - conv.clear() - assert len(conv.conversation_history) == 0 - logger.success("test_clear passed") - return True - except AssertionError as e: - logger.error(f"test_clear failed: {str(e)}") - return False + finally: + # Cleanup + import shutil + if os.path.exists(test_dir): + shutil.rmtree(test_dir) -def test_save_and_load_json(): - logger.info("Running test_save_and_load_json") - temp_dir = setup_temp_conversations_dir() - file_path = temp_dir / "test_save.json" + def test_time_enabled_messages(): + """Test time-enabled messages""" + conv = Conversation(time_enabled=True) + conv.add("user", "Time test") - try: - conv = Conversation() - conv.add("user", "Hello") - conv.save_as_json(str(file_path)) + # Verify timestamp in message + message = conv.conversation_history[0] + assert "timestamp" in message + assert isinstance(message["timestamp"], str) - conv2 = Conversation() - conv2.load_from_json(str(file_path)) + # Verify time in content when time_enabled is True + assert "Time:" in message["content"] + def test_provider_specific(): + """Test provider-specific functionality""" + # Test in-memory provider + conv_memory = Conversation(provider="in-memory") + conv_memory.add("user", "Test") + assert len(conv_memory.conversation_history) == 1 + + # Test mem0 provider if available try: - assert len(conv2.conversation_history) == 1 - assert conv2.conversation_history[0]["content"] == "Hello" - logger.success("test_save_and_load_json passed") - return True - except AssertionError as e: - logger.error(f"test_save_and_load_json failed: {str(e)}") - return False - finally: - shutil.rmtree(temp_dir) + conv_mem0 = Conversation(provider="mem0") + conv_mem0.add("user", "Test") + # Add appropriate assertions based on mem0 behavior + except Exception: + pass # Skip if mem0 is not available + + def test_tool_output(): + """Test tool output handling""" + conv = Conversation() + tool_output = { + "tool_name": "test_tool", + "output": "test result", + } + conv.add_tool_output_to_agent("tool", tool_output) + assert len(conv.conversation_history) == 1 + assert conv.conversation_history[0]["role"] == "tool" + assert conv.conversation_history[0]["content"] == tool_output -def run_all_tests(): - """Run all test functions and return results.""" - logger.info("Starting test suite execution") - test_results = [] - test_functions = [ - test_add_message, - test_add_message_with_time, - test_delete_message, - test_delete_message_out_of_bounds, - test_update_message, - test_update_message_out_of_bounds, - test_return_history_as_string, - test_search, - test_conversation_cache_creation, - test_conversation_cache_loading, - test_add_multiple_messages, - test_query, - test_display_conversation, - test_count_messages_by_role, - test_get_str, - test_to_json, - test_to_dict, - test_to_yaml, - 
test_get_last_message_as_string, - test_return_messages_as_list, - test_return_messages_as_dictionary, - test_add_tool_output_to_agent, - test_get_final_message, - test_get_final_message_content, - test_return_all_except_first, - test_return_all_except_first_string, - test_batch_add, - test_get_cache_stats, - test_list_cached_conversations, - test_clear, - test_save_and_load_json, - ] + def test_autosave_functionality(): + """Test autosave functionality and related features""" + test_dir = "./test_conversations_autosave" + os.makedirs(test_dir, exist_ok=True) - for test_func in test_functions: - start_time = datetime.now() try: - result = test_func() - end_time = datetime.now() - duration = (end_time - start_time).total_seconds() - test_results.append( - { - "name": test_func.__name__, - "result": "PASS" if result else "FAIL", - "duration": duration, - } + # Test with autosave and save_enabled True + conv = Conversation( + autosave=True, + save_enabled=True, + conversations_dir=test_dir, + name="autosave_test", ) - except Exception as e: - end_time = datetime.now() - duration = (end_time - start_time).total_seconds() - test_results.append( - { - "name": test_func.__name__, - "result": "ERROR", - "error": str(e), - "duration": duration, - } + + # Add a message and verify it was auto-saved + conv.add("user", "Test autosave message") + save_path = os.path.join(test_dir, f"{conv.id}.json") + + # Give a small delay for autosave to complete + time.sleep(0.1) + + assert os.path.exists( + save_path + ), f"Save file not found at {save_path}" + + # Load the saved conversation and verify content + loaded_conv = Conversation.load_conversation( + name=conv.id, conversations_dir=test_dir + ) + found_message = False + for msg in loaded_conv.conversation_history: + if "Test autosave message" in str(msg["content"]): + found_message = True + break + assert ( + found_message + ), "Message not found in loaded conversation" + + # Clean up first conversation files + if os.path.exists(save_path): + os.remove(save_path) + + # Test with save_enabled=False + conv_no_save = Conversation( + autosave=False, # Changed to False to prevent autosave + save_enabled=False, + conversations_dir=test_dir, + ) + conv_no_save.add("user", "This shouldn't be saved") + save_path_no_save = os.path.join( + test_dir, f"{conv_no_save.id}.json" + ) + time.sleep(0.1) # Give time for potential save + assert not os.path.exists( + save_path_no_save + ), "File should not exist when save_enabled is False" + + finally: + # Cleanup + import shutil + + if os.path.exists(test_dir): + shutil.rmtree(test_dir) + + def test_advanced_message_handling(): + """Test advanced message handling features""" + conv = Conversation() + + # Test adding messages with metadata + metadata = {"timestamp": "2024-01-01", "session_id": "123"} + conv.add("user", "Test with metadata", metadata=metadata) + + # Test batch operations with different content types + messages = [ + {"role": "user", "content": "Message 1"}, + { + "role": "assistant", + "content": {"response": "Complex response"}, + }, + {"role": "system", "content": ["Multiple", "Items"]}, + ] + conv.batch_add(messages) + assert ( + len(conv.conversation_history) == 4 + ) # Including the first message + + # Test message format consistency + for msg in conv.conversation_history: + assert "role" in msg + assert "content" in msg + if "timestamp" in msg: + assert isinstance(msg["timestamp"], str) + + def test_conversation_metadata_handling(): + """Test handling of conversation metadata and attributes""" + test_dir 
= "./test_conversations_metadata_handling" + os.makedirs(test_dir, exist_ok=True) + + try: + # Test initialization with all optional parameters + conv = Conversation( + name="Test Conv", + system_prompt="System Prompt", + time_enabled=True, + context_length=2048, + rules="Test Rules", + custom_rules_prompt="Custom Rules", + user="CustomUser:", + provider="in-memory", + conversations_dir=test_dir, + save_enabled=True, ) - logger.error( - f"Test {test_func.__name__} failed with error: {str(e)}" + + # Verify all attributes are set correctly + assert conv.name == "Test Conv" + assert conv.system_prompt == "System Prompt" + assert conv.time_enabled is True + assert conv.context_length == 2048 + assert conv.rules == "Test Rules" + assert conv.custom_rules_prompt == "Custom Rules" + assert conv.user == "CustomUser:" + assert conv.provider == "in-memory" + + # Test saving and loading preserves metadata + conv.save_as_json() + + # Load using load_conversation + loaded_conv = Conversation.load_conversation( + name=conv.id, conversations_dir=test_dir ) - return test_results - - -def generate_markdown_report(results): - """Generate a markdown report from test results.""" - logger.info("Generating test report") - - # Summary - total_tests = len(results) - passed_tests = sum(1 for r in results if r["result"] == "PASS") - failed_tests = sum(1 for r in results if r["result"] == "FAIL") - error_tests = sum(1 for r in results if r["result"] == "ERROR") - - logger.info(f"Total Tests: {total_tests}") - logger.info(f"Passed: {passed_tests}") - logger.info(f"Failed: {failed_tests}") - logger.info(f"Errors: {error_tests}") - - report = "# Test Results Report\n\n" - report += f"Test Run Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n" - - report += "## Summary\n\n" - report += f"- Total Tests: {total_tests}\n" - report += f"- Passed: {passed_tests}\n" - report += f"- Failed: {failed_tests}\n" - report += f"- Errors: {error_tests}\n\n" - - # Detailed Results - report += "## Detailed Results\n\n" - report += "| Test Name | Result | Duration (s) | Error |\n" - report += "|-----------|---------|--------------|-------|\n" - - for result in results: - name = result["name"] - test_result = result["result"] - duration = f"{result['duration']:.4f}" - error = result.get("error", "") - report += ( - f"| {name} | {test_result} | {duration} | {error} |\n" + # Verify metadata was preserved + assert loaded_conv.name == "Test Conv" + assert loaded_conv.system_prompt == "System Prompt" + assert loaded_conv.rules == "Test Rules" + + finally: + # Cleanup + import shutil + + shutil.rmtree(test_dir) + + def test_time_enabled_features(): + """Test time-enabled message features""" + conv = Conversation(time_enabled=True) + + # Add message and verify timestamp + conv.add("user", "Time test message") + message = conv.conversation_history[0] + + # Verify timestamp format + assert "timestamp" in message + try: + datetime.datetime.fromisoformat(message["timestamp"]) + except ValueError: + assert False, "Invalid timestamp format" + + # Verify time in content + assert "Time:" in message["content"] + assert ( + datetime.datetime.now().strftime("%Y-%m-%d") + in message["content"] ) - return report + def test_provider_specific_features(): + """Test provider-specific features and behaviors""" + # Test in-memory provider + conv_memory = Conversation(provider="in-memory") + conv_memory.add("user", "Test in-memory") + assert len(conv_memory.conversation_history) == 1 + assert ( + "Test in-memory" + in 
conv_memory.get_last_message_as_string() + ) + # Test mem0 provider if available + try: + from mem0 import AsyncMemory # noqa: F401 -if __name__ == "__main__": - logger.info("Starting test execution") - results = run_all_tests() - report = generate_markdown_report(results) + # Skip actual mem0 testing since it requires async + pass + except ImportError: + pass - # Save report to file - with open("test_results.md", "w") as f: - f.write(report) + # Test invalid provider + invalid_provider = "invalid_provider" + try: + Conversation(provider=invalid_provider) + # If we get here, the provider was accepted when it shouldn't have been + raise AssertionError( + f"Should have raised ValueError for provider '{invalid_provider}'" + ) + except ValueError: + # This is the expected behavior + pass + + # Run all tests + tests = [ + test_basic_initialization, + test_initialization_with_settings, + test_message_manipulation, + test_message_retrieval, + test_saving_loading, + test_output_formats, + test_memory_management, + test_conversation_metadata, + test_time_enabled_messages, + test_provider_specific, + test_tool_output, + test_autosave_functionality, + test_advanced_message_handling, + test_conversation_metadata_handling, + test_time_enabled_features, + test_provider_specific_features, + ] + + for test in tests: + run_test(test) - logger.success( - "Test execution completed. Results saved to test_results.md" - ) + # Print results + print("\nTest Results:") + for result in test_results: + print(result) + + +if __name__ == "__main__": + run_all_tests() diff --git a/tests/utils/test_md_output.py b/tests/utils/test_md_output.py index 57316226e..752337fa8 100644 --- a/tests/utils/test_md_output.py +++ b/tests/utils/test_md_output.py @@ -2,9 +2,6 @@ from dotenv import load_dotenv -# Load environment variables -load_dotenv() - from swarms import ( Agent, ConcurrentWorkflow, @@ -14,6 +11,9 @@ from swarms.utils.formatter import Formatter +# Load environment variables +load_dotenv() + class MarkdownTestSwarm: """A test swarm that demonstrates markdown output capabilities"""
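
Note on the recurring `except:` -> `except Exception:` changes applied throughout this patch (an illustrative sketch, not part of the diff itself; `render_frame` below is a hypothetical stand-in for the guarded visualization and cleanup calls): a bare `except:` catches `BaseException`, so it also swallows `KeyboardInterrupt` and `SystemExit`, whereas narrowing to `Exception` keeps the same best-effort behavior while letting shutdown signals propagate.

def safe_render(render_frame) -> None:
    """Best-effort call, mirroring the narrowed handlers in the patch."""
    try:
        render_frame()
    except Exception:
        # Ignores ordinary runtime errors (e.g. a matplotlib backend without
        # "-topmost" support), but Ctrl-C and interpreter shutdown still
        # propagate, unlike a bare `except:` which catches BaseException.
        pass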