diff --git a/README.md b/README.md index 04386f2..b19994a 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,12 @@ Choose the deployment mode that fits your needs. pip install sochdb ``` +Optional framework integration: + +```bash +pip install "sochdb[crewai]" +``` + Or from source: ```bash cd sochdb-python-sdk @@ -24,6 +30,45 @@ pip install -e . ## Architecture: Flexible Deployment ``` + +### CrewAI Integration + +The Python SDK includes an optional CrewAI integration layer for SochDB-backed +knowledge search and memory writes. + +Available helpers: + +- `SochDBKnowledgeStore` +- `create_crewai_tools(...)` +- `SochDBKnowledgeStore.from_collection(...)` for embedded mode +- `SochDBKnowledgeStore.from_client(...)` for gRPC / hosted mode + +Example: + +```python +from sochdb import Database, Namespace, SochDBKnowledgeStore, create_crewai_tools + +def embed(texts): + ... + +db = Database.open("./crewai_demo") +ns = Namespace(db, "crew") +collection = ns.create_collection("knowledge", dimension=384) + +store = SochDBKnowledgeStore.from_collection(collection, embedder=embed) +store.add_texts( + ["SochDB supports embedded and gRPC modes."], + metadatas=[{"topic": "architecture"}], + ids=["arch-1"], +) + +search_tool, remember_tool = create_crewai_tools(store, top_k=3) +``` + +See `examples/28_crewai_knowledge_tools.py` for a complete example. +See `examples/29_crewai_remote_tools.py` for the hosted/gRPC variant. +The remote example also supports `SOCHDB_CREWAI_SKIP_KICKOFF=1` to smoke-test +remote storage and retrieval without LLM credentials. ┌─────────────────────────────────────────────────────────────┐ │ DEPLOYMENT OPTIONS │ ├─────────────────────────────────────────────────────────────┤ diff --git a/examples/28_crewai_knowledge_tools.py b/examples/28_crewai_knowledge_tools.py new file mode 100644 index 0000000..8af2c9a --- /dev/null +++ b/examples/28_crewai_knowledge_tools.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python3 +""" +CrewAI + SochDB knowledge tool example. 
def deterministic_embed(texts: Sequence[str], dim: int = 32) -> list[list[float]]:
    """
    Turn each text into a unit-length pseudo-embedding built from its SHA-256 digest.

    The vectors are stable for a given text but carry no semantic meaning; they
    only keep the example runnable without an external embedding service.

    Args:
        texts: Strings to embed.
        dim: Number of components per vector. Digest bytes are reused
            cyclically once ``dim`` exceeds the digest length.

    Returns:
        One L2-normalised vector of length ``dim`` per input text.
    """

    def embed_one(text: str) -> list[float]:
        raw = hashlib.sha256(text.encode("utf-8")).digest()
        size = len(raw)
        components: list[float] = []
        for position in range(dim):
            # Rescale a byte from [0, 255] to [-1.0, 1.0].
            components.append(((raw[position % size] / 255.0) * 2.0) - 1.0)
        # ``or 1.0`` keeps an all-zero (or empty) vector from dividing by zero.
        length = math.sqrt(sum(c * c for c in components)) or 1.0
        return [c / length for c in components]

    return [embed_one(text) for text in texts]


def build_knowledge_store() -> SochDBKnowledgeStore:
    """
    Create an embedded SochDB collection in a fresh temp directory and seed it.

    Returns:
        A knowledge store wrapping the seeded ``knowledge`` collection and
        using :func:`deterministic_embed` as its embedder.
    """
    # (id, topic, text) triples for the demo facts.
    seed_facts = [
        ("arch-1", "architecture", "SochDB supports both embedded and gRPC deployment modes."),
        (
            "deploy-1",
            "deployment",
            "The hosted SochDB demo endpoint listens on studio.agentslab.host:50053.",
        ),
        (
            "bench-1",
            "benchmark",
            "The corrected 10GB benchmark showed about 506 QPS after one-time index load.",
        ),
    ]

    workdir = tempfile.mkdtemp(prefix="sochdb-crewai-")
    db = Database.open(workdir)
    demo_namespace = Namespace(db, "crewai_demo")
    # The collection dimension matches the embedder's default ``dim``.
    collection = demo_namespace.create_collection("knowledge", dimension=32)

    knowledge = SochDBKnowledgeStore.from_collection(collection, embedder=deterministic_embed)
    knowledge.add_texts(
        [text for _, _, text in seed_facts],
        metadatas=[{"topic": topic} for _, topic, _ in seed_facts],
        ids=[doc_id for doc_id, _, _ in seed_facts],
    )
    return knowledge


