diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py index 91e1b905f..cdbf50bc7 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py @@ -11,6 +11,18 @@ from weaviate.collections.classes.filters import Filter, FilterReturn +def validate_filters(filters: dict[str, Any] | None) -> None: + """ + Validates that filters have the correct structure. + + :param filters: The filters to validate. + :raises ValueError: If filters are provided but have invalid syntax. + """ + if filters and "operator" not in filters and "conditions" not in filters: + msg = "Invalid filter syntax. See https://docs.haystack.deepset.ai/docs/metadata-filtering for details." + raise ValueError(msg) + + def convert_filters(filters: dict[str, Any]) -> FilterReturn: """ Convert filters from Haystack format to Weaviate format. diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index 1a1b43859..6406b2e4d 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -1,6 +1,7 @@ # SPDX-FileCopyrightText: 2023-present deepset GmbH # # SPDX-License-Identifier: Apache-2.0 + import base64 import datetime import json @@ -19,7 +20,7 @@ from weaviate.embedded import EmbeddedOptions from weaviate.util import generate_uuid5 -from ._filters import convert_filters +from ._filters import convert_filters, validate_filters from .auth import AuthCredentials logger = logging.getLogger(__name__) @@ -419,7 +420,7 @@ def _query_with_filters(self, filters: dict[str, Any]) -> list[DataObject[dict[s # # Nonetheless there's also another issue, paginating with limit and offset is not efficient # and it's still restricted by the QUERY_MAXIMUM_RESULTS environment variable. - # If the sum of limit and offest is greater than QUERY_MAXIMUM_RESULTS an error is raised. + # If the sum of limit and offset is greater than QUERY_MAXIMUM_RESULTS an error is raised. # See the official docs for more: # https://weaviate.io/developers/weaviate/api/graphql/additional-operators#performance-considerations offset = 0 @@ -452,9 +453,7 @@ def filter_documents(self, filters: Optional[dict[str, Any]] = None) -> list[Doc :param filters: The filters to apply to the document list. :returns: A list of Documents that match the given filters. """ - if filters and "operator" not in filters and "conditions" not in filters: - msg = "Invalid filter syntax. See https://docs.haystack.deepset.ai/docs/metadata-filtering for details." 
- raise ValueError(msg) + validate_filters(filters) result = [] if filters: @@ -483,7 +482,7 @@ def _batch_write(self, documents: list[Document]) -> int: vector=doc.embedding, ) if failed_objects := self.client.batch.failed_objects: - # We fallback to use the UUID if the _original_id is not present, this is just to be + # We fall back to use the UUID if the _original_id is not present, this is just to be mapped_objects = {} for obj in failed_objects: properties = obj.object_.properties or {} @@ -507,7 +506,7 @@ def _batch_write(self, documents: list[Document]) -> int: def _write(self, documents: list[Document], policy: DuplicatePolicy) -> int: """ Writes documents to Weaviate using the specified policy. - This doesn't uses the batch API, so it's slower than _batch_write. + This doesn't use the batch API, so it's slower than _batch_write. If policy is set to SKIP it will skip any document that already exists. If policy is set to FAIL it will raise an exception if any of the documents already exists. """ @@ -610,6 +609,231 @@ def delete_all_documents(self, *, recreate_index: bool = False, batch_size: int "Make sure to specify a deletion `batch_size` which is less than `QUERY_MAXIMUM_RESULTS`.", ) + def delete_by_filter(self, filters: dict[str, Any]) -> int: + """ + Deletes all documents that match the provided filters. + + :param filters: The filters to apply to select documents for deletion. + For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) + :returns: The number of documents deleted. + """ + validate_filters(filters) + + try: + weaviate_filter = convert_filters(filters) + result = self.collection.data.delete_many(where=weaviate_filter) + deleted_count = result.successful + logger.info( + "Deleted {n_docs} documents from collection '{collection}' using filters.", + n_docs=deleted_count, + collection=self.collection.name, + ) + return deleted_count + except weaviate.exceptions.WeaviateQueryError as e: + msg = f"Failed to delete documents by filter in Weaviate. Error: {e.message}" + raise DocumentStoreError(msg) from e + except Exception as e: + msg = f"Failed to delete documents by filter in Weaviate: {e!s}" + raise DocumentStoreError(msg) from e + + async def delete_by_filter_async(self, filters: dict[str, Any]) -> int: + """ + Asynchronously deletes all documents that match the provided filters. + + :param filters: The filters to apply to select documents for deletion. + For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) + :returns: The number of documents deleted. + """ + validate_filters(filters) + + try: + collection = await self.async_collection + weaviate_filter = convert_filters(filters) + result = await collection.data.delete_many(where=weaviate_filter) + deleted_count = result.successful + logger.info( + "Deleted {n_docs} documents from collection '{collection}' using filters.", + n_docs=deleted_count, + collection=collection.name, + ) + return deleted_count + except weaviate.exceptions.WeaviateQueryError as e: + msg = f"Failed to delete documents by filter in Weaviate. Error: {e.message}" + raise DocumentStoreError(msg) from e + except Exception as e: + msg = f"Failed to delete documents by filter in Weaviate: {e!s}" + raise DocumentStoreError(msg) from e + + def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int: + """ + Updates the metadata of all documents that match the provided filters. 
+ + :param filters: The filters to apply to select documents for updating. + For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) + :param meta: The metadata fields to update. These will be merged with existing metadata. + :returns: The number of documents updated. + """ + validate_filters(filters) + + if not isinstance(meta, dict): + msg = "Meta must be a dictionary" + raise ValueError(msg) + + try: + matching_objects = self._query_with_filters(filters) + if not matching_objects: + return 0 + + # Update each object with the new metadata + # Since metadata is stored flattened in Weaviate properties, we update properties directly + updated_count = 0 + failed_updates = [] + + for obj in matching_objects: + try: + # Get current properties + current_properties = obj.properties.copy() if obj.properties else {} + + # Update with new metadata values + # Note: metadata fields are stored directly in properties (flattened) + for key, value in meta.items(): + current_properties[key] = value + + # Update the object, preserving the vector + # Get the vector from the object to preserve it during replace + vector = None + if isinstance(obj.vector, list): + vector = obj.vector + elif isinstance(obj.vector, dict): + vector = obj.vector.get("default") + + self.collection.data.replace( + uuid=obj.uuid, + properties=current_properties, + vector=vector, + ) + updated_count += 1 + except Exception as e: + # Collect failed updates but continue with others + obj_properties = obj.properties or {} + id_ = obj_properties.get("_original_id", obj.uuid) + failed_updates.append((id_, str(e))) + + if failed_updates: + msg = "\n".join( + [f"Failed to update object with id '{id_}'. Error: '{error}'" for id_, error in failed_updates] + ) + raise DocumentStoreError(msg) + + logger.info( + "Updated {n_docs} documents in collection '{collection}' using filters.", + n_docs=updated_count, + collection=self.collection.name, + ) + return updated_count + except weaviate.exceptions.WeaviateQueryError as e: + msg = f"Failed to update documents by filter in Weaviate. Error: {e.message}" + raise DocumentStoreError(msg) from e + except Exception as e: + msg = f"Failed to update documents by filter in Weaviate: {e!s}" + raise DocumentStoreError(msg) from e + + async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, Any]) -> int: + """ + Asynchronously updates the metadata of all documents that match the provided filters. + + :param filters: The filters to apply to select documents for updating. + For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) + :param meta: The metadata fields to update. These will be merged with existing metadata. + :returns: The number of documents updated. 
+ """ + validate_filters(filters) + + if not isinstance(meta, dict): + msg = "Meta must be a dictionary" + raise ValueError(msg) + + try: + collection = await self.async_collection + weaviate_filter = convert_filters(filters) + config = await collection.config.get() + properties = [p.name for p in config.properties] + + # Query all objects matching the filter + matching_objects = [] + offset = 0 + partial_result = None + + # Paginate through all matching objects + # We include vector=True to preserve vectors when updating + while partial_result is None or len(partial_result.objects) == DEFAULT_QUERY_LIMIT: + partial_result = await collection.query.fetch_objects( + filters=weaviate_filter, + include_vector=True, + limit=DEFAULT_QUERY_LIMIT, + offset=offset, + return_properties=properties, + ) + matching_objects.extend(partial_result.objects) + offset += DEFAULT_QUERY_LIMIT + + if not matching_objects: + return 0 + + # Update each object with the new metadata + # Since metadata is stored flattened in Weaviate properties, we update properties directly + updated_count = 0 + failed_updates = [] + + for obj in matching_objects: + try: + # Get current properties + current_properties = obj.properties.copy() if obj.properties else {} + + # Update with new metadata values + # Note: metadata fields are stored directly in properties (flattened) + for key, value in meta.items(): + current_properties[key] = value + + # Update the object, preserving the vector + # Get the vector from the object to preserve it during replace + vector = None + if isinstance(obj.vector, list): + vector = obj.vector + elif isinstance(obj.vector, dict): + vector = obj.vector.get("default") + + await collection.data.replace( + uuid=obj.uuid, + properties=current_properties, + vector=vector, + ) + updated_count += 1 + except Exception as e: + # Collect failed updates but continue with others + obj_properties = obj.properties or {} + id_ = obj_properties.get("_original_id", obj.uuid) + failed_updates.append((id_, str(e))) + + if failed_updates: + msg = "\n".join( + [f"Failed to update object with id '{id_}'. Error: '{error}'" for id_, error in failed_updates] + ) + raise DocumentStoreError(msg) + + logger.info( + "Updated {n_docs} documents in collection '{collection}' using filters.", + n_docs=updated_count, + collection=collection.name, + ) + return updated_count + except weaviate.exceptions.WeaviateQueryError as e: + msg = f"Failed to update documents by filter in Weaviate. 
Error: {e.message}" + raise DocumentStoreError(msg) from e + except Exception as e: + msg = f"Failed to update documents by filter in Weaviate: {e!s}" + raise DocumentStoreError(msg) from e + def _bm25_retrieval( self, query: str, filters: Optional[dict[str, Any]] = None, top_k: Optional[int] = None ) -> list[Document]: diff --git a/integrations/weaviate/tests/test_document_store.py b/integrations/weaviate/tests/test_document_store.py index e410877e8..7bcde9c6b 100644 --- a/integrations/weaviate/tests/test_document_store.py +++ b/integrations/weaviate/tests/test_document_store.py @@ -58,6 +58,8 @@ def document_store(self, request) -> WeaviateDocumentStore: *DOCUMENT_COLLECTION_PROPERTIES, {"name": "number", "dataType": ["int"]}, {"name": "date", "dataType": ["date"]}, + {"name": "category", "dataType": ["text"]}, + {"name": "status", "dataType": ["text"]}, ], } store = WeaviateDocumentStore( @@ -832,3 +834,72 @@ def test_delete_all_documents_excessive_batch_size(self, document_store, caplog) document_store.delete_all_documents(batch_size=20000) assert document_store.count_documents() == 5 assert "Not all documents have been deleted." in caplog.text + + def test_delete_by_filter(self, document_store): + docs = [ + Document(content="Doc 1", meta={"category": "TypeA"}), + Document(content="Doc 2", meta={"category": "TypeB"}), + Document(content="Doc 3", meta={"category": "TypeA"}), + ] + document_store.write_documents(docs) + assert document_store.count_documents() == 3 + + # Delete documents with category="TypeA" + deleted_count = document_store.delete_by_filter( + filters={"field": "meta.category", "operator": "==", "value": "TypeA"} + ) + assert deleted_count == 2 + assert document_store.count_documents() == 1 + + def test_update_by_filter(self, document_store): + docs = [ + Document(content="Doc 1", meta={"category": "TypeA", "status": "draft"}), + Document(content="Doc 2", meta={"category": "TypeB", "status": "draft"}), + Document(content="Doc 3", meta={"category": "TypeA", "status": "draft"}), + ] + document_store.write_documents(docs) + assert document_store.count_documents() == 3 + + # Update status for category="TypeA" documents + updated_count = document_store.update_by_filter( + filters={"field": "meta.category", "operator": "==", "value": "TypeA"}, meta={"status": "published"} + ) + assert updated_count == 2 + + # Verify the updates + published_docs = document_store.filter_documents( + filters={"field": "meta.status", "operator": "==", "value": "published"} + ) + assert len(published_docs) == 2 + for doc in published_docs: + assert doc.meta["category"] == "TypeA" + assert doc.meta["status"] == "published" + + def test_update_by_filter_with_pagination(self, document_store, monkeypatch): + # Reduce DEFAULT_QUERY_LIMIT to test pagination without creating 10000+ documents + monkeypatch.setattr("haystack_integrations.document_stores.weaviate.document_store.DEFAULT_QUERY_LIMIT", 100) + + docs = [] + for index in range(250): + docs.append( + Document(content="This is some content", meta={"index": index, "status": "draft", "category": "test"}) + ) + document_store.write_documents(docs) + + # update all documents should trigger pagination (3 pages) + updated_count = document_store.update_by_filter( + filters={"field": "category", "operator": "==", "value": "test"}, + meta={"status": "published"}, + ) + assert updated_count == 250 + + # verify updates were correct + published_docs = document_store.filter_documents( + filters={"field": "status", "operator": "==", "value": "published"} + ) + 
assert len(published_docs) == 250 + for doc in published_docs: + assert doc.meta["category"] == "test" + assert doc.meta["status"] == "published" + assert "index" in doc.meta + assert 0 <= doc.meta["index"] < 250 diff --git a/integrations/weaviate/tests/test_document_store_async.py b/integrations/weaviate/tests/test_document_store_async.py index f4da93daa..1e4dbe69b 100644 --- a/integrations/weaviate/tests/test_document_store_async.py +++ b/integrations/weaviate/tests/test_document_store_async.py @@ -16,7 +16,11 @@ def document_store(self, request) -> WeaviateDocumentStore: collection_settings = { "class": f"{request.node.name}", "invertedIndexConfig": {"indexNullState": True}, - "properties": DOCUMENT_COLLECTION_PROPERTIES, + "properties": [ + *DOCUMENT_COLLECTION_PROPERTIES, + {"name": "category", "dataType": ["text"]}, + {"name": "status", "dataType": ["text"]}, + ], } store = WeaviateDocumentStore( url="http://localhost:8080", @@ -161,3 +165,81 @@ async def test_hybrid_retrieval_async_with_alpha(self, document_store): ) assert len(result_vector) > 0 assert result_vector[0].score > 0.0 + + @pytest.mark.asyncio + async def test_delete_by_filter_async(self, document_store): + docs = [ + Document(content="Doc 1", meta={"category": "TypeA"}), + Document(content="Doc 2", meta={"category": "TypeB"}), + Document(content="Doc 3", meta={"category": "TypeA"}), + ] + document_store.write_documents(docs) + + # delete documents with category="TypeA" + deleted_count = await document_store.delete_by_filter_async( + filters={"field": "meta.category", "operator": "==", "value": "TypeA"} + ) + assert deleted_count == 2 + assert document_store.count_documents() == 1 + + # verify only category TypeB remains + remaining_docs = document_store.filter_documents() + assert len(remaining_docs) == 1 + assert remaining_docs[0].meta["category"] == "TypeB" + + @pytest.mark.asyncio + async def test_update_by_filter_async(self, document_store): + docs = [ + Document(content="Doc 1", meta={"category": "TypeA", "status": "draft"}), + Document(content="Doc 2", meta={"category": "TypeB", "status": "draft"}), + Document(content="Doc 3", meta={"category": "TypeA", "status": "draft"}), + ] + document_store.write_documents(docs) + assert document_store.count_documents() == 3 + + # update status for category="TypeA" documents + updated_count = await document_store.update_by_filter_async( + filters={"field": "meta.category", "operator": "==", "value": "TypeA"}, meta={"status": "published"} + ) + assert updated_count == 2 + + # Verify the updates + published_docs = document_store.filter_documents( + filters={"field": "meta.status", "operator": "==", "value": "published"} + ) + assert len(published_docs) == 2 + for doc in published_docs: + assert doc.meta["category"] == "TypeA" + assert doc.meta["status"] == "published" + + @pytest.mark.asyncio + async def test_update_by_filter_async_with_pagination(self, document_store, monkeypatch): + # Reduce DEFAULT_QUERY_LIMIT to test pagination without creating 10000+ documents + monkeypatch.setattr("haystack_integrations.document_stores.weaviate.document_store.DEFAULT_QUERY_LIMIT", 100) + + docs = [] + for index in range(250): + docs.append( + Document( + content="This is some content", + meta={"index": index, "status": "draft", "category": "test"}, + ) + ) + document_store.write_documents(docs) + + # update all documents should trigger pagination (3 pages) + updated_count = await document_store.update_by_filter_async( + filters={"field": "category", "operator": "==", "value": "test"}, + 
meta={"status": "published"}, + ) + assert updated_count == 250 + + published_docs = document_store.filter_documents( + filters={"field": "status", "operator": "==", "value": "published"} + ) + assert len(published_docs) == 250 + for doc in published_docs: + assert doc.meta["category"] == "test" + assert doc.meta["status"] == "published" + assert "index" in doc.meta + assert 0 <= doc.meta["index"] < 250