Skip to content
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# SPDX-FileCopyrightText: 2023-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

from copy import copy
from typing import Any, Literal, Optional, Union

Expand Down Expand Up @@ -376,6 +377,165 @@ async def delete_all_documents_async(self) -> None:
# Namespace doesn't exist (empty collection), which is fine - nothing to delete
logger.debug("Namespace '{namespace}' not found. Nothing to delete.", namespace=self.namespace or "default")

@staticmethod
def _update_documents_metadata(documents: list[Document], meta: dict[str, Any]) -> None:
    """
    Merges ``meta`` into the metadata of every document, in place.

    Keys already present on a document are overwritten by the values in ``meta``;
    keys that are not mentioned in ``meta`` are left untouched.

    :param documents: Documents whose ``meta`` attribute is modified in place.
    :param meta: Metadata fields to merge into each document's existing metadata.
    """
    for doc in documents:
        if doc.meta is None:
            # Give the document an empty dict so there is something to merge into.
            doc.meta = {}
        doc.meta.update(meta)

def delete_by_filter(self, filters: dict[str, Any]) -> int:
    """
    Deletes every document that matches the given filters.

    This is a two-step, client-side operation rather than a single server-side delete:
    the matching documents are first fetched with `filter_documents`, and their IDs
    are then handed to `delete_documents`.

    :param filters: The filters used to select the documents to delete.
        For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
    :returns: The number of matching documents whose IDs were passed to `delete_documents`,
        or 0 when no document matched.
    """
    _validate_filters(filters)

    self._initialize_index()
    assert self._index is not None, "Index is not initialized"

    matches = self.filter_documents(filters=filters)
    if not matches:
        # Nothing matched, so there is nothing to delete and nothing to log.
        return 0

    ids_to_delete = [match.id for match in matches]
    self.delete_documents(ids_to_delete)

    n_deleted = len(ids_to_delete)
    logger.info(
        "Deleted {n_docs} documents from index '{index}' using filters.",
        n_docs=n_deleted,
        index=self.index_name,
    )
    return n_deleted

async def delete_by_filter_async(self, filters: dict[str, Any]) -> int:
    """
    Asynchronously deletes every document that matches the given filters.

    This is a two-step, client-side operation rather than a single server-side delete:
    the matching documents are first fetched with `filter_documents_async`, and their IDs
    are then handed to `delete_documents_async`.

    :param filters: The filters used to select the documents to delete.
        For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
    :returns: The number of matching documents whose IDs were passed to `delete_documents_async`,
        or 0 when no document matched.
    """
    _validate_filters(filters)

    await self._initialize_async_index()
    assert self._async_index is not None, "Index is not initialized"

    matches = await self.filter_documents_async(filters=filters)
    if not matches:
        # Nothing matched, so there is nothing to delete and nothing to log.
        return 0

    ids_to_delete = [match.id for match in matches]
    await self.delete_documents_async(ids_to_delete)

    n_deleted = len(ids_to_delete)
    logger.info(
        "Deleted {n_docs} documents from index '{index}' using filters.",
        n_docs=n_deleted,
        index=self.index_name,
    )
    return n_deleted

def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
    """
    Updates the metadata of every document that matches the given filters.

    This is a client-side read-modify-write rather than a single server-side update:
    the matching documents are fetched with `filter_documents`, `meta` is merged into
    each document's metadata, and the documents are written back with
    `DuplicatePolicy.OVERWRITE`.

    :param filters: The filters used to select the documents to update.
        For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
    :param meta: The metadata fields to set. They are merged into the existing metadata,
        so fields not mentioned here are kept.
    :returns: The number of matching documents that were re-written, or 0 when no document matched.
    :raises ValueError: If `meta` is not a dictionary.
    """
    _validate_filters(filters)

    if not isinstance(meta, dict):
        error_message = "meta must be a dictionary"
        raise ValueError(error_message)

    self._initialize_index()
    assert self._index is not None, "Index is not initialized"

    matches = self.filter_documents(filters=filters)
    if not matches:
        # Nothing matched, so skip the write entirely.
        return 0

    self._update_documents_metadata(matches, meta)

    # The documents already exist in the index, so they must be overwritten
    # for the merged metadata to replace the stored one.
    self.write_documents(matches, policy=DuplicatePolicy.OVERWRITE)

    n_updated = len(matches)
    logger.info(
        "Updated {n_docs} documents in index '{index}' using filters.",
        n_docs=n_updated,
        index=self.index_name,
    )
    return n_updated