def main() -> None:
    """Run a one-agent CrewAI crew that answers from the seeded SochDB store."""
    # Imported lazily so this module can be imported without CrewAI installed.
    from crewai import Agent, Crew, Task

    knowledge = build_knowledge_store()
    search_tool, remember_tool = create_crewai_tools(knowledge, top_k=3)

    llm_name = os.environ.get("CREWAI_MODEL", "gpt-4o-mini")

    agent = Agent(
        role="SochDB Researcher",
        goal="Answer questions using the SochDB knowledge base",
        backstory="You ground answers in the project knowledge store before responding.",
        llm=llm_name,
        tools=[search_tool, remember_tool],
        verbose=True,
    )

    prompt = (
        "Find the current 10GB benchmark takeaway and summarize it in 2-3 sentences. "
        "Use the SochDB tools instead of guessing."
    )
    benchmark_task = Task(
        description=prompt,
        expected_output="A short grounded summary of the latest 10GB benchmark result.",
        agent=agent,
    )

    outcome = Crew(agents=[agent], tasks=[benchmark_task], verbose=True).kickoff()

    print("\n=== Crew Result ===\n")
    print(outcome)


if __name__ == "__main__":
    main()
DEFAULT_GRPC_ADDRESS = "studio.agentslab.host:50053"
DEFAULT_NAMESPACE = "default"


def deterministic_embed(texts: Sequence[str], dim: int = 32) -> list[list[float]]:
    """
    Small local embedder so the demo does not require a second model service.

    Each text is hashed with SHA-256; digest bytes are rescaled to [-1, 1],
    reused cyclically up to ``dim`` components, and the vector is L2-normalised.

    Args:
        texts: Strings to embed.
        dim: Number of components per vector.

    Returns:
        One vector of length ``dim`` per input text.
    """
    vectors: list[list[float]] = []
    for text in texts:
        digest = hashlib.sha256(text.encode("utf-8")).digest()
        values = [((digest[i % len(digest)] / 255.0) * 2.0) - 1.0 for i in range(dim)]
        # ``or 1.0`` avoids dividing by zero for an all-zero or empty vector.
        norm = math.sqrt(sum(v * v for v in values)) or 1.0
        vectors.append([v / norm for v in values])
    return vectors


def build_remote_store(client: SochDBClient, namespace: str) -> tuple[str, SochDBKnowledgeStore]:
    """
    Create a per-run remote collection and seed it with three demo facts.

    Args:
        client: Connected gRPC client used for every remote call.
        namespace: Namespace in which the collection is created.

    Returns:
        ``(collection_name, store)`` where ``store`` wraps the new collection.
    """
    # Timestamped run id keeps collection and document ids distinct between runs.
    run_id = f"crewai-remote-{int(time.time())}"
    collection_name = f"sdk_crewai_remote_{run_id}"
    client.create_collection(collection_name, dimension=32, namespace=namespace, metric="cosine")

    store = SochDBKnowledgeStore.from_client(
        client,
        collection_name=collection_name,
        namespace=namespace,
        embedder=deterministic_embed,
    )
    store.add_texts(
        [
            "The hosted SochDB demo endpoint listens on studio.agentslab.host:50053.",
            "The corrected 10GB benchmark showed about 506 QPS after one-time index load.",
            "BAAI/bge-base-en-v1.5 is the best published SciFact quality result so far.",
        ],
        metadatas=[
            {"topic": "deployment"},
            {"topic": "benchmark"},
            {"topic": "quality"},
        ],
        ids=[f"{run_id}-deploy", f"{run_id}-bench", f"{run_id}-quality"],
    )
    return collection_name, store


