-
Notifications
You must be signed in to change notification settings - Fork 204
feat: adding delete_by_filter and update_by_filter to PineconeDocumentStore
#2655
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
davidsbatista
merged 9 commits into
main
from
feat/add-update-delete-by-filter-to-PineConeDocumentStore
Jan 8, 2026
Merged
Changes from 7 commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
b19e8ed
adding delete_by_filter and update_by_filter + tests
davidsbatista ab5080b
cleaning up
davidsbatista cc8d6e1
refactoring to reduce duplicated code
davidsbatista ed89ab2
Merge branch 'main' into feat/add-update-delete-by-filter-to-PineCone…
davidsbatista 376c3af
Merge branch 'main' into feat/add-update-delete-by-filter-to-PineCone…
davidsbatista 7dc1619
removing whitespace
davidsbatista 6f299ab
Merge branch 'main' into feat/add-update-delete-by-filter-to-PineCone…
davidsbatista e9ad4d2
removing logging for delete docs count
davidsbatista 55c1512
Merge branch 'main' into feat/add-update-delete-by-filter-to-PineCone…
davidsbatista File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| # SPDX-FileCopyrightText: 2023-present deepset GmbH <[email protected]> | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| from copy import copy | ||
| from typing import Any, Literal, Optional, Union | ||
|
|
||
|
|
@@ -376,6 +377,187 @@ async def delete_all_documents_async(self) -> None: | |
| # Namespace doesn't exist (empty collection), which is fine - nothing to delete | ||
| logger.debug("Namespace '{namespace}' not found. Nothing to delete.", namespace=self.namespace or "default") | ||
|
|
||
@staticmethod
def _update_documents_metadata(documents: list[Document], meta: dict[str, Any]) -> None:
    """
    Merge the given metadata fields into each document's metadata, in place.

    Documents with no metadata yet (``meta is None``) get a fresh dictionary
    before the merge, so the update never fails on missing metadata.

    :param documents: Documents whose metadata should be updated.
    :param meta: Metadata fields to merge into each document's existing metadata.
    """
    for doc in documents:
        if doc.meta is None:
            doc.meta = {}
        doc.meta.update(meta)
|
|
||
def _log_top_k_limit_warning(self, document_count: int, operation: str) -> None:
    """
    Log a warning when ``document_count`` hits ``TOP_K_LIMIT``.

    Filter-based operations can retrieve at most ``TOP_K_LIMIT`` documents, so a count
    exactly at the limit suggests that more documents may have matched than were processed.

    :param document_count: Number of documents processed by the operation.
    :param operation: Past-tense description of the operation (e.g., "deleted", "updated").
    """
    if document_count == TOP_K_LIMIT:
        # Use lazy {placeholder} formatting with keyword arguments, consistent with the
        # other logger calls in this store, instead of eager f-string interpolation.
        logger.warning(
            "PineconeDocumentStore can return at most {top_k_limit} documents. "
            "It is possible that more than {document_count} documents matched the filters, "
            "but only {document_count} were {operation}.",
            top_k_limit=TOP_K_LIMIT,
            document_count=document_count,
            operation=operation,
        )
|
|
||
def delete_by_filter(self, filters: dict[str, Any]) -> int:
    """
    Deletes all documents that match the provided filters.

    Pinecone does not support server-side delete by filter, so this method
    first searches for matching documents, then deletes them by ID.

    :param filters: The filters to apply to select documents for deletion.
        For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
    :returns: The number of documents deleted.
    """
    _validate_filters(filters)

    self._initialize_index()
    assert self._index is not None, "Index is not initialized"

    matching = self.filter_documents(filters=filters)
    if not matching:
        # Nothing matched the filters, so there is nothing to delete.
        return 0

    ids_to_delete = [document.id for document in matching]
    self.delete_documents(ids_to_delete)

    n_deleted = len(ids_to_delete)
    logger.info(
        "Deleted {n_docs} documents from index '{index}' using filters.",
        n_docs=n_deleted,
        index=self.index_name,
    )

    # The retrieval step is capped at TOP_K_LIMIT, so warn if we may have truncated.
    self._log_top_k_limit_warning(n_deleted, "deleted")

    return n_deleted
