Skip to content

Commit 89debc3

Browse files
Added routing functionality for bulk write and bulk delete requests
1 parent f76c748 commit 89debc3

3 files changed

Lines changed: 150 additions & 14 deletions

File tree

integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ class OpenSearchDocumentStore:
4343
4444
Usage example:
4545
```python
46-
from haystack_integrations.document_stores.opensearch import OpenSearchDocumentStore
46+
from haystack_integrations.document_stores.opensearch import (
47+
OpenSearchDocumentStore,
48+
)
4749
from haystack import Document
4850
4951
document_store = OpenSearchDocumentStore(hosts="localhost:9200")
@@ -393,6 +395,10 @@ def _prepare_bulk_write_request(
393395
opensearch_actions = []
394396
for doc in documents:
395397
doc_dict = doc.to_dict()
398+
399+
# Extract routing from document metadata
400+
doc_routing = doc_dict.pop("_routing", None)
401+
396402
if "sparse_embedding" in doc_dict:
397403
sparse_embedding = doc_dict.pop("sparse_embedding", None)
398404
if sparse_embedding:
@@ -402,13 +408,17 @@ def _prepare_bulk_write_request(
402408
"The `sparse_embedding` field will be ignored.",
403409
id=doc.id,
404410
)
405-
opensearch_actions.append(
406-
{
407-
"_op_type": action,
408-
"_id": doc.id,
409-
"_source": doc_dict,
410-
}
411-
)
411+
412+
action_dict = {
413+
"_op_type": action,
414+
"_id": doc.id,
415+
"_source": doc_dict,
416+
}
417+
418+
if doc_routing:
419+
action_dict["_routing"] = doc_routing
420+
421+
opensearch_actions.append(action_dict)
412422

413423
return {
414424
"client": self._client if not is_async else self._async_client,
@@ -498,36 +508,48 @@ def _deserialize_document(hit: dict[str, Any]) -> Document:
498508

499509
return Document.from_dict(data)
500510

501-
def _prepare_bulk_delete_request(self, *, document_ids: list[str], is_async: bool) -> dict[str, Any]:
511+
def _prepare_bulk_delete_request(
512+
self, *, document_ids: list[str], is_async: bool, routing: Optional[dict[str, str]] = None
513+
) -> dict[str, Any]:
514+
def action_generator():
515+
for id_ in document_ids:
516+
action = {"_op_type": "delete", "_id": id_}
517+
# Add routing if provided for this document ID
518+
if routing and id_ in routing:
519+
action["_routing"] = routing[id_]
520+
yield action
521+
502522
return {
503523
"client": self._client if not is_async else self._async_client,
504-
"actions": ({"_op_type": "delete", "_id": id_} for id_ in document_ids),
524+
"actions": action_generator(),
505525
"refresh": "wait_for",
506526
"index": self._index,
507527
"raise_on_error": False,
508528
"max_chunk_bytes": self._max_chunk_bytes,
509529
}
510530

511-
def delete_documents(self, document_ids: list[str]) -> None:
531+
def delete_documents(self, document_ids: list[str], routing: Optional[dict[str, str]] = None) -> None:
512532
"""
513533
Deletes documents that match the provided `document_ids` from the document store.
514534
515535
:param document_ids: the document ids to delete
536+
:param routing: the routing to use when deleting documents
516537
"""
517538

518539
self._ensure_initialized()
519540

520-
bulk(**self._prepare_bulk_delete_request(document_ids=document_ids, is_async=False))
541+
bulk(**self._prepare_bulk_delete_request(document_ids=document_ids, is_async=False, routing=routing))
521542

522-
async def delete_documents_async(self, document_ids: list[str]) -> None:
543+
async def delete_documents_async(self, document_ids: list[str], routing: Optional[dict[str, str]] = None) -> None:
523544
"""
524545
Asynchronously deletes documents that match the provided `document_ids` from the document store.
525546
526547
:param document_ids: the document ids to delete
548+
:param routing: the routing to use when deleting documents
527549
"""
528550
self._ensure_initialized()
529551

530-
await async_bulk(**self._prepare_bulk_delete_request(document_ids=document_ids, is_async=True))
552+
await async_bulk(**self._prepare_bulk_delete_request(document_ids=document_ids, is_async=True, routing=routing))
531553

532554
def _prepare_delete_all_request(self, *, is_async: bool) -> dict[str, Any]:
533555
return {

integrations/opensearch/tests/test_document_store.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,48 @@ def test_get_default_mappings(_mock_opensearch_client):
111111
}
112112

113113

114+
@patch("haystack_integrations.document_stores.opensearch.document_store.bulk")
115+
def test_routing_extracted_from_metadata(mock_bulk, document_store):
116+
"""Test routing extraction from document metadata"""
117+
mock_bulk.return_value = (2, [])
118+
119+
docs = [
120+
Document(id="1", content="Doc", meta={"_routing": "user_a", "other": "data"}),
121+
Document(id="2", content="Doc"),
122+
]
123+
document_store.write_documents(docs)
124+
125+
actions = list(mock_bulk.call_args.kwargs["actions"])
126+
127+
# Routing should be at action level, not in _source
128+
assert actions[0]["_routing"] == "user_a"
129+
assert "_routing" not in actions[0]["_source"].get("meta", {})
130+
131+
# Other metadata should be preserved
132+
assert (
133+
actions[0]["_source"].get("meta", {}).get("other") == "data" or actions[0]["_source"].get("other") == "data"
134+
) # Handle both nested and flat
135+
136+
# Second doc has no routing
137+
assert "_routing" not in actions[1]
138+
assert "_routing" not in actions[1]["_source"].get("meta", {})
139+
140+
141+
@patch("haystack_integrations.document_stores.opensearch.document_store.bulk")
142+
def test_routing_in_delete(mock_bulk, document_store):
143+
"""Test routing parameter in delete operations"""
144+
mock_bulk.return_value = (2, [])
145+
146+
routing_map = {"1": "user_a", "2": "user_b"}
147+
document_store.delete_documents(["1", "2", "3"], routing=routing_map)
148+
149+
actions = list(mock_bulk.call_args.kwargs["actions"])
150+
151+
assert actions[0]["_routing"] == "user_a"
152+
assert actions[1]["_routing"] == "user_b"
153+
assert "_routing" not in actions[2]
154+
155+
114156
@pytest.mark.integration
115157
class TestDocumentStore(CountDocumentsTest, WriteDocumentsTest, DeleteDocumentsTest):
116158
"""
@@ -576,3 +618,39 @@ def test_update_by_filter(self, document_store: OpenSearchDocumentStore):
576618
)
577619
assert len(draft_docs) == 1
578620
assert draft_docs[0].meta["category"] == "B"
621+
622+
@pytest.mark.integration
623+
def test_write_with_routing(self, document_store: OpenSearchDocumentStore):
624+
"""Test writing documents with routing metadata"""
625+
docs = [
626+
Document(id="1", content="User A doc", meta={"_routing": "user_a", "category": "test"}),
627+
Document(id="2", content="User B doc", meta={"_routing": "user_b"}),
628+
Document(id="3", content="No routing"),
629+
]
630+
631+
written = document_store.write_documents(docs)
632+
assert written == 3
633+
assert document_store.count_documents() == 3
634+
635+
# Verify _routing not stored in metadata
636+
retrieved = document_store.filter_documents()
637+
for doc in retrieved:
638+
assert "_routing" not in doc.meta
639+
if doc.id == "1":
640+
assert doc.meta["category"] == "test"
641+
642+
@pytest.mark.integration
643+
def test_delete_with_routing(self, document_store: OpenSearchDocumentStore):
644+
"""Test deleting documents with routing"""
645+
docs = [
646+
Document(id="1", content="Doc 1", meta={"_routing": "user_a"}),
647+
Document(id="2", content="Doc 2", meta={"_routing": "user_b"}),
648+
Document(id="3", content="Doc 3"),
649+
]
650+
document_store.write_documents(docs)
651+
652+
routing_map = {"1": "user_a", "2": "user_b"}
653+
document_store.delete_documents(["1", "2"], routing=routing_map)
654+
655+
time.sleep(1)
656+
assert document_store.count_documents() == 1

integrations/opensearch/tests/test_document_store_async.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,42 @@ async def test_delete_all_documents_no_index_recreation(self, document_store: Op
302302
assert len(results) == 1
303303
assert results[0].content == "New document after delete all"
304304

305+
@pytest.mark.asyncio
306+
async def test_write_with_routing(self, document_store: OpenSearchDocumentStore):
307+
"""Test async writing documents with routing metadata"""
308+
docs = [
309+
Document(id="1", content="User A doc", meta={"_routing": "user_a", "category": "test"}),
310+
Document(id="2", content="User B doc", meta={"_routing": "user_b"}),
311+
Document(id="3", content="No routing"),
312+
]
313+
314+
written = await document_store.write_documents_async(docs)
315+
assert written == 3
316+
assert await document_store.count_documents_async() == 3
317+
318+
# Verify _routing not stored in metadata
319+
retrieved = await document_store.filter_documents_async()
320+
for doc in retrieved:
321+
assert "_routing" not in doc.meta
322+
if doc.id == "1":
323+
assert doc.meta["category"] == "test"
324+
325+
@pytest.mark.asyncio
326+
async def test_delete_with_routing(self, document_store: OpenSearchDocumentStore):
327+
"""Test async deleting documents with routing"""
328+
docs = [
329+
Document(id="1", content="Doc 1", meta={"_routing": "user_a"}),
330+
Document(id="2", content="Doc 2", meta={"_routing": "user_b"}),
331+
Document(id="3", content="Doc 3"),
332+
]
333+
document_store.write_documents(docs)
334+
335+
routing_map = {"1": "user_a", "2": "user_b"}
336+
await document_store.delete_documents_async(["1", "2"], routing=routing_map)
337+
338+
time.sleep(1)
339+
assert await document_store.count_documents_async() == 1
340+
305341
async def test_delete_by_filter_async(self, document_store: OpenSearchDocumentStore):
306342
docs = [
307343
Document(content="Doc 1", meta={"category": "A"}),

0 commit comments

Comments
 (0)