def main() -> None:
    """
    Seed a remote collection, then run either a storage smoke test or a CrewAI crew.

    Reads ``SOCHDB_GRPC_ADDRESS``, ``SOCHDB_NAMESPACE``, ``CREWAI_MODEL``,
    ``SOCHDB_CREWAI_SKIP_KICKOFF`` and ``OPENAI_API_KEY`` from the environment.

    Raises:
        SystemExit: If a kickoff is requested but ``OPENAI_API_KEY`` is unset.
    """
    grpc_address = os.environ.get("SOCHDB_GRPC_ADDRESS", DEFAULT_GRPC_ADDRESS)
    namespace = os.environ.get("SOCHDB_NAMESPACE", DEFAULT_NAMESPACE)
    model = os.environ.get("CREWAI_MODEL", "gpt-4o-mini")
    skip_kickoff = os.environ.get("SOCHDB_CREWAI_SKIP_KICKOFF", "").lower() in {
        "1",
        "true",
        "yes",
    }

    # Fail before any remote side effect: otherwise a missing key leaves a
    # freshly created, seeded collection behind on the server.
    if not skip_kickoff and not os.environ.get("OPENAI_API_KEY"):
        raise SystemExit(
            "OPENAI_API_KEY is required for the CrewAI kickoff. "
            "Set SOCHDB_CREWAI_SKIP_KICKOFF=1 to validate the remote SochDB path without an LLM."
        )

    client = SochDBClient(grpc_address)
    try:
        # Seeding happens inside the ``try`` so a failed remote call still
        # closes the client.
        collection_name, store = build_remote_store(client, namespace)
        print(f"Using remote collection: {collection_name} in namespace={namespace}")

        if skip_kickoff:
            hits = store.search("What is the 10GB benchmark takeaway?", top_k=2)
            print("\n=== Remote Store Smoke ===\n")
            print(store.format_hits(hits))
            return

        # Imported lazily so the smoke path works without CrewAI installed.
        from crewai import Agent, Crew, Task

        search_tool, remember_tool = create_crewai_tools(store, top_k=3)

        researcher = Agent(
            role="SochDB Remote Researcher",
            goal="Answer questions using the hosted SochDB knowledge base.",
            backstory="You always search the remote collection before making a claim.",
            llm=model,
            tools=[search_tool, remember_tool],
            verbose=True,
        )

        task = Task(
            description=(
                "Find the current 10GB benchmark takeaway and summarize it in 2-3 sentences. "
                "Use the SochDB tools and mention that the knowledge came from the remote store."
            ),
            expected_output="A short grounded summary of the latest 10GB benchmark result.",
            agent=researcher,
        )

        crew = Crew(agents=[researcher], tasks=[task], verbose=True)
        result = crew.kickoff()
        print("\n=== Crew Result ===\n")
        print(result)
    finally:
        client.close()


if __name__ == "__main__":
    main()
\ +SOCHDB_GRPC_ADDRESS=studio.agentslab.host:50053 \ +python examples/29_crewai_remote_tools.py +``` + +If you want to validate just the remote SochDB side before wiring an LLM +provider, you can run a storage/search smoke instead: + +```bash +SOCHDB_GRPC_ADDRESS=studio.agentslab.host:50053 \ +SOCHDB_CREWAI_SKIP_KICKOFF=1 \ +python examples/29_crewai_remote_tools.py +``` ## Directory Structure ``` @@ -81,6 +116,8 @@ examples/ ├── 08_ipc_client.py # IPC client examples ├── 26_hosted_studio_ingest.py # Remote SochDB + hosted Studio example ├── 27_hosted_remote_smoke.py # Minimal hosted gRPC smoke test +├── 28_crewai_knowledge_tools.py # CrewAI tools backed by SochDB knowledge +├── 29_crewai_remote_tools.py # CrewAI tools backed by remote SochDB └── shared/ └── mock_server.py # Mock server for testing ``` diff --git a/pyproject.toml b/pyproject.toml index 5d90562..21fa86f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,9 @@ dependencies = [ analytics = [ "posthog>=3.0.0", ] +crewai = [ + "crewai", +] dev = [ "pytest>=7.0", "pytest-cov>=4.0", @@ -52,6 +55,7 @@ dev = [ ] all = [ "posthog>=3.0.0", + "crewai", ] [project.urls] diff --git a/src/sochdb/__init__.py b/src/sochdb/__init__.py index d1095a2..ac6fef4 100644 --- a/src/sochdb/__init__.py +++ b/src/sochdb/__init__.py @@ -70,6 +70,14 @@ from .grpc_client import SochDBClient, SearchResult, Document, GraphNode, GraphEdge, TemporalEdge from .ipc_client import IpcClient from .studio import StudioAPIError, StudioClient, StudioEventIngestResult +from .integrations import ( + SochDBKnowledgeHit, + SochDBKnowledgeStore, + SochDBRememberTool, + SochDBSearchTool, + create_crewai_tools, + crewai_available, +) # Format utilities from .format import ( @@ -157,6 +165,12 @@ "StudioClient", "StudioAPIError", "StudioEventIngestResult", + "SochDBKnowledgeHit", + "SochDBKnowledgeStore", + "SochDBSearchTool", + "SochDBRememberTool", + "create_crewai_tools", + "crewai_available", # Format utilities "WireFormat", diff --git 
# Callable that maps a batch of texts to one embedding vector per text.
EmbeddingFn = Callable[[Sequence[str]], Sequence[Sequence[float]]]