|
|
||
async def delete_by_filter_async(self, filters: dict[str, Any]) -> int:
    """
    Asynchronously deletes all documents that match the provided filters.

    Pinecone does not support server-side delete by filter, so this method
    first searches for matching documents, then deletes them by ID.

    :param filters: The filters to apply to select documents for deletion.
        For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
    :returns: The number of documents deleted.
    """
    _validate_filters(filters)

    await self._initialize_async_index()
    assert self._async_index is not None, "Index is not initialized"

    matching = await self.filter_documents_async(filters=filters)
    if not matching:
        # Nothing matched the filters, so there is nothing to delete.
        return 0

    ids_to_delete = [document.id for document in matching]
    await self.delete_documents_async(ids_to_delete)

    n_deleted = len(ids_to_delete)
    logger.info(
        "Deleted {n_docs} documents from index '{index}' using filters.",
        n_docs=n_deleted,
        index=self.index_name,
    )

    # The retrieval step is capped at TOP_K_LIMIT, so warn if we may have truncated.
    self._log_top_k_limit_warning(n_deleted, "deleted")

    return n_deleted
|
|
||
def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
    """
    Updates the metadata of all documents that match the provided filters.

    Pinecone does not support server-side update by filter, so this method
    first searches for matching documents, then updates their metadata and re-writes them.

    :param filters: The filters to apply to select documents for updating.
        For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
    :param meta: The metadata fields to update. This will be merged with existing metadata.
    :returns: The number of documents updated.
    :raises ValueError: If ``meta`` is not a dictionary.
    """
    _validate_filters(filters)

    if not isinstance(meta, dict):
        msg = "meta must be a dictionary"
        raise ValueError(msg)

    self._initialize_index()
    assert self._index is not None, "Index is not initialized"

    matching = self.filter_documents(filters=filters)
    if not matching:
        # Nothing matched the filters, so there is nothing to update.
        return 0

    self._update_documents_metadata(matching, meta)

    # Re-write the documents with their merged metadata; OVERWRITE replaces the
    # existing entries rather than raising on duplicate IDs.
    self.write_documents(matching, policy=DuplicatePolicy.OVERWRITE)

    n_updated = len(matching)
    logger.info(
        "Updated {n_docs} documents in index '{index}' using filters.",
        n_docs=n_updated,
        index=self.index_name,
    )

    # The retrieval step is capped at TOP_K_LIMIT, so warn if we may have truncated.
    self._log_top_k_limit_warning(n_updated, "updated")

    return n_updated
|
|
||
async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
    """
    Asynchronously updates the metadata of all documents that match the provided filters.

    Pinecone does not support server-side update by filter, so this method
    first searches for matching documents, then updates their metadata and re-writes them.

    :param filters: The filters to apply to select documents for updating.
        For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
    :param meta: The metadata fields to update. This will be merged with existing metadata.
    :returns: The number of documents updated.
    :raises ValueError: If ``meta`` is not a dictionary.
    """
    _validate_filters(filters)

    if not isinstance(meta, dict):
        msg = "meta must be a dictionary"
        raise ValueError(msg)

    await self._initialize_async_index()
    assert self._async_index is not None, "Index is not initialized"

    matching = await self.filter_documents_async(filters=filters)
    if not matching:
        # Nothing matched the filters, so there is nothing to update.
        return 0

    self._update_documents_metadata(matching, meta)

    # Re-write the documents with their merged metadata; OVERWRITE replaces the
    # existing entries rather than raising on duplicate IDs.
    await self.write_documents_async(matching, policy=DuplicatePolicy.OVERWRITE)

    n_updated = len(matching)
    logger.info(
        "Updated {n_docs} documents in index '{index}' using filters.",
        n_docs=n_updated,
        index=self.index_name,
    )

    # The retrieval step is capped at TOP_K_LIMIT, so warn if we may have truncated.
    self._log_top_k_limit_warning(n_updated, "updated")

    return n_updated
