Skip to content

Commit 830e658

Browse files
feat: adding delete_by_filter and update_by_filter to PineconeDocumentStore (#2655)
* adding delete_by_filter and update_by_filter + tests * cleaning up * refactoring to reduce duplicated code * removing whitespace * removing logging for delete docs count
1 parent b998f09 commit 830e658

3 files changed

Lines changed: 361 additions & 0 deletions

File tree

integrations/pinecone/src/haystack_integrations/document_stores/pinecone/document_store.py

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# SPDX-FileCopyrightText: 2023-present deepset GmbH <info@deepset.ai>
22
#
33
# SPDX-License-Identifier: Apache-2.0
4+
45
from copy import copy
56
from typing import Any, Literal, Optional, Union
67

@@ -376,6 +377,165 @@ async def delete_all_documents_async(self) -> None:
376377
# Namespace doesn't exist (empty collection), which is fine - nothing to delete
377378
logger.debug("Namespace '{namespace}' not found. Nothing to delete.", namespace=self.namespace or "default")
378379

380+
@staticmethod
def _update_documents_metadata(documents: list[Document], meta: dict[str, Any]) -> None:
    """
    Merge `meta` into the metadata of each given document, in place.

    Keys that already exist in a document's metadata are overwritten with the values from `meta`;
    keys not present in `meta` are left as they are. A document whose `meta` is `None` is given
    an empty dictionary before the merge.

    :param documents: Documents whose metadata is modified in place.
    :param meta: Key/value pairs to merge into every document's metadata.
    """
    for doc in documents:
        # Guard against a missing metadata mapping so that `update` always has a target.
        if doc.meta is None:
            doc.meta = {}
        doc.meta.update(meta)
393+
def delete_by_filter(self, filters: dict[str, Any]) -> int:
    """
    Deletes every document that matches `filters` and reports how many were removed.

    The deletion is performed client-side in two steps: the matching documents are fetched
    with `filter_documents`, and are then removed by ID with `delete_documents`.

    NOTE(review): only the documents returned by `filter_documents` are deleted. If that method
    caps the number of results it returns, a single call may not remove every match - confirm.

    :param filters: The filters to apply to select documents for deletion.
        For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
    :returns: The number of documents deleted; `0` when no document matches.
    """
    _validate_filters(filters)

    self._initialize_index()
    assert self._index is not None, "Index is not initialized"

    matches = self.filter_documents(filters=filters)
    if not matches:
        # Nothing matched, so there is nothing to delete and nothing to log.
        return 0

    self.delete_documents([match.id for match in matches])

    n_deleted = len(matches)
    logger.info(
        "Deleted {n_docs} documents from index '{index}' using filters.",
        n_docs=n_deleted,
        index=self.index_name,
    )
    return n_deleted
425+
426+
async def delete_by_filter_async(self, filters: dict[str, Any]) -> int:
    """
    Asynchronously deletes every document that matches `filters` and reports how many were removed.

    The deletion is performed client-side in two steps: the matching documents are fetched
    with `filter_documents_async`, and are then removed by ID with `delete_documents_async`.

    NOTE(review): only the documents returned by `filter_documents_async` are deleted. If that
    method caps the number of results it returns, a single call may not remove every match - confirm.

    :param filters: The filters to apply to select documents for deletion.
        For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
    :returns: The number of documents deleted; `0` when no document matches.
    """
    _validate_filters(filters)

    await self._initialize_async_index()
    assert self._async_index is not None, "Index is not initialized"

    matches = await self.filter_documents_async(filters=filters)
    if not matches:
        # Nothing matched, so there is nothing to delete and nothing to log.
        return 0

    await self.delete_documents_async([match.id for match in matches])

    n_deleted = len(matches)
    logger.info(
        "Deleted {n_docs} documents from index '{index}' using filters.",
        n_docs=n_deleted,
        index=self.index_name,
    )
    return n_deleted
458+
459+
def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
    """
    Updates the metadata of every document that matches `filters`.

    The update is performed client-side: the matching documents are fetched with
    `filter_documents`, `meta` is merged into each document's metadata, and the documents are
    written back with `DuplicatePolicy.OVERWRITE`.

    NOTE(review): documents are re-written whole, exactly as `filter_documents` returned them
    (plus the merged metadata). If that method caps its result size or omits fields, the
    re-write is affected accordingly - confirm.

    :param filters: The filters to apply to select documents for updating.
        For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
    :param meta: The metadata fields to update. These are merged into the existing metadata:
        keys present in `meta` are overwritten, all other keys are kept.
    :returns: The number of documents that matched and were re-written; `0` when no document matches.
    :raises ValueError: If `meta` is not a dictionary.
    """
    _validate_filters(filters)

    if not isinstance(meta, dict):
        msg = "meta must be a dictionary"
        raise ValueError(msg)

    self._initialize_index()
    assert self._index is not None, "Index is not initialized"

    matches = self.filter_documents(filters=filters)
    if not matches:
        # Nothing matched, so there is nothing to re-write and nothing to log.
        return 0

    # Merge in place, then overwrite the stored copies with the modified documents.
    self._update_documents_metadata(matches, meta)
    self.write_documents(matches, policy=DuplicatePolicy.OVERWRITE)

    n_updated = len(matches)
    logger.info(
        "Updated {n_docs} documents in index '{index}' using filters.",
        n_docs=n_updated,
        index=self.index_name,
    )
    return n_updated
498+
499+
async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
    """
    Asynchronously updates the metadata of every document that matches `filters`.

    The update is performed client-side: the matching documents are fetched with
    `filter_documents_async`, `meta` is merged into each document's metadata, and the documents
    are written back with `DuplicatePolicy.OVERWRITE`.

    NOTE(review): documents are re-written whole, exactly as `filter_documents_async` returned
    them (plus the merged metadata). If that method caps its result size or omits fields, the
    re-write is affected accordingly - confirm.

    :param filters: The filters to apply to select documents for updating.
        For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
    :param meta: The metadata fields to update. These are merged into the existing metadata:
        keys present in `meta` are overwritten, all other keys are kept.
    :returns: The number of documents that matched and were re-written; `0` when no document matches.
    :raises ValueError: If `meta` is not a dictionary.
    """
    _validate_filters(filters)

    if not isinstance(meta, dict):
        msg = "meta must be a dictionary"
        raise ValueError(msg)

    await self._initialize_async_index()
    assert self._async_index is not None, "Index is not initialized"

    matches = await self.filter_documents_async(filters=filters)
    if not matches:
        # Nothing matched, so there is nothing to re-write and nothing to log.
        return 0

    # Merge in place, then overwrite the stored copies with the modified documents.
    self._update_documents_metadata(matches, meta)
    await self.write_documents_async(matches, policy=DuplicatePolicy.OVERWRITE)

    n_updated = len(matches)
    logger.info(
        "Updated {n_docs} documents in index '{index}' using filters.",
        n_docs=n_updated,
        index=self.index_name,
    )
    return n_updated
538+
379539
def _embedding_retrieval(
380540
self,
381541
query_embedding: list[float],

integrations/pinecone/tests/test_document_store.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
# SPDX-FileCopyrightText: 2023-present deepset GmbH <info@deepset.ai>
2+
#
3+
# SPDX-License-Identifier: Apache-2.0
4+
15
import os
26
import time
37
from unittest.mock import patch
@@ -342,3 +346,100 @@ def test_sentence_window_retriever(self, document_store: PineconeDocumentStore):
342346
result = sentence_window_retriever.run(retrieved_documents=[retrieved_doc["documents"][0]])
343347

344348
assert len(result["context_windows"]) == 1
349+
350+
def test_delete_by_filter(self, document_store: PineconeDocumentStore):
    """Deleting by a category filter removes the matching documents and leaves the rest."""
    document_store.write_documents(
        [
            Document(content="Doc 1", meta={"category": "A"}),
            Document(content="Doc 2", meta={"category": "B"}),
            Document(content="Doc 3", meta={"category": "A"}),
        ]
    )

    # Two of the three documents carry category "A".
    category_a = {"field": "meta.category", "operator": "==", "value": "A"}
    n_deleted = document_store.delete_by_filter(filters=category_a)
    assert n_deleted == 2
    assert document_store.count_documents() == 1

    # The single survivor must be the category "B" document.
    survivors = document_store.filter_documents()
    assert len(survivors) == 1
    assert survivors[0].meta["category"] == "B"
369+
370+
def test_delete_by_filter_no_matches(self, document_store: PineconeDocumentStore):
    """A filter that matches nothing deletes nothing and returns 0."""
    document_store.write_documents(
        [
            Document(content="Doc 1", meta={"category": "A"}),
            Document(content="Doc 2", meta={"category": "B"}),
        ]
    )

    # No stored document has category "C".
    category_c = {"field": "meta.category", "operator": "==", "value": "C"}
    n_deleted = document_store.delete_by_filter(filters=category_c)
    assert n_deleted == 0
    assert document_store.count_documents() == 2
383+
384+
def test_update_by_filter(self, document_store: PineconeDocumentStore):
    """Updating by a category filter changes one metadata field on the matching documents only."""
    document_store.write_documents(
        [
            Document(content="Doc 1", meta={"category": "A", "status": "draft"}),
            Document(content="Doc 2", meta={"category": "B", "status": "draft"}),
            Document(content="Doc 3", meta={"category": "A", "status": "draft"}),
        ]
    )

    # Publish the two category "A" documents.
    category_a = {"field": "meta.category", "operator": "==", "value": "A"}
    n_updated = document_store.update_by_filter(filters=category_a, meta={"status": "published"})
    assert n_updated == 2

    # Exactly those two documents must now be published, with their category untouched.
    is_published = {"field": "meta.status", "operator": "==", "value": "published"}
    published = document_store.filter_documents(filters=is_published)
    assert len(published) == 2
    for document in published:
        assert document.meta["category"] == "A"
        assert document.meta["status"] == "published"
405+
406+
def test_update_by_filter_multiple_fields(self, document_store: PineconeDocumentStore):
    """Several metadata fields can be updated in a single `update_by_filter` call."""
    document_store.write_documents(
        [
            Document(content="Doc 1", meta={"category": "A", "status": "draft", "priority": "low"}),
            Document(content="Doc 2", meta={"category": "B", "status": "draft", "priority": "low"}),
            Document(content="Doc 3", meta={"category": "A", "status": "draft", "priority": "low"}),
        ]
    )
    assert document_store.count_documents() == 3

    # Change both status and priority on the two category "A" documents.
    category_a = {"field": "meta.category", "operator": "==", "value": "A"}
    new_values = {"status": "published", "priority": "high"}
    n_updated = document_store.update_by_filter(filters=category_a, meta=new_values)
    assert n_updated == 2

    # Both fields must be updated, and the category must be preserved.
    is_published = {"field": "meta.status", "operator": "==", "value": "published"}
    published = document_store.filter_documents(filters=is_published)
    assert len(published) == 2
    for document in published:
        assert document.meta["category"] == "A"
        assert document.meta["status"] == "published"
        assert document.meta["priority"] == "high"
431+
432+
def test_update_by_filter_no_matches(self, document_store: PineconeDocumentStore):
    """A filter that matches nothing updates nothing and returns 0."""
    document_store.write_documents(
        [
            Document(content="Doc 1", meta={"category": "A"}),
            Document(content="Doc 2", meta={"category": "B"}),
        ]
    )
    assert document_store.count_documents() == 2

    # No stored document has category "C".
    category_c = {"field": "meta.category", "operator": "==", "value": "C"}
    n_updated = document_store.update_by_filter(filters=category_c, meta={"status": "published"})
    assert n_updated == 0
    assert document_store.count_documents() == 2

integrations/pinecone/tests/test_document_store_async.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
# SPDX-FileCopyrightText: 2023-present deepset GmbH <info@deepset.ai>
2+
#
3+
# SPDX-License-Identifier: Apache-2.0
4+
15
import os
26

37
import numpy as np
@@ -139,3 +143,99 @@ async def test_sentence_window_retriever(self, document_store_async: PineconeDoc
139143
result = sentence_window_retriever.run(retrieved_documents=[retrieved_doc["documents"][0]])
140144

141145
assert len(result["context_windows"]) == 1
146+
147+
async def test_delete_by_filter_async(self, document_store_async: PineconeDocumentStore):
    """Async deletion by a category filter removes the matching documents and leaves the rest."""
    await document_store_async.write_documents_async(
        [
            Document(content="Doc 1", meta={"category": "A"}),
            Document(content="Doc 2", meta={"category": "B"}),
            Document(content="Doc 3", meta={"category": "A"}),
        ]
    )

    # Two of the three documents carry category "A".
    category_a = {"field": "meta.category", "operator": "==", "value": "A"}
    n_deleted = await document_store_async.delete_by_filter_async(filters=category_a)
    assert n_deleted == 2
    assert await document_store_async.count_documents_async() == 1

    # The single survivor must be the category "B" document.
    survivors = await document_store_async.filter_documents_async()
    assert len(survivors) == 1
    assert survivors[0].meta["category"] == "B"
166+
167+
async def test_delete_by_filter_async_no_matches(self, document_store_async: PineconeDocumentStore):
    """An async delete whose filter matches nothing deletes nothing and returns 0."""
    await document_store_async.write_documents_async(
        [
            Document(content="Doc 1", meta={"category": "A"}),
            Document(content="Doc 2", meta={"category": "B"}),
        ]
    )

    # No stored document has category "C".
    category_c = {"field": "meta.category", "operator": "==", "value": "C"}
    n_deleted = await document_store_async.delete_by_filter_async(filters=category_c)
    assert n_deleted == 0
    assert await document_store_async.count_documents_async() == 2
180+
181+
async def test_update_by_filter_async(self, document_store_async: PineconeDocumentStore):
    """Async update by a category filter changes one metadata field on the matching documents only."""
    await document_store_async.write_documents_async(
        [
            Document(content="Doc 1", meta={"category": "A", "status": "draft"}),
            Document(content="Doc 2", meta={"category": "B", "status": "draft"}),
            Document(content="Doc 3", meta={"category": "A", "status": "draft"}),
        ]
    )
    assert await document_store_async.count_documents_async() == 3

    # Publish the two category "A" documents.
    category_a = {"field": "meta.category", "operator": "==", "value": "A"}
    n_updated = await document_store_async.update_by_filter_async(
        filters=category_a, meta={"status": "published"}
    )
    assert n_updated == 2

    # Exactly those two documents must now be published, with their category untouched.
    is_published = {"field": "meta.status", "operator": "==", "value": "published"}
    published = await document_store_async.filter_documents_async(filters=is_published)
    assert len(published) == 2
    for document in published:
        assert document.meta["category"] == "A"
        assert document.meta["status"] == "published"
203+
204+
async def test_update_by_filter_async_multiple_fields(self, document_store_async: PineconeDocumentStore):
    """Several metadata fields can be updated in a single `update_by_filter_async` call."""
    await document_store_async.write_documents_async(
        [
            Document(content="Doc 1", meta={"category": "A", "status": "draft", "priority": "low"}),
            Document(content="Doc 2", meta={"category": "B", "status": "draft", "priority": "low"}),
            Document(content="Doc 3", meta={"category": "A", "status": "draft", "priority": "low"}),
        ]
    )

    # Change both status and priority on the two category "A" documents.
    category_a = {"field": "meta.category", "operator": "==", "value": "A"}
    new_values = {"status": "published", "priority": "high"}
    n_updated = await document_store_async.update_by_filter_async(filters=category_a, meta=new_values)
    assert n_updated == 2

    # Both fields must be updated, and the category must be preserved.
    is_published = {"field": "meta.status", "operator": "==", "value": "published"}
    published = await document_store_async.filter_documents_async(filters=is_published)
    assert len(published) == 2
    for document in published:
        assert document.meta["category"] == "A"
        assert document.meta["status"] == "published"
        assert document.meta["priority"] == "high"
228+
229+
async def test_update_by_filter_async_no_matches(self, document_store_async: PineconeDocumentStore):
    """An async update whose filter matches nothing updates nothing and returns 0."""
    await document_store_async.write_documents_async(
        [
            Document(content="Doc 1", meta={"category": "A"}),
            Document(content="Doc 2", meta={"category": "B"}),
        ]
    )

    # No stored document has category "C".
    category_c = {"field": "meta.category", "operator": "==", "value": "C"}
    n_updated = await document_store_async.update_by_filter_async(
        filters=category_c, meta={"status": "published"}
    )
    assert n_updated == 0
    assert await document_store_async.count_documents_async() == 2

0 commit comments

Comments
 (0)