@dataclass
class SochDBKnowledgeHit:
    """Search result returned by :class:`SochDBKnowledgeStore`."""

    # Identifier of the matched document.
    id: str
    # Stored text of the matched document.
    content: str
    # Document metadata with every value rendered as a string.
    metadata: Dict[str, str]
    # Similarity score, or ``None`` when the backend supplies none.
    score: Optional[float] = None


class _KnowledgeBackend(Protocol):
    """Structural interface every storage backend of the store implements."""

    def add_documents(self, documents: List[Dict[str, Any]]) -> List[str]:
        """Persist ``documents`` and return their ids."""
        ...

    def search(
        self,
        embedding: Sequence[float],
        k: int,
        metadata_filter: Optional[Dict[str, str]] = None,
    ) -> List[SochDBKnowledgeHit]:
        """Return up to ``k`` hits for ``embedding``, optionally filtered by metadata."""
        ...


def _normalize_metadata(metadata: Optional[Dict[str, Any]]) -> Dict[str, str]:
    """
    Flatten a metadata mapping into ``str -> str`` form.

    ``None`` values are dropped, scalars are passed through ``str()``, and any
    other value is JSON-encoded with sorted keys.

    Args:
        metadata: Arbitrary metadata mapping, or ``None``.

    Returns:
        A new dict; empty when ``metadata`` is ``None`` or empty.

    Raises:
        TypeError: If a non-scalar value is not JSON serialisable.
    """
    if not metadata:
        return {}

    normalized: Dict[str, str] = {}
    for key, value in metadata.items():
        if value is None:
            continue
        if isinstance(value, (str, int, float, bool)):
            normalized[str(key)] = str(value)
        else:
            normalized[str(key)] = json.dumps(value, sort_keys=True)
    return normalized


class _EmbeddedCollectionBackend:
    """Backend that stores and searches through an embedded ``Collection``."""

    def __init__(self, collection: Collection):
        self._collection = collection

    def add_documents(self, documents: List[Dict[str, Any]]) -> List[str]:
        """Add ``documents`` to the collection in one call and return their ids."""
        ids = [doc["id"] for doc in documents]
        self._collection.add(
            embeddings=[doc["embedding"] for doc in documents],
            ids=ids,
            metadatas=[doc.get("metadata", {}) for doc in documents],
            documents=[doc.get("content", "") for doc in documents],
        )
        return ids

    @staticmethod
    def _first_row(results: Dict[str, Any], key: str) -> List[Any]:
        """Return the first query's row for ``key``, or ``[]`` if absent or empty."""
        rows = results.get(key)
        # Explicit None/len checks instead of truthiness so array-like rows work.
        if rows is None or len(rows) == 0:
            return []
        first = rows[0]
        return [] if first is None else list(first)

    def search(
        self,
        embedding: Sequence[float],
        k: int,
        metadata_filter: Optional[Dict[str, str]] = None,
    ) -> List[SochDBKnowledgeHit]:
        """
        Query the collection and convert the rows into hits.

        One hit is produced per returned id. Missing metadata becomes ``{}``,
        missing content becomes ``""``, and a missing distance yields
        ``score=None`` (otherwise ``score = 1.0 - distance``).
        """
        results = self._collection.query(
            query_embeddings=[list(embedding)],
            n_results=k,
            where=metadata_filter,
            include=["metadatas", "documents"],
        )
        ids = self._first_row(results, "ids")
        # NOTE(review): "distances" is not requested via ``include``; whether
        # the collection returns it anyway is not visible here, so it is
        # treated as optional rather than dropping every hit when it is absent.
        distances = self._first_row(results, "distances")
        metadatas = self._first_row(results, "metadatas")
        documents = self._first_row(results, "documents")

        hits: List[SochDBKnowledgeHit] = []
        for index, doc_id in enumerate(ids):
            distance = distances[index] if index < len(distances) else None
            metadata = metadatas[index] if index < len(metadatas) else None
            content = documents[index] if index < len(documents) else None
            hits.append(
                SochDBKnowledgeHit(
                    id=str(doc_id),
                    content=str(content or ""),
                    metadata=_normalize_metadata(metadata),
                    score=None if distance is None else 1.0 - float(distance),
                )
            )
        return hits