async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
    """
    Asynchronously updates the metadata of every document that matches the given filters.

    This is a client-side read-modify-write rather than a single server-side update:
    the matching documents are fetched with `filter_documents_async`, `meta` is merged into
    each document's metadata, and the documents are written back with
    `DuplicatePolicy.OVERWRITE`.

    :param filters: The filters used to select the documents to update.
        For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
    :param meta: The metadata fields to set. They are merged into the existing metadata,
        so fields not mentioned here are kept.
    :returns: The number of matching documents that were re-written, or 0 when no document matched.
    :raises ValueError: If `meta` is not a dictionary.
    """
    _validate_filters(filters)

    if not isinstance(meta, dict):
        error_message = "meta must be a dictionary"
        raise ValueError(error_message)

    await self._initialize_async_index()
    assert self._async_index is not None, "Index is not initialized"

    matches = await self.filter_documents_async(filters=filters)
    if not matches:
        # Nothing matched, so skip the write entirely.
        return 0

    self._update_documents_metadata(matches, meta)

    # The documents already exist in the index, so they must be overwritten
    # for the merged metadata to replace the stored one.
    await self.write_documents_async(matches, policy=DuplicatePolicy.OVERWRITE)

    n_updated = len(matches)
    logger.info(
        "Updated {n_docs} documents in index '{index}' using filters.",
        n_docs=n_updated,
        index=self.index_name,
    )
    return n_updated

