Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import inspect
from collections.abc import AsyncGenerator, Generator
from itertools import islice
from typing import Any, ClassVar, Optional, Union
from typing import Any, ClassVar, Optional, Union, cast

import qdrant_client
from haystack import default_from_dict, default_to_dict, logging
Expand Down Expand Up @@ -517,6 +517,233 @@ async def delete_documents_async(self, document_ids: list[str]) -> None:
"Called QdrantDocumentStore.delete_documents_async() on a non-existing ID",
)

def delete_by_filter(self, filters: dict[str, Any]) -> None:
"""
Deletes all documents that match the provided filters.

:param filters: The filters to apply to select documents for deletion.
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)

:returns:
The number of documents deleted.
"""
self._initialize_client()
assert self._client is not None

try:
qdrant_filter = convert_filters_to_qdrant(filters)
if qdrant_filter is None:
return

# perform deletion using FilterSelector
self._client.delete(
collection_name=self.index,
points_selector=rest.FilterSelector(filter=qdrant_filter),
wait=self.wait_result_from_api,
)

except Exception as e:
msg = f"Failed to delete documents by filter from Qdrant: {e!s}"
raise QdrantStoreError(msg) from e

async def delete_by_filter_async(self, filters: dict[str, Any]) -> None:
"""
Asynchronously deletes all documents that match the provided filters.

:param filters: The filters to apply to select documents for deletion.
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)

:returns:
The number of documents deleted.
"""
await self._initialize_async_client()
assert self._async_client is not None

try:
qdrant_filter = convert_filters_to_qdrant(filters)
if qdrant_filter is None:
return

# perform deletion using FilterSelector
await self._async_client.delete(
collection_name=self.index,
points_selector=rest.FilterSelector(filter=qdrant_filter),
wait=self.wait_result_from_api,
)

except Exception as e:
msg = f"Failed to delete documents by filter from Qdrant: {e!s}"
raise QdrantStoreError(msg) from e

@staticmethod
def _check_stop_scrolling(next_offset: Any) -> bool:
"""
Checks if scrolling should stop based on the next_offset value.

:param next_offset: The offset returned from the scroll operation.
:returns: True if scrolling should stop, False otherwise.
"""
return next_offset is None or (
hasattr(next_offset, "num")
and hasattr(next_offset, "uuid")
and next_offset.num == 0
and next_offset.uuid == ""
)

@staticmethod
def _create_updated_point_from_record(record: Any, meta: dict[str, Any]) -> rest.PointStruct:
"""
Creates an updated PointStruct from a Qdrant record with merged metadata.

:param record: The Qdrant record to update.
:param meta: The metadata fields to merge with existing metadata.
:returns: A PointStruct with updated metadata and preserved vectors.
"""
# merge existing payload with new metadata
# Metadata is stored under the "meta" key in the payload
updated_payload = dict(record.payload or {})
if "meta" not in updated_payload:
updated_payload["meta"] = {}
updated_payload["meta"].update(meta)

# create updated point preserving vectors
# Type cast needed because record.vector type doesn't include all PointStruct vector types
vector_value = record.vector if record.vector is not None else {}
return rest.PointStruct(
id=record.id,
vector=cast(Any, vector_value),
payload=updated_payload,
)

def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
"""
Updates the metadata of all documents that match the provided filters.

**Note**: This operation is not atomic. Documents matching the filter are fetched first,
then updated. If documents are modified between the fetch and update operations,
those changes may be lost.

:param filters: The filters to apply to select documents for updating.
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
:param meta: The metadata fields to update. This will be merged with existing metadata.

:returns:
The number of documents updated.
"""
self._initialize_client()
assert self._client is not None

try:
qdrant_filter = convert_filters_to_qdrant(filters)
if qdrant_filter is None:
return 0

# get all matching documents using scroll
updated_points = []
next_offset = None

while True:
records, next_offset = self._client.scroll(
collection_name=self.index,
scroll_filter=qdrant_filter,
limit=self.scroll_size,
offset=next_offset,
with_payload=True,
with_vectors=True,
)

# update payload for each record
for record in records:
updated_points.append(self._create_updated_point_from_record(record, meta))

if self._check_stop_scrolling(next_offset):
break

if not updated_points:
return 0

# upsert updated points back in batches
for batch in get_batches_from_generator(updated_points, self.write_batch_size):
self._client.upsert(
collection_name=self.index,
points=list(batch),
wait=self.wait_result_from_api,
)

logger.info(
"Updated {n_docs} documents in collection '{name}' using filters.",
n_docs=len(updated_points),
name=self.index,
)
return len(updated_points)
except Exception as e:
msg = f"Failed to update documents by filter in Qdrant: {e!s}"
raise QdrantStoreError(msg) from e

async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
"""
Asynchronously updates the metadata of all documents that match the provided filters.

**Note**: This operation is not atomic. Documents matching the filter are fetched first,
then updated. If documents are modified between the fetch and update operations,
those changes may be lost.

:param filters: The filters to apply to select documents for updating.
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
:param meta: The metadata fields to update. This will be merged with existing metadata.

:returns:
The number of documents updated.
"""
await self._initialize_async_client()
assert self._async_client is not None

try:
qdrant_filter = convert_filters_to_qdrant(filters)
if qdrant_filter is None:
return 0

updated_points = []
next_offset = None

while True:
records, next_offset = await self._async_client.scroll(
collection_name=self.index,
scroll_filter=qdrant_filter,
limit=self.scroll_size,
offset=next_offset,
with_payload=True,
with_vectors=True,
)

# update payload for each record
for record in records:
updated_points.append(self._create_updated_point_from_record(record, meta))

if self._check_stop_scrolling(next_offset):
break

if not updated_points:
return 0

# upsert updated points back in batches
for batch in get_batches_from_generator(updated_points, self.write_batch_size):
await self._async_client.upsert(
collection_name=self.index,
points=list(batch),
wait=self.wait_result_from_api,
)

logger.info(
"Updated {n_docs} documents in collection '{name}' using filters.",
n_docs=len(updated_points),
name=self.index,
)
return len(updated_points)
except Exception as e:
msg = f"Failed to update documents by filter in Qdrant: {e!s}"
raise QdrantStoreError(msg) from e

def delete_all_documents(self, recreate_index: bool = False) -> None:
"""
Deletes all documents from the document store.
Expand Down
Loading