class _GrpcCollectionBackend:
    """Backend that stores and searches through a remote collection over gRPC."""

    def __init__(
        self,
        client: SochDBClient,
        collection_name: str,
        namespace: str = "default",
    ):
        self._client = client
        self._collection_name = collection_name
        self._namespace = namespace

    def add_documents(self, documents: List[Dict[str, Any]]) -> List[str]:
        """Send ``documents`` to the remote collection and return the client's result."""
        grpc_docs: List[Dict[str, Any]] = [
            {
                "id": doc["id"],
                "content": doc.get("content", ""),
                "embedding": list(doc["embedding"]),
                "metadata": _normalize_metadata(doc.get("metadata")),
            }
            for doc in documents
        ]
        return self._client.add_documents(
            self._collection_name,
            grpc_docs,
            namespace=self._namespace,
        )

    def search(
        self,
        embedding: Sequence[float],
        k: int,
        metadata_filter: Optional[Dict[str, str]] = None,
    ) -> List[SochDBKnowledgeHit]:
        """Search the remote collection; hits carry no score (``score=None``)."""
        docs: List[Document] = self._client.search_collection(
            self._collection_name,
            list(embedding),
            k=k,
            namespace=self._namespace,
            filter=metadata_filter,
        )
        return [
            SochDBKnowledgeHit(
                id=doc.id,
                content=doc.content,
                metadata=_normalize_metadata(doc.metadata),
            )
            for doc in docs
        ]


class SochDBKnowledgeStore:
    """
    Small adapter that turns SochDB collections into a CrewAI-friendly
    searchable knowledge store.

    The embedder callback is intentionally user-supplied so teams can plug in
    OpenAI, Azure OpenAI, local sentence-transformers, FastEmbed, or any other
    embedding provider they already use.
    """

    def __init__(
        self,
        *,
        backend: _KnowledgeBackend,
        embedder: EmbeddingFn,
    ):
        self._backend = backend
        self._embedder = embedder

    @classmethod
    def from_collection(
        cls,
        collection: Collection,
        *,
        embedder: EmbeddingFn,
    ) -> "SochDBKnowledgeStore":
        """Build a store backed by an embedded ``Collection``."""
        return cls(backend=_EmbeddedCollectionBackend(collection), embedder=embedder)

    @classmethod
    def from_client(
        cls,
        client: SochDBClient,
        *,
        collection_name: str,
        embedder: EmbeddingFn,
        namespace: str = "default",
    ) -> "SochDBKnowledgeStore":
        """Build a store backed by a remote collection reached through ``client``."""
        return cls(
            backend=_GrpcCollectionBackend(client, collection_name, namespace=namespace),
            embedder=embedder,
        )

    def add_texts(
        self,
        texts: Sequence[str],
        *,
        metadatas: Optional[Sequence[Optional[Dict[str, Any]]]] = None,
        ids: Optional[Sequence[str]] = None,
    ) -> List[str]:
        """
        Embed ``texts`` and store them with optional metadata and ids.

        Args:
            texts: Documents to store. An empty sequence is a no-op.
            metadatas: Optional per-text metadata; must match ``texts`` in length.
            ids: Optional per-text ids; must match ``texts`` in length. When
                omitted, a fresh UUID-based id is generated per text.

        Returns:
            The ids reported by the backend.

        Raises:
            ValueError: If the embedder, ``metadatas`` or ``ids`` disagree with
                ``texts`` on length.
        """
        if not texts:
            return []

        embeddings = [list(map(float, embedding)) for embedding in self._embedder(texts)]
        if len(embeddings) != len(texts):
            raise ValueError("Embedder returned a different number of embeddings than texts")

        if metadatas is None:
            metadatas = [None] * len(texts)
        if len(metadatas) != len(texts):
            raise ValueError("metadatas length must match texts length")

        if ids is None:
            # Positional ids ("...-0", "...-1") repeat on every call, so a
            # second batch would reuse the ids of the first. UUIDs keep
            # separate calls from colliding.
            ids = [f"sochdb-crewai-{uuid.uuid4()}" for _ in texts]
        if len(ids) != len(texts):
            raise ValueError("ids length must match texts length")

        documents: List[Dict[str, Any]] = [
            {
                "id": str(doc_id),
                "content": text,
                "embedding": embedding,
                "metadata": _normalize_metadata(metadata),
            }
            for doc_id, text, embedding, metadata in zip(ids, texts, embeddings, metadatas)
        ]
        return self._backend.add_documents(documents)

    def remember(
        self,
        text: str,
        *,
        metadata: Optional[Dict[str, Any]] = None,
        doc_id: Optional[str] = None,
    ) -> str:
        """Store a single text and return its id (UUID-based when ``doc_id`` is falsy)."""
        generated_id = doc_id or f"sochdb-crewai-memory-{uuid.uuid4()}"
        return self.add_texts([text], metadatas=[metadata], ids=[generated_id])[0]

    def search(
        self,
        query: str,
        *,
        top_k: int = 5,
        metadata_filter: Optional[Dict[str, Any]] = None,
    ) -> List[SochDBKnowledgeHit]:
        """
        Embed ``query`` and return up to ``top_k`` hits from the backend.

        Args:
            query: Natural-language query text.
            top_k: Maximum number of hits requested from the backend.
            metadata_filter: Optional filter; values are normalised to strings
                and an empty filter is passed on as ``None``.

        Raises:
            ValueError: If the embedder does not return exactly one embedding.
        """
        # Materialise first, as ``add_texts`` does, so an embedder that
        # returns an iterator works here too.
        embeddings = list(self._embedder([query]))
        if len(embeddings) != 1:
            raise ValueError("Embedder must return exactly one embedding for a single query")
        normalized_filter = _normalize_metadata(metadata_filter)
        return self._backend.search(embeddings[0], top_k, normalized_filter or None)

    @staticmethod
    def format_hits(hits: Iterable[SochDBKnowledgeHit]) -> str:
        """Render hits as a numbered plain-text list, or a fixed message when empty."""
        lines: List[str] = []
        for idx, hit in enumerate(hits, start=1):
            score = "" if hit.score is None else f" (score={hit.score:.4f})"
            metadata = ""
            if hit.metadata:
                metadata = f"\nmetadata: {json.dumps(hit.metadata, sort_keys=True)}"
            lines.append(f"{idx}. [{hit.id}]{score}\n{hit.content}{metadata}")
        return "\n\n".join(lines) if lines else "No matching knowledge found."
