diff --git a/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/document_store.py b/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/document_store.py
index cd9a4dc43..6460bea43 100644
--- a/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/document_store.py
+++ b/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/document_store.py
@@ -1,7 +1,7 @@
 import inspect
 from collections.abc import AsyncGenerator, Generator
 from itertools import islice
-from typing import Any, ClassVar, Optional, Union
+from typing import Any, ClassVar, Optional, Union, cast
 
 import qdrant_client
 from haystack import default_from_dict, default_to_dict, logging
@@ -517,6 +517,233 @@ async def delete_documents_async(self, document_ids: list[str]) -> None:
                 "Called QdrantDocumentStore.delete_documents_async() on a non-existing ID",
             )
 
+    def delete_by_filter(self, filters: dict[str, Any]) -> None:
+        """
+        Deletes all documents that match the provided filters.
+
+        :param filters: The filters to apply to select documents for deletion.
+            For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
+
+        :raises QdrantStoreError:
+            If the deletion operation fails.
+        """
+        self._initialize_client()
+        assert self._client is not None
+
+        try:
+            qdrant_filter = convert_filters_to_qdrant(filters)
+            if qdrant_filter is None:
+                return
+
+            # perform deletion using FilterSelector
+            self._client.delete(
+                collection_name=self.index,
+                points_selector=rest.FilterSelector(filter=qdrant_filter),
+                wait=self.wait_result_from_api,
+            )
+
+        except Exception as e:
+            msg = f"Failed to delete documents by filter from Qdrant: {e!s}"
+            raise QdrantStoreError(msg) from e
+
+    async def delete_by_filter_async(self, filters: dict[str, Any]) -> None:
+        """
+        Asynchronously deletes all documents that match the provided filters.
+
+        :param filters: The filters to apply to select documents for deletion.
+            For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
+
+        :raises QdrantStoreError:
+            If the deletion operation fails.
+        """
+        await self._initialize_async_client()
+        assert self._async_client is not None
+
+        try:
+            qdrant_filter = convert_filters_to_qdrant(filters)
+            if qdrant_filter is None:
+                return
+
+            # perform deletion using FilterSelector
+            await self._async_client.delete(
+                collection_name=self.index,
+                points_selector=rest.FilterSelector(filter=qdrant_filter),
+                wait=self.wait_result_from_api,
+            )
+
+        except Exception as e:
+            msg = f"Failed to delete documents by filter from Qdrant: {e!s}"
+            raise QdrantStoreError(msg) from e
+
+    @staticmethod
+    def _check_stop_scrolling(next_offset: Any) -> bool:
+        """
+        Checks if scrolling should stop based on the next_offset value.
+
+        :param next_offset: The offset returned from the scroll operation.
+        :returns: True if scrolling should stop, False otherwise.
+        """
+        return next_offset is None or (
+            hasattr(next_offset, "num")
+            and hasattr(next_offset, "uuid")
+            and next_offset.num == 0
+            and next_offset.uuid == ""
+        )
+
+    @staticmethod
+    def _create_updated_point_from_record(record: Any, meta: dict[str, Any]) -> rest.PointStruct:
+        """
+        Creates an updated PointStruct from a Qdrant record with merged metadata.
+
+        :param record: The Qdrant record to update.
+        :param meta: The metadata fields to merge with existing metadata.
+        :returns: A PointStruct with updated metadata and preserved vectors.
+ """ + # merge existing payload with new metadata + # Metadata is stored under the "meta" key in the payload + updated_payload = dict(record.payload or {}) + if "meta" not in updated_payload: + updated_payload["meta"] = {} + updated_payload["meta"].update(meta) + + # create updated point preserving vectors + # Type cast needed because record.vector type doesn't include all PointStruct vector types + vector_value = record.vector if record.vector is not None else {} + return rest.PointStruct( + id=record.id, + vector=cast(Any, vector_value), + payload=updated_payload, + ) + + def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int: + """ + Updates the metadata of all documents that match the provided filters. + + **Note**: This operation is not atomic. Documents matching the filter are fetched first, + then updated. If documents are modified between the fetch and update operations, + those changes may be lost. + + :param filters: The filters to apply to select documents for updating. + For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) + :param meta: The metadata fields to update. This will be merged with existing metadata. + + :returns: + The number of documents updated. 
+ """ + self._initialize_client() + assert self._client is not None + + try: + qdrant_filter = convert_filters_to_qdrant(filters) + if qdrant_filter is None: + return 0 + + # get all matching documents using scroll + updated_points = [] + next_offset = None + + while True: + records, next_offset = self._client.scroll( + collection_name=self.index, + scroll_filter=qdrant_filter, + limit=self.scroll_size, + offset=next_offset, + with_payload=True, + with_vectors=True, + ) + + # update payload for each record + for record in records: + updated_points.append(self._create_updated_point_from_record(record, meta)) + + if self._check_stop_scrolling(next_offset): + break + + if not updated_points: + return 0 + + # upsert updated points back in batches + for batch in get_batches_from_generator(updated_points, self.write_batch_size): + self._client.upsert( + collection_name=self.index, + points=list(batch), + wait=self.wait_result_from_api, + ) + + logger.info( + "Updated {n_docs} documents in collection '{name}' using filters.", + n_docs=len(updated_points), + name=self.index, + ) + return len(updated_points) + except Exception as e: + msg = f"Failed to update documents by filter in Qdrant: {e!s}" + raise QdrantStoreError(msg) from e + + async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, Any]) -> int: + """ + Asynchronously updates the metadata of all documents that match the provided filters. + + **Note**: This operation is not atomic. Documents matching the filter are fetched first, + then updated. If documents are modified between the fetch and update operations, + those changes may be lost. + + :param filters: The filters to apply to select documents for updating. + For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) + :param meta: The metadata fields to update. This will be merged with existing metadata. + + :returns: + The number of documents updated. 
+ """ + await self._initialize_async_client() + assert self._async_client is not None + + try: + qdrant_filter = convert_filters_to_qdrant(filters) + if qdrant_filter is None: + return 0 + + updated_points = [] + next_offset = None + + while True: + records, next_offset = await self._async_client.scroll( + collection_name=self.index, + scroll_filter=qdrant_filter, + limit=self.scroll_size, + offset=next_offset, + with_payload=True, + with_vectors=True, + ) + + # update payload for each record + for record in records: + updated_points.append(self._create_updated_point_from_record(record, meta)) + + if self._check_stop_scrolling(next_offset): + break + + if not updated_points: + return 0 + + # upsert updated points back in batches + for batch in get_batches_from_generator(updated_points, self.write_batch_size): + await self._async_client.upsert( + collection_name=self.index, + points=list(batch), + wait=self.wait_result_from_api, + ) + + logger.info( + "Updated {n_docs} documents in collection '{name}' using filters.", + n_docs=len(updated_points), + name=self.index, + ) + return len(updated_points) + except Exception as e: + msg = f"Failed to update documents by filter in Qdrant: {e!s}" + raise QdrantStoreError(msg) from e + def delete_all_documents(self, recreate_index: bool = False) -> None: """ Deletes all documents from the document store. 
diff --git a/integrations/qdrant/tests/test_document_store.py b/integrations/qdrant/tests/test_document_store.py index a95365fb4..347c68f8a 100644 --- a/integrations/qdrant/tests/test_document_store.py +++ b/integrations/qdrant/tests/test_document_store.py @@ -338,3 +338,192 @@ def test_delete_all_documents_index_recreation(self, document_store): # ensure the collection still exists by writing documents again document_store.write_documents(docs) assert document_store.count_documents() == 5 + + def test_delete_by_filter(self, document_store: QdrantDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A", "year": 2023}), + Document(content="Doc 2", meta={"category": "B", "year": 2023}), + Document(content="Doc 3", meta={"category": "A", "year": 2024}), + ] + document_store.write_documents(docs) + assert document_store.count_documents() == 3 + document_store.delete_by_filter(filters={"field": "meta.category", "operator": "==", "value": "A"}) + + # Verify only category B remains + remaining_docs = document_store.filter_documents() + assert len(remaining_docs) == 1 + assert remaining_docs[0].meta["category"] == "B" + + # Delete remaining document by year + document_store.delete_by_filter(filters={"field": "meta.year", "operator": "==", "value": 2023}) + assert document_store.count_documents() == 0 + + def test_delete_by_filter_no_matches(self, document_store: QdrantDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A"}), + Document(content="Doc 2", meta={"category": "B"}), + ] + document_store.write_documents(docs) + assert document_store.count_documents() == 2 + + # try to delete documents with category="C" (no matches) + document_store.delete_by_filter(filters={"field": "meta.category", "operator": "==", "value": "C"}) + assert document_store.count_documents() == 2 + + def test_delete_by_filter_advanced_filters(self, document_store: QdrantDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A", "year": 
2023, "status": "draft"}), + Document(content="Doc 2", meta={"category": "A", "year": 2024, "status": "published"}), + Document(content="Doc 3", meta={"category": "B", "year": 2023, "status": "draft"}), + ] + document_store.write_documents(docs) + assert document_store.count_documents() == 3 + + # AND condition + document_store.delete_by_filter( + filters={ + "operator": "AND", + "conditions": [ + {"field": "meta.category", "operator": "==", "value": "A"}, + {"field": "meta.year", "operator": "==", "value": 2023}, + ], + } + ) + assert document_store.count_documents() == 2 + + # OR condition + document_store.delete_by_filter( + filters={ + "operator": "OR", + "conditions": [ + {"field": "meta.category", "operator": "==", "value": "B"}, + {"field": "meta.status", "operator": "==", "value": "published"}, + ], + } + ) + assert document_store.count_documents() == 0 + + def test_update_by_filter(self, document_store: QdrantDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A", "status": "draft"}), + Document(content="Doc 2", meta={"category": "B", "status": "draft"}), + Document(content="Doc 3", meta={"category": "A", "status": "draft"}), + ] + document_store.write_documents(docs) + assert document_store.count_documents() == 3 + + # Update status for category="A" documents + updated_count = document_store.update_by_filter( + filters={"field": "meta.category", "operator": "==", "value": "A"}, meta={"status": "published"} + ) + assert updated_count == 2 + + # Verify the updated documents have the new metadata + published_docs = document_store.filter_documents( + filters={"field": "meta.status", "operator": "==", "value": "published"} + ) + assert len(published_docs) == 2 + for doc in published_docs: + assert doc.meta["status"] == "published" + assert doc.meta["category"] == "A" + + # Verify documents with category="B" were not updated + draft_docs = document_store.filter_documents( + filters={"field": "meta.status", "operator": "==", "value": 
"draft"} + ) + assert len(draft_docs) == 1 + assert draft_docs[0].meta["category"] == "B" + + def test_update_by_filter_multiple_fields(self, document_store: QdrantDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A", "year": 2023}), + Document(content="Doc 2", meta={"category": "A", "year": 2023}), + Document(content="Doc 3", meta={"category": "B", "year": 2024}), + ] + document_store.write_documents(docs) + assert document_store.count_documents() == 3 + + # Update multiple fields for category="A" documents + updated_count = document_store.update_by_filter( + filters={"field": "meta.category", "operator": "==", "value": "A"}, + meta={"status": "published", "reviewed": True}, + ) + assert updated_count == 2 + + # Verify updates + published_docs = document_store.filter_documents( + filters={"field": "meta.status", "operator": "==", "value": "published"} + ) + assert len(published_docs) == 2 + for doc in published_docs: + assert doc.meta["status"] == "published" + assert doc.meta["reviewed"] is True + assert doc.meta["category"] == "A" + assert doc.meta["year"] == 2023 # Existing field preserved + + def test_update_by_filter_no_matches(self, document_store: QdrantDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A"}), + Document(content="Doc 2", meta={"category": "B"}), + ] + document_store.write_documents(docs) + assert document_store.count_documents() == 2 + + # Try to update documents with category="C" (no matches) + updated_count = document_store.update_by_filter( + filters={"field": "meta.category", "operator": "==", "value": "C"}, meta={"status": "published"} + ) + assert updated_count == 0 + assert document_store.count_documents() == 2 + + def test_update_by_filter_advanced_filters(self, document_store: QdrantDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A", "year": 2023, "status": "draft"}), + Document(content="Doc 2", meta={"category": "A", "year": 2024, "status": "draft"}), + 
Document(content="Doc 3", meta={"category": "B", "year": 2023, "status": "draft"}), + ] + document_store.write_documents(docs) + assert document_store.count_documents() == 3 + + # Update with AND condition + updated_count = document_store.update_by_filter( + filters={ + "operator": "AND", + "conditions": [ + {"field": "meta.category", "operator": "==", "value": "A"}, + {"field": "meta.year", "operator": "==", "value": 2023}, + ], + }, + meta={"status": "published"}, + ) + assert updated_count == 1 + + # Verify only one document was updated + published_docs = document_store.filter_documents( + filters={"field": "meta.status", "operator": "==", "value": "published"} + ) + assert len(published_docs) == 1 + assert published_docs[0].meta["category"] == "A" + assert published_docs[0].meta["year"] == 2023 + + def test_update_by_filter_preserves_vectors(self, document_store: QdrantDocumentStore): + """Test that update_by_filter preserves document embeddings.""" + docs = [ + Document(content="Doc 1", meta={"category": "A"}, embedding=[0.1] * 768), + Document(content="Doc 2", meta={"category": "B"}, embedding=[0.2] * 768), + ] + document_store.write_documents(docs) + + # Update metadata + updated_count = document_store.update_by_filter( + filters={"field": "meta.category", "operator": "==", "value": "A"}, meta={"status": "published"} + ) + assert updated_count == 1 + + # Verify embedding is preserved + updated_docs = document_store.filter_documents( + filters={"field": "meta.status", "operator": "==", "value": "published"} + ) + assert len(updated_docs) == 1 + assert updated_docs[0].embedding is not None + assert len(updated_docs[0].embedding) == 768 diff --git a/integrations/qdrant/tests/test_document_store_async.py b/integrations/qdrant/tests/test_document_store_async.py index 8235a6623..ba4a5d7a5 100644 --- a/integrations/qdrant/tests/test_document_store_async.py +++ b/integrations/qdrant/tests/test_document_store_async.py @@ -262,3 +262,215 @@ async def 
test_delete_all_documents_async_index_recreation(self, document_store) # ensure the collection still exists by writing documents again await document_store.write_documents_async(docs) assert await document_store.count_documents_async() == 5 + + @pytest.mark.asyncio + async def test_delete_by_filter_async(self, document_store: QdrantDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A", "year": 2023}), + Document(content="Doc 2", meta={"category": "B", "year": 2023}), + Document(content="Doc 3", meta={"category": "A", "year": 2024}), + ] + await document_store.write_documents_async(docs) + assert await document_store.count_documents_async() == 3 + + # Delete documents with category="A" + await document_store.delete_by_filter_async(filters={"field": "meta.category", "operator": "==", "value": "A"}) + assert await document_store.count_documents_async() == 1 + + # Verify only category B remains + remaining_docs = [] + async for doc in document_store._get_documents_generator_async(): + remaining_docs.append(doc) + assert len(remaining_docs) == 1 + assert remaining_docs[0].meta["category"] == "B" + + # Delete remaining document by year + await document_store.delete_by_filter_async(filters={"field": "meta.year", "operator": "==", "value": 2023}) + assert await document_store.count_documents_async() == 0 + + @pytest.mark.asyncio + async def test_delete_by_filter_async_no_matches(self, document_store: QdrantDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A"}), + Document(content="Doc 2", meta={"category": "B"}), + ] + await document_store.write_documents_async(docs) + assert await document_store.count_documents_async() == 2 + + # Try to delete documents with category="C" (no matches) + await document_store.delete_by_filter_async(filters={"field": "meta.category", "operator": "==", "value": "C"}) + assert await document_store.count_documents_async() == 2 + + @pytest.mark.asyncio + async def 
test_delete_by_filter_async_advanced_filters(self, document_store: QdrantDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A", "year": 2023, "status": "draft"}), + Document(content="Doc 2", meta={"category": "A", "year": 2024, "status": "published"}), + Document(content="Doc 3", meta={"category": "B", "year": 2023, "status": "draft"}), + ] + await document_store.write_documents_async(docs) + assert await document_store.count_documents_async() == 3 + + # AND condition + await document_store.delete_by_filter_async( + filters={ + "operator": "AND", + "conditions": [ + {"field": "meta.category", "operator": "==", "value": "A"}, + {"field": "meta.year", "operator": "==", "value": 2023}, + ], + } + ) + assert await document_store.count_documents_async() == 2 + + # OR condition + await document_store.delete_by_filter_async( + filters={ + "operator": "OR", + "conditions": [ + {"field": "meta.category", "operator": "==", "value": "B"}, + {"field": "meta.status", "operator": "==", "value": "published"}, + ], + } + ) + assert await document_store.count_documents_async() == 0 + + @pytest.mark.asyncio + async def test_update_by_filter_async(self, document_store: QdrantDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A", "status": "draft"}), + Document(content="Doc 2", meta={"category": "B", "status": "draft"}), + Document(content="Doc 3", meta={"category": "A", "status": "draft"}), + ] + await document_store.write_documents_async(docs) + assert await document_store.count_documents_async() == 3 + + # Update status for category="A" documents + updated_count = await document_store.update_by_filter_async( + filters={"field": "meta.category", "operator": "==", "value": "A"}, meta={"status": "published"} + ) + assert updated_count == 2 + + # Verify the updated documents have the new metadata + published_docs = [] + async for doc in document_store._get_documents_generator_async( + filters={"field": "meta.status", "operator": "==", 
"value": "published"} + ): + published_docs.append(doc) + assert len(published_docs) == 2 + for doc in published_docs: + assert doc.meta["status"] == "published" + assert doc.meta["category"] == "A" + + # Verify documents with category="B" were not updated + draft_docs = [] + async for doc in document_store._get_documents_generator_async( + filters={"field": "meta.status", "operator": "==", "value": "draft"} + ): + draft_docs.append(doc) + assert len(draft_docs) == 1 + assert draft_docs[0].meta["category"] == "B" + + @pytest.mark.asyncio + async def test_update_by_filter_async_multiple_fields(self, document_store: QdrantDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A", "year": 2023}), + Document(content="Doc 2", meta={"category": "A", "year": 2023}), + Document(content="Doc 3", meta={"category": "B", "year": 2024}), + ] + await document_store.write_documents_async(docs) + assert await document_store.count_documents_async() == 3 + + # Update multiple fields for category="A" documents + updated_count = await document_store.update_by_filter_async( + filters={"field": "meta.category", "operator": "==", "value": "A"}, + meta={"status": "published", "reviewed": True}, + ) + assert updated_count == 2 + + # Verify updates + published_docs = [] + async for doc in document_store._get_documents_generator_async( + filters={"field": "meta.status", "operator": "==", "value": "published"} + ): + published_docs.append(doc) + assert len(published_docs) == 2 + for doc in published_docs: + assert doc.meta["status"] == "published" + assert doc.meta["reviewed"] is True + assert doc.meta["category"] == "A" + assert doc.meta["year"] == 2023 # Existing field preserved + + @pytest.mark.asyncio + async def test_update_by_filter_async_no_matches(self, document_store: QdrantDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A"}), + Document(content="Doc 2", meta={"category": "B"}), + ] + await document_store.write_documents_async(docs) + 
assert await document_store.count_documents_async() == 2 + + # Try to update documents with category="C" (no matches) + updated_count = await document_store.update_by_filter_async( + filters={"field": "meta.category", "operator": "==", "value": "C"}, meta={"status": "published"} + ) + assert updated_count == 0 + assert await document_store.count_documents_async() == 2 + + @pytest.mark.asyncio + async def test_update_by_filter_async_advanced_filters(self, document_store: QdrantDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A", "year": 2023, "status": "draft"}), + Document(content="Doc 2", meta={"category": "A", "year": 2024, "status": "draft"}), + Document(content="Doc 3", meta={"category": "B", "year": 2023, "status": "draft"}), + ] + await document_store.write_documents_async(docs) + assert await document_store.count_documents_async() == 3 + + # Update with AND condition + updated_count = await document_store.update_by_filter_async( + filters={ + "operator": "AND", + "conditions": [ + {"field": "meta.category", "operator": "==", "value": "A"}, + {"field": "meta.year", "operator": "==", "value": 2023}, + ], + }, + meta={"status": "published"}, + ) + assert updated_count == 1 + + # Verify only one document was updated + published_docs = [] + async for doc in document_store._get_documents_generator_async( + filters={"field": "meta.status", "operator": "==", "value": "published"} + ): + published_docs.append(doc) + assert len(published_docs) == 1 + assert published_docs[0].meta["category"] == "A" + assert published_docs[0].meta["year"] == 2023 + + @pytest.mark.asyncio + async def test_update_by_filter_async_preserves_vectors(self, document_store: QdrantDocumentStore): + """Test that update_by_filter_async preserves document embeddings.""" + docs = [ + Document(content="Doc 1", meta={"category": "A"}, embedding=[0.1] * 768), + Document(content="Doc 2", meta={"category": "B"}, embedding=[0.2] * 768), + ] + await 
document_store.write_documents_async(docs) + + # Update metadata + updated_count = await document_store.update_by_filter_async( + filters={"field": "meta.category", "operator": "==", "value": "A"}, meta={"status": "published"} + ) + assert updated_count == 1 + + # Verify embedding is preserved + updated_docs = [] + async for doc in document_store._get_documents_generator_async( + filters={"field": "meta.status", "operator": "==", "value": "published"} + ): + updated_docs.append(doc) + assert len(updated_docs) == 1 + assert updated_docs[0].embedding is not None + assert len(updated_docs[0].embedding) == 768