def _embedding_retrieval(
self,
query_embedding: list[float],
Expand Down
101 changes: 101 additions & 0 deletions integrations/pinecone/tests/test_document_store.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# SPDX-FileCopyrightText: 2023-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

import os
import time
from unittest.mock import patch
Expand Down Expand Up @@ -342,3 +346,100 @@ def test_sentence_window_retriever(self, document_store: PineconeDocumentStore):
result = sentence_window_retriever.run(retrieved_documents=[retrieved_doc["documents"][0]])

assert len(result["context_windows"]) == 1

def test_delete_by_filter(self, document_store: PineconeDocumentStore):
    """Deleting by a category filter removes the matching documents and keeps the rest."""
    document_store.write_documents(
        [
            Document(content="Doc 1", meta={"category": "A"}),
            Document(content="Doc 2", meta={"category": "B"}),
            Document(content="Doc 3", meta={"category": "A"}),
        ]
    )

    # Two of the three documents are in category "A".
    category_a = {"field": "meta.category", "operator": "==", "value": "A"}
    n_deleted = document_store.delete_by_filter(filters=category_a)
    assert n_deleted == 2
    assert document_store.count_documents() == 1

    # The single survivor must be the category "B" document.
    survivors = document_store.filter_documents()
    assert len(survivors) == 1
    assert survivors[0].meta["category"] == "B"

def test_delete_by_filter_no_matches(self, document_store: PineconeDocumentStore):
    """A filter that matches nothing deletes nothing and reports a count of zero."""
    document_store.write_documents(
        [
            Document(content="Doc 1", meta={"category": "A"}),
            Document(content="Doc 2", meta={"category": "B"}),
        ]
    )

    # No document is in category "C".
    category_c = {"field": "meta.category", "operator": "==", "value": "C"}
    n_deleted = document_store.delete_by_filter(filters=category_c)

    assert n_deleted == 0
    assert document_store.count_documents() == 2

def test_update_by_filter(self, document_store: PineconeDocumentStore):
    """Updating by a category filter changes the status of the matching documents only."""
    document_store.write_documents(
        [
            Document(content="Doc 1", meta={"category": "A", "status": "draft"}),
            Document(content="Doc 2", meta={"category": "B", "status": "draft"}),
            Document(content="Doc 3", meta={"category": "A", "status": "draft"}),
        ]
    )

    # Publish everything in category "A".
    category_a = {"field": "meta.category", "operator": "==", "value": "A"}
    n_updated = document_store.update_by_filter(filters=category_a, meta={"status": "published"})
    assert n_updated == 2

    # Only the two category "A" documents may show up as published.
    is_published = {"field": "meta.status", "operator": "==", "value": "published"}
    published = document_store.filter_documents(filters=is_published)
    assert len(published) == 2
    for published_doc in published:
        assert published_doc.meta["category"] == "A"
        assert published_doc.meta["status"] == "published"

def test_update_by_filter_multiple_fields(self, document_store: PineconeDocumentStore):
    """Several metadata fields can be updated in a single call."""
    document_store.write_documents(
        [
            Document(content="Doc 1", meta={"category": "A", "status": "draft", "priority": "low"}),
            Document(content="Doc 2", meta={"category": "B", "status": "draft", "priority": "low"}),
            Document(content="Doc 3", meta={"category": "A", "status": "draft", "priority": "low"}),
        ]
    )
    assert document_store.count_documents() == 3

    # Change both status and priority for everything in category "A".
    category_a = {"field": "meta.category", "operator": "==", "value": "A"}
    new_fields = {"status": "published", "priority": "high"}
    n_updated = document_store.update_by_filter(filters=category_a, meta=new_fields)
    assert n_updated == 2

    # Both new values must be present, and the untouched category field must be preserved.
    is_published = {"field": "meta.status", "operator": "==", "value": "published"}
    published = document_store.filter_documents(filters=is_published)
    assert len(published) == 2
    for published_doc in published:
        assert published_doc.meta["category"] == "A"
        assert published_doc.meta["status"] == "published"
        assert published_doc.meta["priority"] == "high"

def test_update_by_filter_no_matches(self, document_store: PineconeDocumentStore):
    """A filter that matches nothing updates nothing and reports a count of zero."""
    document_store.write_documents(
        [
            Document(content="Doc 1", meta={"category": "A"}),
            Document(content="Doc 2", meta={"category": "B"}),
        ]
    )
    assert document_store.count_documents() == 2

    # No document is in category "C".
    category_c = {"field": "meta.category", "operator": "==", "value": "C"}
    n_updated = document_store.update_by_filter(filters=category_c, meta={"status": "published"})

    assert n_updated == 0
    assert document_store.count_documents() == 2
100 changes: 100 additions & 0 deletions integrations/pinecone/tests/test_document_store_async.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# SPDX-FileCopyrightText: 2023-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

import os

import numpy as np
Expand Down Expand Up @@ -139,3 +143,99 @@ async def test_sentence_window_retriever(self, document_store_async: PineconeDoc
result = sentence_window_retriever.run(retrieved_documents=[retrieved_doc["documents"][0]])

assert len(result["context_windows"]) == 1

async def test_delete_by_filter_async(self, document_store_async: PineconeDocumentStore):
    """Deleting by a category filter removes the matching documents and keeps the rest."""
    await document_store_async.write_documents_async(
        [
            Document(content="Doc 1", meta={"category": "A"}),
            Document(content="Doc 2", meta={"category": "B"}),
            Document(content="Doc 3", meta={"category": "A"}),
        ]
    )

    # Two of the three documents are in category "A".
    category_a = {"field": "meta.category", "operator": "==", "value": "A"}
    n_deleted = await document_store_async.delete_by_filter_async(filters=category_a)
    assert n_deleted == 2
    assert await document_store_async.count_documents_async() == 1

    # The single survivor must be the category "B" document.
    survivors = await document_store_async.filter_documents_async()
    assert len(survivors) == 1
    assert survivors[0].meta["category"] == "B"

async def test_delete_by_filter_async_no_matches(self, document_store_async: PineconeDocumentStore):
    """A filter that matches nothing deletes nothing and reports a count of zero."""
    await document_store_async.write_documents_async(
        [
            Document(content="Doc 1", meta={"category": "A"}),
            Document(content="Doc 2", meta={"category": "B"}),
        ]
    )

    # No document is in category "C".
    category_c = {"field": "meta.category", "operator": "==", "value": "C"}
    n_deleted = await document_store_async.delete_by_filter_async(filters=category_c)

    assert n_deleted == 0
    assert await document_store_async.count_documents_async() == 2

async def test_update_by_filter_async(self, document_store_async: PineconeDocumentStore):
    """Updating by a category filter changes the status of the matching documents only."""
    await document_store_async.write_documents_async(
        [
            Document(content="Doc 1", meta={"category": "A", "status": "draft"}),
            Document(content="Doc 2", meta={"category": "B", "status": "draft"}),
            Document(content="Doc 3", meta={"category": "A", "status": "draft"}),
        ]
    )
    assert await document_store_async.count_documents_async() == 3

    # Publish everything in category "A".
    category_a = {"field": "meta.category", "operator": "==", "value": "A"}
    n_updated = await document_store_async.update_by_filter_async(filters=category_a, meta={"status": "published"})
    assert n_updated == 2

    # Only the two category "A" documents may show up as published.
    is_published = {"field": "meta.status", "operator": "==", "value": "published"}
    published = await document_store_async.filter_documents_async(filters=is_published)
    assert len(published) == 2
    for published_doc in published:
        assert published_doc.meta["category"] == "A"
        assert published_doc.meta["status"] == "published"

async def test_update_by_filter_async_multiple_fields(self, document_store_async: PineconeDocumentStore):
    """Several metadata fields can be updated in a single call."""
    await document_store_async.write_documents_async(
        [
            Document(content="Doc 1", meta={"category": "A", "status": "draft", "priority": "low"}),
            Document(content="Doc 2", meta={"category": "B", "status": "draft", "priority": "low"}),
            Document(content="Doc 3", meta={"category": "A", "status": "draft", "priority": "low"}),
        ]
    )

    # Change both status and priority for everything in category "A".
    category_a = {"field": "meta.category", "operator": "==", "value": "A"}
    new_fields = {"status": "published", "priority": "high"}
    n_updated = await document_store_async.update_by_filter_async(filters=category_a, meta=new_fields)
    assert n_updated == 2

    # Both new values must be present, and the untouched category field must be preserved.
    is_published = {"field": "meta.status", "operator": "==", "value": "published"}
    published = await document_store_async.filter_documents_async(filters=is_published)
    assert len(published) == 2
    for published_doc in published:
        assert published_doc.meta["category"] == "A"
        assert published_doc.meta["status"] == "published"
        assert published_doc.meta["priority"] == "high"

async def test_update_by_filter_async_no_matches(self, document_store_async: PineconeDocumentStore):
    """A filter that matches nothing updates nothing and reports a count of zero."""
    await document_store_async.write_documents_async(
        [
            Document(content="Doc 1", meta={"category": "A"}),
            Document(content="Doc 2", meta={"category": "B"}),
        ]
    )

    # No document is in category "C".
    category_c = {"field": "meta.category", "operator": "==", "value": "C"}
    n_updated = await document_store_async.update_by_filter_async(filters=category_c, meta={"status": "published"})

    assert n_updated == 0
    assert await document_store_async.count_documents_async() == 2