def crewai_available() -> bool:
    """Return ``True`` when the ``crewai`` package can be imported."""
    try:
        import crewai  # noqa: F401
    except Exception:
        # Deliberately broad: any failure while importing the optional
        # dependency counts as "not available".
        return False
    return True


def _require_crewai() -> None:
    """Raise ``ImportError`` with install instructions when CrewAI is missing."""
    if not crewai_available():
        raise ImportError(
            "CrewAI integration requires the optional 'crewai' dependency. "
            "Install with: pip install 'sochdb[crewai]' or pip install crewai"
        )


try:
    from pydantic import BaseModel, Field
    try:
        from pydantic import ConfigDict
    except ImportError:  # pragma: no cover - pydantic v1 compatibility
        ConfigDict = None  # type: ignore[assignment]
    from crewai.tools import BaseTool
    _HAS_CREWAI = True
except Exception:  # pragma: no cover - handled by lazy import checks
    # Placeholders keep this module importable without the optional dependency.
    BaseModel = object  # type: ignore[assignment]
    Field = None  # type: ignore[assignment]
    BaseTool = object  # type: ignore[assignment]
    _HAS_CREWAI = False


if _HAS_CREWAI:
    class SochDBSearchInput(BaseModel):
        """Argument schema for :class:`SochDBSearchTool`."""

        query: str = Field(..., description="Natural-language question to search in SochDB.")
        metadata_filter_json: Optional[str] = Field(
            default=None,
            description="Optional JSON object used as a metadata filter.",
        )


    class SochDBSearchTool(BaseTool):
        """CrewAI tool that searches a :class:`SochDBKnowledgeStore`."""

        # The store is a plain Python object, so pydantic must be told to
        # accept it as a field type.
        if ConfigDict is not None:
            model_config = ConfigDict(arbitrary_types_allowed=True)
        else:  # pragma: no cover - pydantic v1 compatibility
            class Config:
                arbitrary_types_allowed = True

        name: str = "sochdb_search"
        description: str = (
            "Searches a SochDB-backed knowledge base for relevant context. "
            "Use this when you need grounded project or domain facts."
        )
        # Annotated on purpose: pydantic v2 rejects a field inherited from
        # the base model being overridden by a non-annotated attribute.
        args_schema: type[BaseModel] = SochDBSearchInput

        knowledge_store: SochDBKnowledgeStore
        top_k: int = 5

        def _run(self, query: str, metadata_filter_json: Optional[str] = None) -> str:
            """
            Search the store and return the formatted hits.

            Raises:
                ValueError: If ``metadata_filter_json`` is not valid JSON or
                    does not decode to a JSON object.
            """
            metadata_filter = None
            if metadata_filter_json:
                parsed = json.loads(metadata_filter_json)
                if not isinstance(parsed, dict):
                    raise ValueError("metadata_filter_json must decode to a JSON object")
                metadata_filter = parsed
            hits = self.knowledge_store.search(
                query,
                top_k=self.top_k,
                metadata_filter=metadata_filter,
            )
            return self.knowledge_store.format_hits(hits)


    class SochDBRememberInput(BaseModel):
        """Argument schema for :class:`SochDBRememberTool`."""

        content: str = Field(..., description="Text content to store in SochDB.")
        metadata_json: Optional[str] = Field(
            default=None,
            description="Optional JSON object with metadata to store alongside the content.",
        )
        doc_id: Optional[str] = Field(
            default=None,
            description="Optional stable identifier for the stored memory record.",
        )


    class SochDBRememberTool(BaseTool):
        """CrewAI tool that writes a new record into a :class:`SochDBKnowledgeStore`."""

        if ConfigDict is not None:
            model_config = ConfigDict(arbitrary_types_allowed=True)
        else:  # pragma: no cover - pydantic v1 compatibility
            class Config:
                arbitrary_types_allowed = True

        name: str = "sochdb_remember"
        description: str = (
            "Stores a new memory or knowledge snippet in SochDB so future tasks "
            "can retrieve it."
        )
        # Annotated for the same pydantic v2 override rule as the search tool.
        args_schema: type[BaseModel] = SochDBRememberInput

        knowledge_store: SochDBKnowledgeStore

        def _run(
            self,
            content: str,
            metadata_json: Optional[str] = None,
            doc_id: Optional[str] = None,
        ) -> str:
            """
            Store ``content`` and return a confirmation containing its id.

            Raises:
                ValueError: If ``metadata_json`` is not valid JSON or does not
                    decode to a JSON object.
            """
            metadata = None
            if metadata_json:
                parsed = json.loads(metadata_json)
                if not isinstance(parsed, dict):
                    raise ValueError("metadata_json must decode to a JSON object")
                metadata = parsed
            stored_id = self.knowledge_store.remember(content, metadata=metadata, doc_id=doc_id)
            return f"Stored memory in SochDB with id={stored_id}"

