Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
#
# SPDX-License-Identifier: Apache-2.0

# ruff: noqa: FBT001, FBT002 boolean-type-hint-positional-argument and boolean-default-value-positional-argument

from collections.abc import Mapping
from math import exp
from typing import Any, Optional, Union
from typing import Any, Literal, Optional, Union

from haystack import default_from_dict, default_to_dict, logging
from haystack.dataclasses import Document
Expand All @@ -19,6 +21,7 @@

logger = logging.getLogger(__name__)


Hosts = Union[str, list[Union[str, Mapping[str, Union[str, int]]]]]

# document scores are essentially unbounded and will be scaled to values between 0 and 1 if scale_score is set to
Expand Down Expand Up @@ -399,7 +402,12 @@ async def filter_documents_async(self, filters: Optional[dict[str, Any]] = None)
return await self._search_documents_async(self._prepare_filter_search_request(filters))

def _prepare_bulk_write_request(
self, *, documents: list[Document], policy: DuplicatePolicy, is_async: bool
self,
*,
documents: list[Document],
policy: DuplicatePolicy,
is_async: bool,
refresh: Literal["wait_for", True, False],
) -> dict[str, Any]:
if len(documents) > 0 and not isinstance(documents[0], Document):
msg = "param 'documents' must contain a list of objects of type Document"
Expand Down Expand Up @@ -432,7 +440,7 @@ def _prepare_bulk_write_request(
return {
"client": self._client if not is_async else self._async_client,
"actions": opensearch_actions,
"refresh": "wait_for",
"refresh": refresh,
"index": self._index,
"raise_on_error": False,
"max_chunk_bytes": self._max_chunk_bytes,
Expand Down Expand Up @@ -469,36 +477,58 @@ def _process_bulk_write_errors(errors: list[dict[str, Any]], policy: DuplicatePo
msg = f"Failed to write documents to OpenSearch. Errors:\n{other_errors}"
raise DocumentStoreError(msg)

def write_documents(self, documents: list[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE) -> int:
def write_documents(
self,
documents: list[Document],
policy: DuplicatePolicy = DuplicatePolicy.NONE,
refresh: Literal["wait_for", True, False] = "wait_for",
) -> int:
"""
Writes documents to the document store.

:param documents: A list of Documents to write to the document store.
:param policy: The duplicate policy to use when writing documents.
:param refresh: Controls when changes are made visible to search operations.
- `True`: Force refresh immediately after the operation.
- `False`: Do not refresh (better performance for bulk operations).
- `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency).
For more details, see the [OpenSearch refresh documentation](https://opensearch.org/docs/latest/api-reference/document-apis/index-document/).
:raises DuplicateDocumentError: If a document with the same id already exists in the document store
and the policy is set to `DuplicatePolicy.FAIL` (or not specified).
:returns: The number of documents written to the document store.
"""
self._ensure_initialized()

bulk_params = self._prepare_bulk_write_request(documents=documents, policy=policy, is_async=False)
bulk_params = self._prepare_bulk_write_request(
documents=documents, policy=policy, is_async=False, refresh=refresh
)
documents_written, errors = bulk(**bulk_params)
OpenSearchDocumentStore._process_bulk_write_errors(errors, policy)
return documents_written

async def write_documents_async(
self, documents: list[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE
self,
documents: list[Document],
policy: DuplicatePolicy = DuplicatePolicy.NONE,
refresh: Literal["wait_for", True, False] = "wait_for",
) -> int:
"""
Asynchronously writes documents to the document store.

:param documents: A list of Documents to write to the document store.
:param policy: The duplicate policy to use when writing documents.
:param refresh: Controls when changes are made visible to search operations.
- `True`: Force refresh immediately after the operation.
- `False`: Do not refresh (better performance for bulk operations).
- `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency).
For more details, see the [OpenSearch refresh documentation](https://opensearch.org/docs/latest/api-reference/document-apis/index-document/).
:returns: The number of documents written to the document store.
"""
await self._ensure_initialized_async()
assert self._async_client is not None
bulk_params = self._prepare_bulk_write_request(documents=documents, policy=policy, is_async=True)
bulk_params = self._prepare_bulk_write_request(
documents=documents, policy=policy, is_async=True, refresh=refresh
)
documents_written, errors = await async_bulk(**bulk_params)
# since we call async_bulk with stats_only=False, errors is guaranteed to be a list (not int)
OpenSearchDocumentStore._process_bulk_write_errors(errors=errors, policy=policy) # type: ignore[arg-type]
Expand All @@ -518,51 +548,71 @@ def _deserialize_document(hit: dict[str, Any]) -> Document:

return Document.from_dict(data)

def _prepare_bulk_delete_request(self, *, document_ids: list[str], is_async: bool) -> dict[str, Any]:
def _prepare_bulk_delete_request(
self, *, document_ids: list[str], is_async: bool, refresh: Literal["wait_for", True, False]
) -> dict[str, Any]:
return {
"client": self._client if not is_async else self._async_client,
"actions": ({"_op_type": "delete", "_id": id_} for id_ in document_ids),
"refresh": "wait_for",
"refresh": refresh,
"index": self._index,
"raise_on_error": False,
"max_chunk_bytes": self._max_chunk_bytes,
}

def delete_documents(self, document_ids: list[str]) -> None:
def delete_documents(self, document_ids: list[str], refresh: Literal["wait_for", True, False] = "wait_for") -> None:
"""
Deletes documents that match the provided `document_ids` from the document store.

:param document_ids: the document ids to delete
:param refresh: Controls when changes are made visible to search operations.
- `True`: Force refresh immediately after the operation.
- `False`: Do not refresh (better performance for bulk operations).
- `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency).
For more details, see the [OpenSearch refresh documentation](https://opensearch.org/docs/latest/api-reference/document-apis/index-document/).
"""

self._ensure_initialized()

bulk(**self._prepare_bulk_delete_request(document_ids=document_ids, is_async=False))
bulk(**self._prepare_bulk_delete_request(document_ids=document_ids, is_async=False, refresh=refresh))

async def delete_documents_async(self, document_ids: list[str]) -> None:
async def delete_documents_async(
self,
document_ids: list[str],
refresh: Literal["wait_for", True, False] = "wait_for",
) -> None:
"""
Asynchronously deletes documents that match the provided `document_ids` from the document store.

:param document_ids: the document ids to delete
:param refresh: Controls when changes are made visible to search operations.
- `True`: Force refresh immediately after the operation.
- `False`: Do not refresh (better performance for bulk operations).
- `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency).
For more details, see the [OpenSearch refresh documentation](https://opensearch.org/docs/latest/api-reference/document-apis/index-document/).
"""
await self._ensure_initialized_async()
assert self._async_client is not None

await async_bulk(**self._prepare_bulk_delete_request(document_ids=document_ids, is_async=True))
await async_bulk(**self._prepare_bulk_delete_request(document_ids=document_ids, is_async=True, refresh=refresh))

def _prepare_delete_all_request(self, *, is_async: bool) -> dict[str, Any]:
def _prepare_delete_all_request(self, *, refresh: bool) -> dict[str, Any]:
return {
"index": self._index,
"body": {"query": {"match_all": {}}}, # Delete all documents
"wait_for_completion": False if is_async else True, # block until done (set False for async)
"wait_for_completion": True, # Always wait to ensure documents are deleted before returning
"refresh": refresh,
}

def delete_all_documents(self, recreate_index: bool = False) -> None: # noqa: FBT002, FBT001
def delete_all_documents(self, recreate_index: bool = False, refresh: bool = True) -> None:
"""
Deletes all documents in the document store.

:param recreate_index: If True, the index will be deleted and recreated with the original mappings and
settings. If False, all documents will be deleted using the `delete_by_query` API.
:param refresh: If True, OpenSearch refreshes all shards involved in the delete by query after the request
completes. If False, no refresh is performed. For more details, see the
[OpenSearch delete_by_query refresh documentation](https://opensearch.org/docs/latest/api-reference/document-apis/delete-by-query/).
"""
self._ensure_initialized()
assert self._client is not None
Expand All @@ -587,7 +637,7 @@ def delete_all_documents(self, recreate_index: bool = False) -> None: # noqa: F
)

else:
result = self._client.delete_by_query(**self._prepare_delete_all_request(is_async=False))
result = self._client.delete_by_query(**self._prepare_delete_all_request(refresh=refresh))
logger.info(
"Deleted all the {n_docs} documents from the index '{index}'.",
index=self._index,
Expand All @@ -597,12 +647,15 @@ def delete_all_documents(self, recreate_index: bool = False) -> None: # noqa: F
msg = f"Failed to delete all documents from OpenSearch: {e!s}"
raise DocumentStoreError(msg) from e

async def delete_all_documents_async(self, recreate_index: bool = False) -> None: # noqa: FBT002, FBT001
async def delete_all_documents_async(self, recreate_index: bool = False, refresh: bool = True) -> None:
"""
Asynchronously deletes all documents in the document store.

:param recreate_index: If True, the index will be deleted and recreated with the original mappings and
settings. If False, all documents will be deleted using the `delete_by_query` API.
:param refresh: If True, OpenSearch refreshes all shards involved in the delete by query after the request
completes. If False, no refresh is performed. For more details, see the
[OpenSearch delete_by_query refresh documentation](https://opensearch.org/docs/latest/api-reference/document-apis/delete-by-query/).
"""
await self._ensure_initialized_async()
assert self._async_client is not None
Expand All @@ -624,18 +677,22 @@ async def delete_all_documents_async(self, recreate_index: bool = False) -> None
await self._async_client.indices.delete(index=self._index)
await self._async_client.indices.create(index=self._index, body=body)
else:
await self._async_client.delete_by_query(**self._prepare_delete_all_request(is_async=True))
# use delete_by_query for more efficient deletion without index recreation
await self._async_client.delete_by_query(**self._prepare_delete_all_request(refresh=refresh))

except Exception as e:
msg = f"Failed to delete all documents from OpenSearch: {e!s}"
raise DocumentStoreError(msg) from e

def delete_by_filter(self, filters: dict[str, Any]) -> int:
def delete_by_filter(self, filters: dict[str, Any], refresh: bool = False) -> int:
"""
Deletes all documents that match the provided filters.

:param filters: The filters to apply to select documents for deletion.
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
:param refresh: If True, OpenSearch refreshes all shards involved in the delete by query after the request
completes. If False, no refresh is performed. For more details, see the
[OpenSearch delete_by_query refresh documentation](https://opensearch.org/docs/latest/api-reference/document-apis/delete-by-query/).
:returns: The number of documents deleted.
"""
self._ensure_initialized()
Expand All @@ -644,7 +701,7 @@ def delete_by_filter(self, filters: dict[str, Any]) -> int:
try:
normalized_filters = normalize_filters(filters)
body = {"query": {"bool": {"filter": normalized_filters}}}
result = self._client.delete_by_query(index=self._index, body=body)
result = self._client.delete_by_query(index=self._index, body=body, refresh=refresh)
deleted_count = result.get("deleted", 0)
logger.info(
"Deleted {n_docs} documents from index '{index}' using filters.",
Expand All @@ -656,12 +713,15 @@ def delete_by_filter(self, filters: dict[str, Any]) -> int:
msg = f"Failed to delete documents by filter from OpenSearch: {e!s}"
raise DocumentStoreError(msg) from e

async def delete_by_filter_async(self, filters: dict[str, Any]) -> int:
async def delete_by_filter_async(self, filters: dict[str, Any], refresh: bool = False) -> int:
"""
Asynchronously deletes all documents that match the provided filters.

:param filters: The filters to apply to select documents for deletion.
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
:param refresh: If True, OpenSearch refreshes all shards involved in the delete by query after the request
completes. If False, no refresh is performed. For more details, see the
[OpenSearch delete_by_query refresh documentation](https://opensearch.org/docs/latest/api-reference/document-apis/delete-by-query/).
:returns: The number of documents deleted.
"""
await self._ensure_initialized_async()
Expand All @@ -670,7 +730,7 @@ async def delete_by_filter_async(self, filters: dict[str, Any]) -> int:
try:
normalized_filters = normalize_filters(filters)
body = {"query": {"bool": {"filter": normalized_filters}}}
result = await self._async_client.delete_by_query(index=self._index, body=body)
result = await self._async_client.delete_by_query(index=self._index, body=body, refresh=refresh)
deleted_count = result.get("deleted", 0)
logger.info(
"Deleted {n_docs} documents from index '{index}' using filters.",
Expand All @@ -682,13 +742,16 @@ async def delete_by_filter_async(self, filters: dict[str, Any]) -> int:
msg = f"Failed to delete documents by filter from OpenSearch: {e!s}"
raise DocumentStoreError(msg) from e

def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any], refresh: bool = False) -> int:
"""
Updates the metadata of all documents that match the provided filters.

:param filters: The filters to apply to select documents for updating.
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
:param meta: The metadata fields to update.
:param refresh: If True, OpenSearch refreshes all shards involved in the update by query after the request
completes. If False, no refresh is performed. For more details, see the
[OpenSearch update_by_query refresh documentation](https://opensearch.org/docs/latest/api-reference/document-apis/update-by-query/).
:returns: The number of documents updated.
"""
self._ensure_initialized()
Expand All @@ -707,7 +770,7 @@ def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int
"query": {"bool": {"filter": normalized_filters}},
"script": {"source": update_script, "params": meta, "lang": "painless"},
}
result = self._client.update_by_query(index=self._index, body=body)
result = self._client.update_by_query(index=self._index, body=body, refresh=refresh)
updated_count = result.get("updated", 0)
logger.info(
"Updated {n_docs} documents in index '{index}' using filters.",
Expand All @@ -719,13 +782,16 @@ def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int
msg = f"Failed to update documents by filter in OpenSearch: {e!s}"
raise DocumentStoreError(msg) from e

async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, Any], refresh: bool = False) -> int:
"""
Asynchronously updates the metadata of all documents that match the provided filters.

:param filters: The filters to apply to select documents for updating.
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
:param meta: The metadata fields to update.
:param refresh: If True, OpenSearch refreshes all shards involved in the update by query after the request
completes. If False, no refresh is performed. For more details, see the
[OpenSearch update_by_query refresh documentation](https://opensearch.org/docs/latest/api-reference/document-apis/update-by-query/).
:returns: The number of documents updated.
"""
await self._ensure_initialized_async()
Expand All @@ -744,7 +810,7 @@ async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str,
"query": {"bool": {"filter": normalized_filters}},
"script": {"source": update_script, "params": meta, "lang": "painless"},
}
result = await self._async_client.update_by_query(index=self._index, body=body)
result = await self._async_client.update_by_query(index=self._index, body=body, refresh=refresh)
updated_count = result.get("updated", 0)
logger.info(
"Updated {n_docs} documents in index '{index}' using filters.",
Expand Down
Loading