@@ -11,6 +11,18 @@
from weaviate.collections.classes.filters import Filter, FilterReturn


def validate_filters(filters: dict[str, Any] | None) -> None:
"""
Validates that filters have the correct structure.

:param filters: The filters to validate.
:raises ValueError: If filters are provided but have invalid syntax.
"""
if filters and "operator" not in filters and "conditions" not in filters:
msg = "Invalid filter syntax. See https://docs.haystack.deepset.ai/docs/metadata-filtering for details."
raise ValueError(msg)
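
A minimal sketch of what this top-level check accepts and rejects (illustrative inputs only; the full filter grammar is described at the linked Haystack docs):

# Illustrative only: validate_filters checks just the top-level keys.
validate_filters(None)  # no-op: filters are optional
validate_filters({"field": "meta.category", "operator": "==", "value": "TypeA"})  # ok: has "operator"
validate_filters(
    {
        "operator": "AND",
        "conditions": [
            {"field": "meta.category", "operator": "==", "value": "TypeA"},
            {"field": "meta.status", "operator": "==", "value": "draft"},
        ],
    }
)  # ok: has both "operator" and "conditions"
validate_filters({"meta.category": "TypeA"})  # raises ValueError: legacy-style dict with neither key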


def convert_filters(filters: dict[str, Any]) -> FilterReturn:
"""
Convert filters from Haystack format to Weaviate format.
@@ -1,6 +1,7 @@
# SPDX-FileCopyrightText: 2023-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

import base64
import datetime
import json
@@ -19,7 +20,7 @@
from weaviate.embedded import EmbeddedOptions
from weaviate.util import generate_uuid5

from ._filters import convert_filters
from ._filters import convert_filters, validate_filters
from .auth import AuthCredentials

logger = logging.getLogger(__name__)
@@ -419,7 +420,7 @@ def _query_with_filters(self, filters: dict[str, Any]) -> list[DataObject[dict[s
#
# Nonetheless there's also another issue, paginating with limit and offset is not efficient
# and it's still restricted by the QUERY_MAXIMUM_RESULTS environment variable.
# If the sum of limit and offest is greater than QUERY_MAXIMUM_RESULTS an error is raised.
# If the sum of limit and offset is greater than QUERY_MAXIMUM_RESULTS an error is raised.
# See the official docs for more:
# https://weaviate.io/developers/weaviate/api/graphql/additional-operators#performance-considerations
offset = 0
@@ -452,9 +453,7 @@ def filter_documents(self, filters: Optional[dict[str, Any]] = None) -> list[Doc
:param filters: The filters to apply to the document list.
:returns: A list of Documents that match the given filters.
"""
if filters and "operator" not in filters and "conditions" not in filters:
msg = "Invalid filter syntax. See https://docs.haystack.deepset.ai/docs/metadata-filtering for details."
raise ValueError(msg)
validate_filters(filters)

result = []
if filters:
@@ -483,7 +482,7 @@ def _batch_write(self, documents: list[Document]) -> int:
vector=doc.embedding,
)
if failed_objects := self.client.batch.failed_objects:
# We fallback to use the UUID if the _original_id is not present, this is just to be
# We fall back to use the UUID if the _original_id is not present, this is just to be
mapped_objects = {}
for obj in failed_objects:
properties = obj.object_.properties or {}
@@ -507,7 +506,7 @@ def _batch_write(self, documents: list[Document]) -> int:
def _write(self, documents: list[Document], policy: DuplicatePolicy) -> int:
"""
Writes documents to Weaviate using the specified policy.
This doesn't uses the batch API, so it's slower than _batch_write.
This doesn't use the batch API, so it's slower than _batch_write.
If policy is set to SKIP it will skip any document that already exists.
If policy is set to FAIL it will raise an exception if any of the documents already exists.
"""
@@ -610,6 +609,231 @@ def delete_all_documents(self, *, recreate_index: bool = False, batch_size: int
"Make sure to specify a deletion `batch_size` which is less than `QUERY_MAXIMUM_RESULTS`.",
)

def delete_by_filter(self, filters: dict[str, Any]) -> int:
"""
Deletes all documents that match the provided filters.

:param filters: The filters to apply to select documents for deletion.
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering).
:returns: The number of documents deleted.
"""
validate_filters(filters)

try:
weaviate_filter = convert_filters(filters)
result = self.collection.data.delete_many(where=weaviate_filter)
deleted_count = result.successful
logger.info(
"Deleted {n_docs} documents from collection '{collection}' using filters.",
n_docs=deleted_count,
collection=self.collection.name,
)
return deleted_count
except weaviate.exceptions.WeaviateQueryError as e:
msg = f"Failed to delete documents by filter in Weaviate. Error: {e.message}"
raise DocumentStoreError(msg) from e
except Exception as e:
msg = f"Failed to delete documents by filter in Weaviate: {e!s}"
raise DocumentStoreError(msg) from e
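
A short usage sketch for the new method. The store construction below is an assumption for illustration; any configured WeaviateDocumentStore works the same way:

# Sketch only: delete every document whose status is "obsolete".
# The url points at an assumed local Weaviate instance.
from haystack_integrations.document_stores.weaviate import WeaviateDocumentStore

store = WeaviateDocumentStore(url="http://localhost:8080")
n_deleted = store.delete_by_filter(
    filters={"field": "meta.status", "operator": "==", "value": "obsolete"}
)
print(f"Deleted {n_deleted} documents")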

async def delete_by_filter_async(self, filters: dict[str, Any]) -> int:
"""
Asynchronously deletes all documents that match the provided filters.

:param filters: The filters to apply to select documents for deletion.
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering).
:returns: The number of documents deleted.
"""
validate_filters(filters)

try:
collection = await self.async_collection
weaviate_filter = convert_filters(filters)
result = await collection.data.delete_many(where=weaviate_filter)
deleted_count = result.successful
logger.info(
"Deleted {n_docs} documents from collection '{collection}' using filters.",
n_docs=deleted_count,
collection=collection.name,
)
return deleted_count
except weaviate.exceptions.WeaviateQueryError as e:
msg = f"Failed to delete documents by filter in Weaviate. Error: {e.message}"
raise DocumentStoreError(msg) from e
except Exception as e:
msg = f"Failed to delete documents by filter in Weaviate: {e!s}"
raise DocumentStoreError(msg) from e

def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
"""
Updates the metadata of all documents that match the provided filters.

:param filters: The filters to apply to select documents for updating.
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering).
:param meta: The metadata fields to update. These will be merged with existing metadata.
:returns: The number of documents updated.
"""
validate_filters(filters)

if not isinstance(meta, dict):
msg = "Meta must be a dictionary"
raise ValueError(msg)

try:
matching_objects = self._query_with_filters(filters)
if not matching_objects:
return 0

# Update each object with the new metadata
# Since metadata is stored flattened in Weaviate properties, we update properties directly
updated_count = 0
failed_updates = []

for obj in matching_objects:
try:
# Get current properties
current_properties = obj.properties.copy() if obj.properties else {}

# Update with new metadata values
# Note: metadata fields are stored directly in properties (flattened)
for key, value in meta.items():
current_properties[key] = value

# Update the object, preserving the vector
# Get the vector from the object to preserve it during replace
vector = None
if isinstance(obj.vector, list):
vector = obj.vector
elif isinstance(obj.vector, dict):
vector = obj.vector.get("default")

self.collection.data.replace(
uuid=obj.uuid,
properties=current_properties,
vector=vector,
)
updated_count += 1
except Exception as e:
# Collect failed updates but continue with others
obj_properties = obj.properties or {}
id_ = obj_properties.get("_original_id", obj.uuid)
failed_updates.append((id_, str(e)))

if failed_updates:
msg = "\n".join(
[f"Failed to update object with id '{id_}'. Error: '{error}'" for id_, error in failed_updates]
)
raise DocumentStoreError(msg)

logger.info(
"Updated {n_docs} documents in collection '{collection}' using filters.",
n_docs=updated_count,
collection=self.collection.name,
)
return updated_count
except weaviate.exceptions.WeaviateQueryError as e:
msg = f"Failed to update documents by filter in Weaviate. Error: {e.message}"
raise DocumentStoreError(msg) from e
except Exception as e:
msg = f"Failed to update documents by filter in Weaviate: {e!s}"
raise DocumentStoreError(msg) from e
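
Usage mirrors delete_by_filter. A sketch, reusing the store from the previous example; keys named in meta overwrite the matching properties, while everything else (including the vector) is preserved:

# Sketch only: publish all TypeA documents in one call.
n_updated = store.update_by_filter(
    filters={"field": "meta.category", "operator": "==", "value": "TypeA"},
    meta={"status": "published"},
)
print(f"Updated {n_updated} documents")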

async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
"""
Asynchronously updates the metadata of all documents that match the provided filters.

:param filters: The filters to apply to select documents for updating.
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering).
:param meta: The metadata fields to update. These will be merged with existing metadata.
:returns: The number of documents updated.
"""
validate_filters(filters)

if not isinstance(meta, dict):
msg = "Meta must be a dictionary"
raise ValueError(msg)

try:
collection = await self.async_collection
weaviate_filter = convert_filters(filters)
config = await collection.config.get()
properties = [p.name for p in config.properties]

# Query all objects matching the filter
matching_objects = []
offset = 0
partial_result = None

# Paginate through all matching objects
# We include vector=True to preserve vectors when updating
while partial_result is None or len(partial_result.objects) == DEFAULT_QUERY_LIMIT:
partial_result = await collection.query.fetch_objects(
filters=weaviate_filter,
include_vector=True,
limit=DEFAULT_QUERY_LIMIT,
offset=offset,
return_properties=properties,
)
matching_objects.extend(partial_result.objects)
offset += DEFAULT_QUERY_LIMIT

if not matching_objects:
return 0

# Update each object with the new metadata
# Since metadata is stored flattened in Weaviate properties, we update properties directly
updated_count = 0
failed_updates = []

for obj in matching_objects:
try:
# Get current properties
current_properties = obj.properties.copy() if obj.properties else {}

# Update with new metadata values
# Note: metadata fields are stored directly in properties (flattened)
for key, value in meta.items():
current_properties[key] = value

# Update the object, preserving the vector
# Get the vector from the object to preserve it during replace
vector = None
if isinstance(obj.vector, list):
vector = obj.vector
elif isinstance(obj.vector, dict):
vector = obj.vector.get("default")

await collection.data.replace(
uuid=obj.uuid,
properties=current_properties,
vector=vector,
)
updated_count += 1
except Exception as e:
# Collect failed updates but continue with others
obj_properties = obj.properties or {}
id_ = obj_properties.get("_original_id", obj.uuid)
failed_updates.append((id_, str(e)))

if failed_updates:
msg = "\n".join(
[f"Failed to update object with id '{id_}'. Error: '{error}'" for id_, error in failed_updates]
)
raise DocumentStoreError(msg)

logger.info(
"Updated {n_docs} documents in collection '{collection}' using filters.",
n_docs=updated_count,
collection=collection.name,
)
return updated_count
except weaviate.exceptions.WeaviateQueryError as e:
msg = f"Failed to update documents by filter in Weaviate. Error: {e.message}"
raise DocumentStoreError(msg) from e
except Exception as e:
msg = f"Failed to update documents by filter in Weaviate: {e!s}"
raise DocumentStoreError(msg) from e
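
The async variants mirror the sync API and are awaited from an event loop. A sketch, assuming store is the same WeaviateDocumentStore as above and has been configured for async use:

# Sketch only: the same operations through the async API.
import asyncio

async def reconcile() -> None:
    deleted = await store.delete_by_filter_async(
        filters={"field": "meta.status", "operator": "==", "value": "obsolete"}
    )
    updated = await store.update_by_filter_async(
        filters={"field": "meta.status", "operator": "==", "value": "draft"},
        meta={"status": "published"},
    )
    print(f"deleted={deleted}, updated={updated}")

asyncio.run(reconcile())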

def _bm25_retrieval(
self, query: str, filters: Optional[dict[str, Any]] = None, top_k: Optional[int] = None
) -> list[Document]:
71 changes: 71 additions & 0 deletions integrations/weaviate/tests/test_document_store.py
@@ -58,6 +58,8 @@ def document_store(self, request) -> WeaviateDocumentStore:
*DOCUMENT_COLLECTION_PROPERTIES,
{"name": "number", "dataType": ["int"]},
{"name": "date", "dataType": ["date"]},
{"name": "category", "dataType": ["text"]},
{"name": "status", "dataType": ["text"]},
],
}
store = WeaviateDocumentStore(
@@ -832,3 +834,72 @@ def test_delete_all_documents_excessive_batch_size(self, document_store, caplog)
document_store.delete_all_documents(batch_size=20000)
assert document_store.count_documents() == 5
assert "Not all documents have been deleted." in caplog.text

def test_delete_by_filter(self, document_store):
docs = [
Document(content="Doc 1", meta={"category": "TypeA"}),
Document(content="Doc 2", meta={"category": "TypeB"}),
Document(content="Doc 3", meta={"category": "TypeA"}),
]
document_store.write_documents(docs)
assert document_store.count_documents() == 3

# Delete documents with category="TypeA"
deleted_count = document_store.delete_by_filter(
filters={"field": "meta.category", "operator": "==", "value": "TypeA"}
)
assert deleted_count == 2
assert document_store.count_documents() == 1

def test_update_by_filter(self, document_store):
docs = [
Document(content="Doc 1", meta={"category": "TypeA", "status": "draft"}),
Document(content="Doc 2", meta={"category": "TypeB", "status": "draft"}),
Document(content="Doc 3", meta={"category": "TypeA", "status": "draft"}),
]
document_store.write_documents(docs)
assert document_store.count_documents() == 3

# Update status for category="TypeA" documents
updated_count = document_store.update_by_filter(
filters={"field": "meta.category", "operator": "==", "value": "TypeA"}, meta={"status": "published"}
)
assert updated_count == 2

# Verify the updates
published_docs = document_store.filter_documents(
filters={"field": "meta.status", "operator": "==", "value": "published"}
)
assert len(published_docs) == 2
for doc in published_docs:
assert doc.meta["category"] == "TypeA"
assert doc.meta["status"] == "published"

def test_update_by_filter_with_pagination(self, document_store, monkeypatch):
# Reduce DEFAULT_QUERY_LIMIT to test pagination without creating 10000+ documents
monkeypatch.setattr("haystack_integrations.document_stores.weaviate.document_store.DEFAULT_QUERY_LIMIT", 100)

docs = []
for index in range(250):
docs.append(
Document(content="This is some content", meta={"index": index, "status": "draft", "category": "test"})
)
document_store.write_documents(docs)

# updating all documents should trigger pagination (3 pages)
updated_count = document_store.update_by_filter(
filters={"field": "category", "operator": "==", "value": "test"},
meta={"status": "published"},
)
assert updated_count == 250

# verify updates were correct
published_docs = document_store.filter_documents(
filters={"field": "status", "operator": "==", "value": "published"}
)
assert len(published_docs) == 250
for doc in published_docs:
assert doc.meta["category"] == "test"
assert doc.meta["status"] == "published"
assert "index" in doc.meta
assert 0 <= doc.meta["index"] < 250