else:
    class SochDBSearchTool:  # pragma: no cover - lazy dependency guard
        """Placeholder whose constructor checks that CrewAI is installed."""

        def __init__(self, *args: Any, **kwargs: Any) -> None:
            _require_crewai()


    class SochDBRememberTool:  # pragma: no cover - lazy dependency guard
        """Placeholder whose constructor checks that CrewAI is installed."""

        def __init__(self, *args: Any, **kwargs: Any) -> None:
            _require_crewai()


def create_crewai_tools(
    knowledge_store: SochDBKnowledgeStore,
    *,
    top_k: int = 5,
) -> List[Any]:
    """
    Create the default CrewAI tool set backed by SochDB.

    Args:
        knowledge_store: Store the tools read from and write to.
        top_k: Number of hits the search tool requests per query.

    Returns:
        `[SochDBSearchTool(...), SochDBRememberTool(...)]`

    Raises:
        ImportError: If CrewAI is missing, or if it is installed but its tool
            base classes could not be imported when this module was loaded.
    """

    _require_crewai()
    if not _HAS_CREWAI:
        # ``crewai`` imports, but ``crewai.tools``/``pydantic`` failed at module
        # load, so only the placeholder classes exist. Failing here beats
        # returning objects that are not real tools.
        raise ImportError(
            "CrewAI is installed, but 'crewai.tools.BaseTool' or 'pydantic' could not be "
            "imported, so the SochDB tools are unavailable."
        )
    return [
        SochDBSearchTool(knowledge_store=knowledge_store, top_k=top_k),
        SochDBRememberTool(knowledge_store=knowledge_store),
    ]
class FakeBackend:
    """In-memory backend that records documents and filters by exact metadata match."""

    def __init__(self) -> None:
        self.documents: List[Dict[str, Any]] = []

    def add_documents(self, documents: List[Dict[str, Any]]) -> List[str]:
        """Append ``documents`` to the in-memory list and return their ids."""
        self.documents += documents
        return [entry["id"] for entry in documents]

    def search(
        self,
        embedding: Sequence[float],
        k: int,
        metadata_filter: Optional[Dict[str, str]] = None,
    ) -> List[SochDBKnowledgeHit]:
        """Return the first ``k`` stored documents that satisfy ``metadata_filter``."""

        def matches(meta: Dict[str, Any]) -> bool:
            # An empty or missing filter accepts every document.
            if not metadata_filter:
                return True
            return all(meta.get(name) == wanted for name, wanted in metadata_filter.items())

        matched = [
            SochDBKnowledgeHit(
                id=entry["id"],
                content=entry.get("content", ""),
                metadata=entry.get("metadata", {}),
                score=0.9,
            )
            for entry in self.documents
            if matches(entry.get("metadata", {}))
        ]
        return matched[:k]