|
|
||
| def _embedding_retrieval( | ||
| self, | ||
| query_embedding: list[float], | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,7 @@ | ||
| # SPDX-FileCopyrightText: 2023-present deepset GmbH <[email protected]> | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| import os | ||
| import time | ||
| from unittest.mock import patch | ||
|
|
@@ -342,3 +346,100 @@ def test_sentence_window_retriever(self, document_store: PineconeDocumentStore): | |
| result = sentence_window_retriever.run(retrieved_documents=[retrieved_doc["documents"][0]]) | ||
|
|
||
| assert len(result["context_windows"]) == 1 | ||
|
|
||
def test_delete_by_filter(self, document_store: PineconeDocumentStore):
    # Write three documents, two of which share category "A".
    document_store.write_documents(
        [
            Document(content="Doc 1", meta={"category": "A"}),
            Document(content="Doc 2", meta={"category": "B"}),
            Document(content="Doc 3", meta={"category": "A"}),
        ]
    )

    # delete documents with category="A"
    n_deleted = document_store.delete_by_filter(
        filters={"field": "meta.category", "operator": "==", "value": "A"}
    )
    assert n_deleted == 2
    assert document_store.count_documents() == 1

    # verify only category B remains
    leftover = document_store.filter_documents()
    assert len(leftover) == 1
    assert leftover[0].meta["category"] == "B"
|
|
||
def test_delete_by_filter_no_matches(self, document_store: PineconeDocumentStore):
    # Two documents, neither with category "C".
    document_store.write_documents(
        [
            Document(content="Doc 1", meta={"category": "A"}),
            Document(content="Doc 2", meta={"category": "B"}),
        ]
    )

    # try to delete documents with category="C" (no matches)
    n_deleted = document_store.delete_by_filter(
        filters={"field": "meta.category", "operator": "==", "value": "C"}
    )
    assert n_deleted == 0
    assert document_store.count_documents() == 2
|
|
||
def test_update_by_filter(self, document_store: PineconeDocumentStore):
    # Three drafts, two of which share category "A".
    document_store.write_documents(
        [
            Document(content="Doc 1", meta={"category": "A", "status": "draft"}),
            Document(content="Doc 2", meta={"category": "B", "status": "draft"}),
            Document(content="Doc 3", meta={"category": "A", "status": "draft"}),
        ]
    )

    # update status for category="A" documents
    n_updated = document_store.update_by_filter(
        filters={"field": "meta.category", "operator": "==", "value": "A"}, meta={"status": "published"}
    )
    assert n_updated == 2

    # Only the two category-A documents should now be published.
    published = document_store.filter_documents(
        filters={"field": "meta.status", "operator": "==", "value": "published"}
    )
    assert len(published) == 2
    for document in published:
        assert document.meta["category"] == "A"
        assert document.meta["status"] == "published"
|
|
||
def test_update_by_filter_multiple_fields(self, document_store: PineconeDocumentStore):
    # Three low-priority drafts, two of which share category "A".
    document_store.write_documents(
        [
            Document(content="Doc 1", meta={"category": "A", "status": "draft", "priority": "low"}),
            Document(content="Doc 2", meta={"category": "B", "status": "draft", "priority": "low"}),
            Document(content="Doc 3", meta={"category": "A", "status": "draft", "priority": "low"}),
        ]
    )
    assert document_store.count_documents() == 3

    # Update multiple fields for category="A" documents
    n_updated = document_store.update_by_filter(
        filters={"field": "meta.category", "operator": "==", "value": "A"},
        meta={"status": "published", "priority": "high"},
    )
    assert n_updated == 2

    # Verify the updates
    published = document_store.filter_documents(
        filters={"field": "meta.status", "operator": "==", "value": "published"}
    )
    assert len(published) == 2
    for document in published:
        assert document.meta["category"] == "A"
        assert document.meta["status"] == "published"
        assert document.meta["priority"] == "high"
|
|
||
def test_update_by_filter_no_matches(self, document_store: PineconeDocumentStore):
    # Two documents, neither with category "C".
    document_store.write_documents(
        [
            Document(content="Doc 1", meta={"category": "A"}),
            Document(content="Doc 2", meta={"category": "B"}),
        ]
    )
    assert document_store.count_documents() == 2

    # Try to update documents with category="C" (no matches)
    n_updated = document_store.update_by_filter(
        filters={"field": "meta.category", "operator": "==", "value": "C"}, meta={"status": "published"}
    )
    assert n_updated == 0
    assert document_store.count_documents() == 2
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this seems out of place, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will remove this, did as well for other doc stores already.