diff --git a/integrations/pinecone/src/haystack_integrations/document_stores/pinecone/document_store.py b/integrations/pinecone/src/haystack_integrations/document_stores/pinecone/document_store.py index 422fb12605..3b06eec4ea 100644 --- a/integrations/pinecone/src/haystack_integrations/document_stores/pinecone/document_store.py +++ b/integrations/pinecone/src/haystack_integrations/document_stores/pinecone/document_store.py @@ -1,6 +1,7 @@ # SPDX-FileCopyrightText: 2023-present deepset GmbH # # SPDX-License-Identifier: Apache-2.0 + from copy import copy from typing import Any, Literal, Optional, Union @@ -376,6 +377,165 @@ async def delete_all_documents_async(self) -> None: # Namespace doesn't exist (empty collection), which is fine - nothing to delete logger.debug("Namespace '{namespace}' not found. Nothing to delete.", namespace=self.namespace or "default") + @staticmethod + def _update_documents_metadata(documents: list[Document], meta: dict[str, Any]) -> None: + """ + Updates metadata for a list of documents by merging the provided meta dictionary. + + :param documents: List of documents to update. + :param meta: Metadata fields to merge into each document's existing metadata. + """ + for document in documents: + if document.meta is None: + document.meta = {} + document.meta.update(meta) + + def delete_by_filter(self, filters: dict[str, Any]) -> int: + """ + Deletes all documents that match the provided filters. + + Pinecone does not support server-side delete by filter, so this method + first searches for matching documents, then deletes them by ID. + + :param filters: The filters to apply to select documents for deletion. + For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) + :returns: The number of documents deleted. + """ + _validate_filters(filters) + + self._initialize_index() + assert self._index is not None, "Index is not initialized" + + documents = self.filter_documents(filters=filters) + if not documents: + return 0 + + document_ids = [doc.id for doc in documents] + + self.delete_documents(document_ids) + + deleted_count = len(document_ids) + logger.info( + "Deleted {n_docs} documents from index '{index}' using filters.", + n_docs=deleted_count, + index=self.index_name, + ) + + return deleted_count + + async def delete_by_filter_async(self, filters: dict[str, Any]) -> int: + """ + Asynchronously deletes all documents that match the provided filters. + + Pinecone does not support server-side delete by filter, so this method + first searches for matching documents, then deletes them by ID. + + :param filters: The filters to apply to select documents for deletion. + For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) + :returns: The number of documents deleted. + """ + _validate_filters(filters) + + await self._initialize_async_index() + assert self._async_index is not None, "Index is not initialized" + + documents = await self.filter_documents_async(filters=filters) + if not documents: + return 0 + + document_ids = [doc.id for doc in documents] + + await self.delete_documents_async(document_ids) + + deleted_count = len(document_ids) + logger.info( + "Deleted {n_docs} documents from index '{index}' using filters.", + n_docs=deleted_count, + index=self.index_name, + ) + + return deleted_count + + def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int: + """ + Updates the metadata of all documents that match the provided filters. + + Pinecone does not support server-side update by filter, so this method + first searches for matching documents, then updates their metadata and re-writes them. + + :param filters: The filters to apply to select documents for updating. + For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) + :param meta: The metadata fields to update. This will be merged with existing metadata. + :returns: The number of documents updated. + """ + _validate_filters(filters) + + if not isinstance(meta, dict): + msg = "meta must be a dictionary" + raise ValueError(msg) + + self._initialize_index() + assert self._index is not None, "Index is not initialized" + + documents = self.filter_documents(filters=filters) + if not documents: + return 0 + + self._update_documents_metadata(documents, meta) + + # Re-write documents with updated metadata + # Using OVERWRITE policy to update existing documents + self.write_documents(documents, policy=DuplicatePolicy.OVERWRITE) + + updated_count = len(documents) + logger.info( + "Updated {n_docs} documents in index '{index}' using filters.", + n_docs=updated_count, + index=self.index_name, + ) + + return updated_count + + async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, Any]) -> int: + """ + Asynchronously updates the metadata of all documents that match the provided filters. + + Pinecone does not support server-side update by filter, so this method + first searches for matching documents, then updates their metadata and re-writes them. + + :param filters: The filters to apply to select documents for updating. + For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) + :param meta: The metadata fields to update. This will be merged with existing metadata. + :returns: The number of documents updated. + """ + _validate_filters(filters) + + if not isinstance(meta, dict): + msg = "meta must be a dictionary" + raise ValueError(msg) + + await self._initialize_async_index() + assert self._async_index is not None, "Index is not initialized" + + documents = await self.filter_documents_async(filters=filters) + if not documents: + return 0 + + self._update_documents_metadata(documents, meta) + + # Re-write documents with updated metadata + # Using OVERWRITE policy to update existing documents + await self.write_documents_async(documents, policy=DuplicatePolicy.OVERWRITE) + + updated_count = len(documents) + logger.info( + "Updated {n_docs} documents in index '{index}' using filters.", + n_docs=updated_count, + index=self.index_name, + ) + + return updated_count + def _embedding_retrieval( self, query_embedding: list[float], diff --git a/integrations/pinecone/tests/test_document_store.py b/integrations/pinecone/tests/test_document_store.py index 152ac436d4..6d6e8d577e 100644 --- a/integrations/pinecone/tests/test_document_store.py +++ b/integrations/pinecone/tests/test_document_store.py @@ -1,3 +1,7 @@ +# SPDX-FileCopyrightText: 2023-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + import os import time from unittest.mock import patch @@ -342,3 +346,100 @@ def test_sentence_window_retriever(self, document_store: PineconeDocumentStore): result = sentence_window_retriever.run(retrieved_documents=[retrieved_doc["documents"][0]]) assert len(result["context_windows"]) == 1 + + def test_delete_by_filter(self, document_store: PineconeDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A"}), + Document(content="Doc 2", meta={"category": "B"}), + Document(content="Doc 3", meta={"category": "A"}), + ] + document_store.write_documents(docs) + + # delete documents with category="A" + deleted_count = document_store.delete_by_filter( + filters={"field": "meta.category", "operator": "==", "value": "A"} + ) + assert deleted_count == 2 + assert document_store.count_documents() == 1 + + # verify only category B remains + remaining_docs = document_store.filter_documents() + assert len(remaining_docs) == 1 + assert remaining_docs[0].meta["category"] == "B" + + def test_delete_by_filter_no_matches(self, document_store: PineconeDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A"}), + Document(content="Doc 2", meta={"category": "B"}), + ] + document_store.write_documents(docs) + + # try to delete documents with category="C" (no matches) + deleted_count = document_store.delete_by_filter( + filters={"field": "meta.category", "operator": "==", "value": "C"} + ) + assert deleted_count == 0 + assert document_store.count_documents() == 2 + + def test_update_by_filter(self, document_store: PineconeDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A", "status": "draft"}), + Document(content="Doc 2", meta={"category": "B", "status": "draft"}), + Document(content="Doc 3", meta={"category": "A", "status": "draft"}), + ] + document_store.write_documents(docs) + + # update status for category="A" documents + updated_count = document_store.update_by_filter( + filters={"field": "meta.category", "operator": "==", "value": "A"}, meta={"status": "published"} + ) + assert updated_count == 2 + + published_docs = document_store.filter_documents( + filters={"field": "meta.status", "operator": "==", "value": "published"} + ) + assert len(published_docs) == 2 + for doc in published_docs: + assert doc.meta["category"] == "A" + assert doc.meta["status"] == "published" + + def test_update_by_filter_multiple_fields(self, document_store: PineconeDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A", "status": "draft", "priority": "low"}), + Document(content="Doc 2", meta={"category": "B", "status": "draft", "priority": "low"}), + Document(content="Doc 3", meta={"category": "A", "status": "draft", "priority": "low"}), + ] + document_store.write_documents(docs) + assert document_store.count_documents() == 3 + + # Update multiple fields for category="A" documents + updated_count = document_store.update_by_filter( + filters={"field": "meta.category", "operator": "==", "value": "A"}, + meta={"status": "published", "priority": "high"}, + ) + assert updated_count == 2 + + # Verify the updates + published_docs = document_store.filter_documents( + filters={"field": "meta.status", "operator": "==", "value": "published"} + ) + assert len(published_docs) == 2 + for doc in published_docs: + assert doc.meta["category"] == "A" + assert doc.meta["status"] == "published" + assert doc.meta["priority"] == "high" + + def test_update_by_filter_no_matches(self, document_store: PineconeDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A"}), + Document(content="Doc 2", meta={"category": "B"}), + ] + document_store.write_documents(docs) + assert document_store.count_documents() == 2 + + # Try to update documents with category="C" (no matches) + updated_count = document_store.update_by_filter( + filters={"field": "meta.category", "operator": "==", "value": "C"}, meta={"status": "published"} + ) + assert updated_count == 0 + assert document_store.count_documents() == 2 diff --git a/integrations/pinecone/tests/test_document_store_async.py b/integrations/pinecone/tests/test_document_store_async.py index 60f1abed54..6832ddf819 100644 --- a/integrations/pinecone/tests/test_document_store_async.py +++ b/integrations/pinecone/tests/test_document_store_async.py @@ -1,3 +1,7 @@ +# SPDX-FileCopyrightText: 2023-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + import os import numpy as np @@ -139,3 +143,99 @@ async def test_sentence_window_retriever(self, document_store_async: PineconeDoc result = sentence_window_retriever.run(retrieved_documents=[retrieved_doc["documents"][0]]) assert len(result["context_windows"]) == 1 + + async def test_delete_by_filter_async(self, document_store_async: PineconeDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A"}), + Document(content="Doc 2", meta={"category": "B"}), + Document(content="Doc 3", meta={"category": "A"}), + ] + await document_store_async.write_documents_async(docs) + + # delete documents with category="A" + deleted_count = await document_store_async.delete_by_filter_async( + filters={"field": "meta.category", "operator": "==", "value": "A"} + ) + assert deleted_count == 2 + assert await document_store_async.count_documents_async() == 1 + + # only category B remains + remaining_docs = await document_store_async.filter_documents_async() + assert len(remaining_docs) == 1 + assert remaining_docs[0].meta["category"] == "B" + + async def test_delete_by_filter_async_no_matches(self, document_store_async: PineconeDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A"}), + Document(content="Doc 2", meta={"category": "B"}), + ] + await document_store_async.write_documents_async(docs) + + # delete documents with category="C" (no matches) + deleted_count = await document_store_async.delete_by_filter_async( + filters={"field": "meta.category", "operator": "==", "value": "C"} + ) + assert deleted_count == 0 + assert await document_store_async.count_documents_async() == 2 + + async def test_update_by_filter_async(self, document_store_async: PineconeDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A", "status": "draft"}), + Document(content="Doc 2", meta={"category": "B", "status": "draft"}), + Document(content="Doc 3", meta={"category": "A", "status": "draft"}), + ] + await document_store_async.write_documents_async(docs) + assert await document_store_async.count_documents_async() == 3 + + # update status for category="A" documents + updated_count = await document_store_async.update_by_filter_async( + filters={"field": "meta.category", "operator": "==", "value": "A"}, meta={"status": "published"} + ) + assert updated_count == 2 + + published_docs = await document_store_async.filter_documents_async( + filters={"field": "meta.status", "operator": "==", "value": "published"} + ) + assert len(published_docs) == 2 + for doc in published_docs: + assert doc.meta["category"] == "A" + assert doc.meta["status"] == "published" + + async def test_update_by_filter_async_multiple_fields(self, document_store_async: PineconeDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A", "status": "draft", "priority": "low"}), + Document(content="Doc 2", meta={"category": "B", "status": "draft", "priority": "low"}), + Document(content="Doc 3", meta={"category": "A", "status": "draft", "priority": "low"}), + ] + await document_store_async.write_documents_async(docs) + + # update multiple fields for category="A" documents + updated_count = await document_store_async.update_by_filter_async( + filters={"field": "meta.category", "operator": "==", "value": "A"}, + meta={"status": "published", "priority": "high"}, + ) + assert updated_count == 2 + + # verify the updates + published_docs = await document_store_async.filter_documents_async( + filters={"field": "meta.status", "operator": "==", "value": "published"} + ) + assert len(published_docs) == 2 + for doc in published_docs: + assert doc.meta["category"] == "A" + assert doc.meta["status"] == "published" + assert doc.meta["priority"] == "high" + + async def test_update_by_filter_async_no_matches(self, document_store_async: PineconeDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A"}), + Document(content="Doc 2", meta={"category": "B"}), + ] + await document_store_async.write_documents_async(docs) + + # try to update documents with category="C" (no matches) + updated_count = await document_store_async.update_by_filter_async( + filters={"field": "meta.category", "operator": "==", "value": "C"}, meta={"status": "published"} + ) + assert updated_count == 0 + assert await document_store_async.count_documents_async() == 2