Skip to content

Commit feb4888

Browse files
feat: adding delete_by_filter() and update_by_filter() to ChromaDocumentStore (#2649)
* adding new operations + tests
* filtering metadata to contain only primitive types
* fixing typing issues
* formatting
* removing type ignore and casting to specific ChromaDB types
* removing unnecessary convert_filters
* refactoring to reduce duplicated code
1 parent 54b166f commit feb4888

4 files changed

Lines changed: 459 additions & 15 deletions

File tree

integrations/chroma/src/haystack_integrations/document_stores/chroma/document_store.py

Lines changed: 240 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import chromadb
99
from chromadb.api.models.AsyncCollection import AsyncCollection
10-
from chromadb.api.types import GetResult, QueryResult
10+
from chromadb.api.types import GetResult, Metadata, OneOrMany, QueryResult
1111
from haystack import default_from_dict, default_to_dict, logging
1212
from haystack.dataclasses import Document
1313
from haystack.document_stores.errors import DocumentStoreError
@@ -178,7 +178,8 @@ async def _ensure_initialized_async(self):
178178
embedding_function=self._embedding_func,
179179
)
180180

181-
def _prepare_get_kwargs(self, filters: Optional[dict[str, Any]] = None) -> dict[str, Any]:
181+
@staticmethod
182+
def _prepare_get_kwargs(filters: Optional[dict[str, Any]] = None) -> dict[str, Any]:
182183
"""
183184
Prepare kwargs for Chroma get operations.
184185
"""
@@ -195,7 +196,8 @@ def _prepare_get_kwargs(self, filters: Optional[dict[str, Any]] = None) -> dict[
195196

196197
return kwargs
197198

198-
def _prepare_query_kwargs(self, filters: Optional[dict[str, Any]] = None) -> dict[str, Any]:
199+
@staticmethod
200+
def _prepare_query_kwargs(filters: Optional[dict[str, Any]] = None) -> dict[str, Any]:
199201
"""
200202
Prepare kwargs for Chroma query operations.
201203
"""
@@ -246,7 +248,7 @@ def filter_documents(self, filters: Optional[dict[str, Any]] = None) -> list[Doc
246248
self._ensure_initialized()
247249
assert self._collection is not None
248250

249-
kwargs = self._prepare_get_kwargs(filters)
251+
kwargs = ChromaDocumentStore._prepare_get_kwargs(filters)
250252
result = self._collection.get(**kwargs)
251253

252254
return self._get_result_to_documents(result)
@@ -266,12 +268,63 @@ async def filter_documents_async(self, filters: Optional[dict[str, Any]] = None)
266268
await self._ensure_initialized_async()
267269
assert self._async_collection is not None
268270

269-
kwargs = self._prepare_get_kwargs(filters)
271+
kwargs = ChromaDocumentStore._prepare_get_kwargs(filters)
270272
result = await self._async_collection.get(**kwargs)
271273

272274
return self._get_result_to_documents(result)
273275

274-
def _convert_document_to_chroma(self, doc: Document) -> Optional[dict[str, Any]]:
276+
@staticmethod
def _filter_metadata(meta: dict[str, Any]) -> dict[str, str | int | float | bool | None]:
    """
    Keeps only the metadata entries whose values can be stored in Chroma.

    An entry is kept when its value is `None` or an instance of one of the types listed in
    `SUPPORTED_TYPES_FOR_METADATA_VALUES`. Every other entry is dropped, and a single warning
    naming all dropped keys is logged.

    :param meta: The metadata dictionary to sanitize. It is not modified.
    :returns: A new dictionary with only valid metadata values.
    """
    kept: dict[str, str | int | float | bool | None] = {
        key: value
        for key, value in meta.items()
        if value is None or isinstance(value, SUPPORTED_TYPES_FOR_METADATA_VALUES)
    }
    # Whatever did not make it into `kept` was rejected; iteration order of `meta` is preserved.
    dropped = [key for key in meta if key not in kept]

    if not dropped:
        return kept

    logger.warning(
        "Metadata contains values of unsupported types for the keys: {keys}. "
        "These items will be discarded. Supported types are: {types}.",
        keys=", ".join(dropped),
        types=", ".join([t.__name__ for t in SUPPORTED_TYPES_FOR_METADATA_VALUES]),
    )
    return kept
302+
303+
@staticmethod
def _prepare_metadata_update(
    matching_docs: list[Document], meta: dict[str, Any]
) -> tuple[list[str], list[Metadata]]:
    """
    Builds the parallel ID and metadata lists used for a batch metadata update.

    For every document, `meta` is laid over the document's current metadata (keys present in
    `meta` win), and the merged result is passed through `_filter_metadata` before being collected.

    :param matching_docs: List of documents to update.
    :param meta: New metadata to merge with existing document metadata.
    :returns: Tuple of (ids_to_update, updated_metadata). Entry `i` of both lists belongs to the
        same document.
    """
    # Build (id, metadata) pairs in one pass so each document is fully processed before the next.
    updates = [
        (
            document.id,
            cast(Metadata, ChromaDocumentStore._filter_metadata({**(document.meta or {}), **meta})),
        )
        for document in matching_docs
    ]

    doc_ids = [doc_id for doc_id, _ in updates]
    merged_metadata: list[Metadata] = [doc_meta for _, doc_meta in updates]
    return doc_ids, merged_metadata
325+
326+
@staticmethod
327+
def _convert_document_to_chroma(doc: Document) -> Optional[dict[str, Any]]:
275328
"""
276329
Converts a Haystack Document to a Chroma document.
277330
"""
@@ -353,7 +406,7 @@ def write_documents(
353406
assert self._collection is not None
354407

355408
for doc in documents:
356-
data = self._convert_document_to_chroma(doc)
409+
data = ChromaDocumentStore._convert_document_to_chroma(doc)
357410
if data is not None:
358411
self._collection.add(**data)
359412

@@ -384,7 +437,7 @@ async def write_documents_async(
384437
assert self._async_collection is not None
385438

386439
for doc in documents:
387-
data = self._convert_document_to_chroma(doc)
440+
data = ChromaDocumentStore._convert_document_to_chroma(doc)
388441
if data is not None:
389442
await self._async_collection.add(**data)
390443

@@ -414,6 +467,181 @@ async def delete_documents_async(self, document_ids: list[str]) -> None:
414467

415468
await self._async_collection.delete(ids=document_ids)
416469

470+
def delete_by_filter(self, filters: dict[str, Any]) -> int:
    """
    Deletes all documents that match the provided filters.

    The matching documents are fetched first, only to count them. The deletion itself is then
    issued with the converted filter: by IDs when the converted filter carries any, otherwise
    by its `where` / `where_document` clauses.

    :param filters: The filters to apply to select documents for deletion.
        For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/v2.0/docs/metadata-filtering)
    :returns: The number of documents deleted.
    :raises DocumentStoreError: If converting the filters, fetching the matches or deleting them fails.
    """
    self._ensure_initialized()
    assert self._collection is not None

    try:
        converted = _convert_filters(filters)

        # ChromaDB's delete does not report how many rows it removed, so count the matches up front.
        n_matched = len(self.filter_documents(filters))
        if not n_matched:
            return 0

        selector: dict[str, Any]
        if converted.ids:
            # Explicit IDs take precedence over any where-style clause.
            selector = {"ids": converted.ids}
        else:
            clauses = {"where": converted.where, "where_document": converted.where_document}
            # Forward only the clauses that are actually set.
            selector = {key: clause for key, clause in clauses.items() if clause}

        self._collection.delete(**selector)

        logger.info(
            "Deleted {n_docs} documents from collection '{name}' using filters.",
            n_docs=n_matched,
            name=self._collection_name,
        )
        return n_matched
    except Exception as e:
        msg = f"Failed to delete documents by filter from ChromaDB: {e!s}"
        raise DocumentStoreError(msg) from e
515+
516+
async def delete_by_filter_async(self, filters: dict[str, Any]) -> int:
    """
    Asynchronously deletes all documents that match the provided filters.

    Asynchronous methods are only supported for HTTP connections.

    The matching documents are fetched first, only to count them. The deletion itself is then
    issued with the converted filter: by IDs when the converted filter carries any, otherwise
    by its `where` / `where_document` clauses.

    :param filters: The filters to apply to select documents for deletion.
        For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/v2.0/docs/metadata-filtering)
    :returns: The number of documents deleted.
    :raises DocumentStoreError: If converting the filters, fetching the matches or deleting them fails.
    """
    await self._ensure_initialized_async()
    assert self._async_collection is not None

    try:
        converted = _convert_filters(filters)

        # ChromaDB's delete does not report how many rows it removed, so count the matches up front.
        n_matched = len(await self.filter_documents_async(filters))
        if not n_matched:
            return 0

        selector: dict[str, Any]
        if converted.ids:
            # Explicit IDs take precedence over any where-style clause.
            selector = {"ids": converted.ids}
        else:
            clauses = {"where": converted.where, "where_document": converted.where_document}
            # Forward only the clauses that are actually set.
            selector = {key: clause for key, clause in clauses.items() if clause}

        await self._async_collection.delete(**selector)

        logger.info(
            "Deleted {n_docs} documents from collection '{name}' using filters.",
            n_docs=n_matched,
            name=self._collection_name,
        )
        return n_matched
    except Exception as e:
        msg = f"Failed to delete documents by filter from ChromaDB: {e!s}"
        raise DocumentStoreError(msg) from e
562+
563+
def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
    """
    Updates the metadata of all documents that match the provided filters.

    **Note**: This operation is not atomic. Documents matching the filter are fetched first,
    then updated. If documents are modified between the fetch and update operations,
    those changes may be lost.

    :param filters: The filters to apply to select documents for updating.
        For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/v2.0/docs/metadata-filtering)
    :param meta: The metadata fields to update. This will be merged with existing metadata.
    :returns: The number of documents updated.
    :raises DocumentStoreError: If fetching the matching documents or writing the update fails.
    """
    self._ensure_initialized()
    assert self._collection is not None

    try:
        documents = self.filter_documents(filters)
        if not documents:
            return 0

        doc_ids, new_metadatas = ChromaDocumentStore._prepare_metadata_update(documents, meta)

        # A single call updates every matched document.
        self._collection.update(ids=doc_ids, metadatas=cast(OneOrMany[Metadata], new_metadatas))

        n_updated = len(doc_ids)
        logger.info(
            "Updated {n_docs} documents in collection '{name}' using filters.",
            n_docs=n_updated,
            name=self._collection_name,
        )
        return n_updated
    except Exception as e:
        msg = f"Failed to update documents by filter in ChromaDB: {e!s}"
        raise DocumentStoreError(msg) from e
602+
603+
async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
    """
    Asynchronously updates the metadata of all documents that match the provided filters.

    Asynchronous methods are only supported for HTTP connections.

    **Note**: This operation is not atomic. Documents matching the filter are fetched first,
    then updated. If documents are modified between the fetch and update operations,
    those changes may be lost.

    :param filters: The filters to apply to select documents for updating.
        For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/v2.0/docs/metadata-filtering)
    :param meta: The metadata fields to update. This will be merged with existing metadata.
    :returns: The number of documents updated.
    :raises DocumentStoreError: If fetching the matching documents or writing the update fails.
    """
    await self._ensure_initialized_async()
    assert self._async_collection is not None

    try:
        documents = await self.filter_documents_async(filters)
        if not documents:
            return 0

        doc_ids, new_metadatas = ChromaDocumentStore._prepare_metadata_update(documents, meta)

        # A single call updates every matched document.
        await self._async_collection.update(ids=doc_ids, metadatas=cast(OneOrMany[Metadata], new_metadatas))

        n_updated = len(doc_ids)
        logger.info(
            "Updated {n_docs} documents in collection '{name}' using filters.",
            n_docs=n_updated,
            name=self._collection_name,
        )
        return n_updated
    except Exception as e:
        msg = f"Failed to update documents by filter in ChromaDB: {e!s}"
        raise DocumentStoreError(msg) from e
644+
417645
def delete_all_documents(self, *, recreate_index: bool = False) -> None:
418646
"""
419647
Deletes all documents in the document store.
@@ -511,7 +739,7 @@ def search(
511739
self._ensure_initialized()
512740
assert self._collection is not None
513741

514-
kwargs = self._prepare_query_kwargs(filters)
742+
kwargs = ChromaDocumentStore._prepare_query_kwargs(filters)
515743
results = self._collection.query(
516744
query_texts=queries,
517745
n_results=top_k,
@@ -539,7 +767,7 @@ async def search_async(
539767
await self._ensure_initialized_async()
540768
assert self._async_collection is not None
541769

542-
kwargs = self._prepare_query_kwargs(filters)
770+
kwargs = ChromaDocumentStore._prepare_query_kwargs(filters)
543771
results = await self._async_collection.query(
544772
query_texts=queries,
545773
n_results=top_k,
@@ -567,7 +795,7 @@ def search_embeddings(
567795
self._ensure_initialized()
568796
assert self._collection is not None
569797

570-
kwargs = self._prepare_query_kwargs(filters)
798+
kwargs = ChromaDocumentStore._prepare_query_kwargs(filters)
571799
results = self._collection.query(
572800
query_embeddings=cast(list[Sequence[float]], query_embeddings),
573801
n_results=top_k,
@@ -598,7 +826,7 @@ async def search_embeddings_async(
598826
await self._ensure_initialized_async()
599827
assert self._async_collection is not None
600828

601-
kwargs = self._prepare_query_kwargs(filters)
829+
kwargs = ChromaDocumentStore._prepare_query_kwargs(filters)
602830
results = await self._async_collection.query(
603831
query_embeddings=cast(list[Sequence[float]], query_embeddings),
604832
n_results=top_k,

integrations/chroma/tests/conftest.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
# SPDX-FileCopyrightText: 2023-present deepset GmbH <info@deepset.ai>
2+
#
3+
# SPDX-License-Identifier: Apache-2.0
4+
15
import numpy as np
26
import pytest
37
from chromadb.api.types import Documents, EmbeddingFunction, Embeddings

0 commit comments

Comments
 (0)