class FakeGrpcClient:
    """Stand-in for the gRPC client that records the arguments of every call."""

    def __init__(self) -> None:
        self.add_calls: List[Dict[str, Any]] = []
        self.search_calls: List[Dict[str, Any]] = []

    def add_documents(
        self,
        collection_name: str,
        documents: List[Dict[str, Any]],
        namespace: str = "default",
    ) -> List[str]:
        """Record the call and echo back the document ids."""
        call = {
            "collection_name": collection_name,
            "documents": documents,
            "namespace": namespace,
        }
        self.add_calls.append(call)
        return [entry["id"] for entry in documents]

    def search_collection(
        self,
        collection_name: str,
        query: List[float],
        k: int = 10,
        namespace: str = "default",
        filter: Optional[Dict[str, str]] = None,
    ) -> List[Document]:
        """Record the call and return one canned document."""
        call = {
            "collection_name": collection_name,
            "query": query,
            "k": k,
            "namespace": namespace,
            "filter": filter,
        }
        self.search_calls.append(call)
        canned = Document(
            id="grpc-doc-1",
            content="remote benchmark note",
            embedding=[1.0, 0.0],
            metadata={"topic": "benchmark"},
        )
        return [canned]


def fake_embedder(texts: Sequence[str]) -> List[List[float]]:
    """Embed each text as ``[len(text), 1.0]``."""
    vectors: List[List[float]] = []
    for text in texts:
        vectors.append([float(len(text)), 1.0])
    return vectors


def test_add_texts_normalizes_metadata() -> None:
    """Scalar metadata is stringified and nested values are JSON-encoded."""
    backend = FakeBackend()
    store = SochDBKnowledgeStore(backend=backend, embedder=fake_embedder)

    ids = store.add_texts(
        ["hello world"],
        metadatas=[{"topic": "demo", "count": 3, "nested": {"a": 1}}],
        ids=["doc-1"],
    )

    stored_metadata = backend.documents[0]["metadata"]
    assert ids == ["doc-1"]
    assert stored_metadata["topic"] == "demo"
    assert stored_metadata["count"] == "3"
    assert stored_metadata["nested"] == '{"a": 1}'


def test_search_returns_filtered_hits() -> None:
    """Only documents matching the metadata filter are returned."""
    backend = FakeBackend()
    store = SochDBKnowledgeStore(backend=backend, embedder=fake_embedder)
    store.add_texts(
        ["architecture note", "benchmark note"],
        metadatas=[{"topic": "architecture"}, {"topic": "benchmark"}],
        ids=["doc-a", "doc-b"],
    )

    hits = store.search("benchmark", top_k=5, metadata_filter={"topic": "benchmark"})

    assert len(hits) == 1
    only_hit = hits[0]
    assert only_hit.id == "doc-b"
    assert only_hit.metadata["topic"] == "benchmark"


def test_remember_generates_unique_ids() -> None:
    """Two ``remember`` calls without ids produce distinct ids."""
    backend = FakeBackend()
    store = SochDBKnowledgeStore(backend=backend, embedder=fake_embedder)

    first = store.remember("fact one")
    second = store.remember("fact two")

    assert first != second
    assert len(backend.documents) == 2


def test_format_hits_is_human_readable() -> None:
    """The rendered text includes the id, the metadata, and a 4-decimal score."""
    hit = SochDBKnowledgeHit(
        id="doc-1",
        content="SochDB supports embedded mode.",
        metadata={"topic": "architecture"},
        score=0.75,
    )

    text = SochDBKnowledgeStore.format_hits([hit])

    for fragment in ("doc-1", "architecture", "0.7500"):
        assert fragment in text


def test_from_client_uses_grpc_backend() -> None:
    """``from_client`` forwards the collection name, namespace and filter to the client."""
    client = FakeGrpcClient()
    store = SochDBKnowledgeStore.from_client(
        client,
        collection_name="knowledge",
        namespace="crew",
        embedder=fake_embedder,
    )

    inserted_ids = store.add_texts(
        ["remote deployment note"],
        metadatas=[{"topic": "deployment"}],
        ids=["grpc-1"],
    )
    hits = store.search("deployment", top_k=3, metadata_filter={"topic": "benchmark"})

    assert inserted_ids == ["grpc-1"]
    add_call = client.add_calls[0]
    assert add_call["collection_name"] == "knowledge"
    assert add_call["namespace"] == "crew"
    assert client.search_calls[0]["filter"] == {"topic": "benchmark"}
    assert hits[0].id == "grpc-doc-1"