Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions state-manager/app/controller/manual_retry_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from pymongo.errors import DuplicateKeyError
from app.models.manual_retry import ManualRetryRequestModel, ManualRetryResponseModel
from beanie import PydanticObjectId
from app.singletons.logs_manager import LogsManager
from app.models.state_status_enum import StateStatusEnum
from fastapi import HTTPException, status
from app.models.db.state import State


logger = LogsManager().get_logger()

async def manual_retry_state(namespace_name: str, state_id: PydanticObjectId, body: ManualRetryRequestModel, x_exosphere_request_id: str) -> ManualRetryResponseModel:
    """Create a retry copy of an existing state and mark the original as retried.

    The new state copies the original's node, namespace, identifier, graph,
    run id, inputs, parents and ``does_unites`` flag, starts in ``CREATED``
    with empty outputs and no error, and takes its ``fanout_id`` from the
    request body. After the copy is inserted, the original state's status is
    set to ``RETRY_CREATED`` and saved.

    Args:
        namespace_name: Namespace the request was made under; the state must
            belong to this namespace to be found.
        state_id: Id of the state to retry.
        body: Request payload carrying the ``fanout_id`` for the new state.
        x_exosphere_request_id: Correlation id attached to every log line.

    Returns:
        The id (as a string) and status of the newly inserted retry state.

    Raises:
        HTTPException: 404 when no matching state exists in the namespace;
            400 when the insert hits a ``DuplicateKeyError``.
        Exception: Any other failure is logged and re-raised unchanged.
    """
    try:
        logger.info(f"Manual retry state {state_id} for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)

        # Scope the lookup to the caller's namespace: matching on the id alone
        # would let a request under one namespace retry another namespace's state.
        state = await State.find_one(State.id == state_id, State.namespace_name == namespace_name)
        if not state:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="State not found")

        try:
            # NOTE(review): a reviewer flagged that leaving retry_count unset may
            # make this copy collide with the original on a fingerprint index for
            # does_unites states. The State model is not visible here; confirm
            # against its indexes before relying on fanout_id alone.
            retry_state = State(
                node_name=state.node_name,
                namespace_name=state.namespace_name,
                identifier=state.identifier,
                graph_name=state.graph_name,
                run_id=state.run_id,
                status=StateStatusEnum.CREATED,
                inputs=state.inputs,
                outputs={},
                error=None,
                parents=state.parents,
                does_unites=state.does_unites,
                fanout_id=body.fanout_id  # this will ensure that multiple unwanted retries are not formed because of index in database
            )
            retry_state = await retry_state.insert()
            logger.info(f"Retry state {retry_state.id} created for state {state_id}", x_exosphere_request_id=x_exosphere_request_id)

            # NOTE(review): insert() and save() are two separate writes. If
            # save() fails, the retry state exists but the original is not
            # marked RETRY_CREATED; consider a transaction or a compensating step.
            state.status = StateStatusEnum.RETRY_CREATED
            await state.save()

            return ManualRetryResponseModel(id=str(retry_state.id), status=retry_state.status)

        except DuplicateKeyError:
            logger.info(f"Duplicate retry state detected for state {state_id}. A retry state with the same unique key already exists.", x_exosphere_request_id=x_exosphere_request_id)
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Duplicate retry state detected")

    except Exception:
        # Also catches the HTTPExceptions raised above; they are logged and
        # then propagate unchanged via the bare raise.
        logger.error(f"Error manual retry state {state_id} for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
        raise
11 changes: 11 additions & 0 deletions state-manager/app/models/manual_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from pydantic import BaseModel, Field
from .state_status_enum import StateStatusEnum


class ManualRetryRequestModel(BaseModel):
    # Request body for the manual-retry endpoint.
    # fanout_id is required and is copied onto the newly created retry state.
    fanout_id: str = Field(
        default=...,
        description="Fanout ID of the state",
    )

Comment thread
NiveditJain marked this conversation as resolved.

class ManualRetryResponseModel(BaseModel):
    # Response body for the manual-retry endpoint.
    # Both fields are required.
    id: str = Field(
        default=...,
        description="ID of the state",
    )
    status: StateStatusEnum = Field(
        default=...,
        description="Status of the state",
    )
22 changes: 22 additions & 0 deletions state-manager/app/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@
from .models.signal_models import ReEnqueueAfterRequestModel
from .controller.re_queue_after_signal import re_queue_after_signal

# manual_retry
from .models.manual_retry import ManualRetryRequestModel, ManualRetryResponseModel
from .controller.manual_retry_state import manual_retry_state

Comment thread
NiveditJain marked this conversation as resolved.

logger = LogsManager().get_logger()

Expand Down Expand Up @@ -176,6 +180,24 @@ async def re_enqueue_after_state_route(namespace_name: str, state_id: str, body:

return await re_queue_after_signal(namespace_name, PydanticObjectId(state_id), body, x_exosphere_request_id)

@router.post(
    "/state/{state_id}/manual-retry",
    response_model=ManualRetryResponseModel,
    status_code=status.HTTP_200_OK,
    response_description="State manual retry successfully",
    tags=["state"]
)
async def manual_retry_state_route(namespace_name: str, state_id: str, body: ManualRetryRequestModel, request: Request, api_key: str = Depends(check_api_key)):
    # Route handler for manually retrying a state: checks the API key,
    # validates the path id, then delegates to manual_retry_state.
    # Comments are used instead of a docstring so the generated OpenAPI
    # description of this operation stays unchanged.

    # Reuse the request id stored on request.state when present; otherwise
    # fall back to a fresh UUID so log lines can still be correlated.
    x_exosphere_request_id = getattr(request.state, "x_exosphere_request_id", str(uuid4()))

    if api_key:
        logger.info(f"API key is valid for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
    else:
        logger.error(f"API key is invalid for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key")

    # Reject malformed ids up front: constructing PydanticObjectId from an
    # invalid string raises, which would otherwise surface as a 500.
    if not PydanticObjectId.is_valid(state_id):
        logger.error(f"Invalid state id {state_id} for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid state id")

    return await manual_retry_state(namespace_name, PydanticObjectId(state_id), body, x_exosphere_request_id)
Comment thread
coderabbitai[bot] marked this conversation as resolved.


@router.put(
"/graph/{graph_name}",
Expand Down
241 changes: 241 additions & 0 deletions state-manager/tests/unit/models/test_manual_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
import pytest
from pydantic import ValidationError

from app.models.manual_retry import ManualRetryRequestModel, ManualRetryResponseModel
from app.models.state_status_enum import StateStatusEnum


class TestManualRetryRequestModel:
    """Validation and serialization checks for ManualRetryRequestModel."""

    def test_manual_retry_request_model_valid_data(self):
        """An ordinary string fanout_id is stored unchanged."""
        expected = "test-fanout-id-123"

        request_model = ManualRetryRequestModel(fanout_id=expected)

        assert request_model.fanout_id == expected

    def test_manual_retry_request_model_empty_fanout_id(self):
        """An empty string is accepted as fanout_id."""
        expected = ""

        request_model = ManualRetryRequestModel(fanout_id=expected)

        assert request_model.fanout_id == expected

    def test_manual_retry_request_model_uuid_fanout_id(self):
        """A UUID-formatted string is accepted as fanout_id."""
        expected = "550e8400-e29b-41d4-a716-446655440000"

        request_model = ManualRetryRequestModel(fanout_id=expected)

        assert request_model.fanout_id == expected

    def test_manual_retry_request_model_long_fanout_id(self):
        """A 1000-character fanout_id is accepted."""
        expected = "a" * 1000  # Very long string

        request_model = ManualRetryRequestModel(fanout_id=expected)

        assert request_model.fanout_id == expected

    def test_manual_retry_request_model_special_characters_fanout_id(self):
        """Punctuation and symbols in fanout_id are preserved."""
        expected = "test-fanout@#$%^&*()_+-={}[]|\\:;\"'<>?,./"

        request_model = ManualRetryRequestModel(fanout_id=expected)

        assert request_model.fanout_id == expected

    def test_manual_retry_request_model_missing_fanout_id(self):
        """Omitting fanout_id raises a 'Field required' validation error."""
        with pytest.raises(ValidationError) as exc_info:
            ManualRetryRequestModel()  # type: ignore

        message = str(exc_info.value)
        assert "fanout_id" in message
        assert "Field required" in message

    def test_manual_retry_request_model_none_fanout_id(self):
        """Passing None for fanout_id is rejected."""
        with pytest.raises(ValidationError) as exc_info:
            ManualRetryRequestModel(fanout_id=None)  # type: ignore

        assert "fanout_id" in str(exc_info.value)

    def test_manual_retry_request_model_numeric_fanout_id(self):
        """An integer fanout_id fails validation with a string_type error."""
        with pytest.raises(ValidationError) as exc_info:
            ManualRetryRequestModel(fanout_id=12345)  # type: ignore

        assert "string_type" in str(exc_info.value)

    def test_manual_retry_request_model_dict_representation(self):
        """model_dump() yields a dict holding only fanout_id."""
        expected = "test-fanout-id"

        dumped = ManualRetryRequestModel(fanout_id=expected).model_dump()

        assert dumped == {"fanout_id": expected}

    def test_manual_retry_request_model_json_serialization(self):
        """model_dump_json() contains the fanout_id key/value pair."""
        expected = "test-fanout-id"

        serialized = ManualRetryRequestModel(fanout_id=expected).model_dump_json()

        assert f'"fanout_id":"{expected}"' in serialized


class TestManualRetryResponseModel:
    """Validation and serialization checks for ManualRetryResponseModel."""

    def test_manual_retry_response_model_valid_data(self):
        """A string id and an enum status are stored unchanged."""
        expected_id = "507f1f77bcf86cd799439011"
        expected_status = StateStatusEnum.CREATED

        response_model = ManualRetryResponseModel(id=expected_id, status=expected_status)

        assert response_model.id == expected_id
        assert response_model.status == expected_status

    def test_manual_retry_response_model_all_status_types(self):
        """Every member of StateStatusEnum is accepted as status."""
        expected_id = "507f1f77bcf86cd799439011"

        for member in StateStatusEnum:
            response_model = ManualRetryResponseModel(id=expected_id, status=member)
            assert response_model.id == expected_id
            assert response_model.status == member

    def test_manual_retry_response_model_created_status(self):
        """The CREATED status is preserved on the model."""
        expected_id = "507f1f77bcf86cd799439011"

        response_model = ManualRetryResponseModel(id=expected_id, status=StateStatusEnum.CREATED)

        assert response_model.id == expected_id
        assert response_model.status == StateStatusEnum.CREATED

    def test_manual_retry_response_model_retry_created_status(self):
        """The RETRY_CREATED status is preserved on the model."""
        expected_id = "507f1f77bcf86cd799439011"

        response_model = ManualRetryResponseModel(id=expected_id, status=StateStatusEnum.RETRY_CREATED)

        assert response_model.id == expected_id
        assert response_model.status == StateStatusEnum.RETRY_CREATED

    def test_manual_retry_response_model_missing_id(self):
        """Omitting id raises a 'Field required' validation error."""
        with pytest.raises(ValidationError) as exc_info:
            ManualRetryResponseModel(status=StateStatusEnum.CREATED)  # type: ignore

        message = str(exc_info.value)
        assert "id" in message
        assert "Field required" in message

    def test_manual_retry_response_model_missing_status(self):
        """Omitting status raises a 'Field required' validation error."""
        with pytest.raises(ValidationError) as exc_info:
            ManualRetryResponseModel(id="507f1f77bcf86cd799439011")  # type: ignore

        message = str(exc_info.value)
        assert "status" in message
        assert "Field required" in message

    def test_manual_retry_response_model_none_id(self):
        """Passing None for id is rejected."""
        with pytest.raises(ValidationError) as exc_info:
            ManualRetryResponseModel(id=None, status=StateStatusEnum.CREATED)  # type: ignore

        assert "id" in str(exc_info.value)

    def test_manual_retry_response_model_none_status(self):
        """Passing None for status is rejected."""
        with pytest.raises(ValidationError) as exc_info:
            ManualRetryResponseModel(id="507f1f77bcf86cd799439011", status=None)  # type: ignore

        assert "status" in str(exc_info.value)

    def test_manual_retry_response_model_invalid_status(self):
        """A string that is not a StateStatusEnum value is rejected."""
        with pytest.raises(ValidationError) as exc_info:
            ManualRetryResponseModel(id="507f1f77bcf86cd799439011", status="INVALID_STATUS")  # type: ignore

        assert "status" in str(exc_info.value)

    def test_manual_retry_response_model_numeric_id(self):
        """An integer id fails validation with a string_type error."""
        with pytest.raises(ValidationError) as exc_info:
            ManualRetryResponseModel(id=12345, status=StateStatusEnum.CREATED)  # type: ignore

        assert "string_type" in str(exc_info.value)

    def test_manual_retry_response_model_dict_representation(self):
        """model_dump() yields a dict with the id and status fields."""
        expected_id = "507f1f77bcf86cd799439011"
        expected_status = StateStatusEnum.CREATED

        dumped = ManualRetryResponseModel(id=expected_id, status=expected_status).model_dump()

        assert dumped == {"id": expected_id, "status": expected_status}

    def test_manual_retry_response_model_json_serialization(self):
        """model_dump_json() contains the id and the status value."""
        expected_id = "507f1f77bcf86cd799439011"
        expected_status = StateStatusEnum.CREATED

        serialized = ManualRetryResponseModel(id=expected_id, status=expected_status).model_dump_json()

        assert f'"id":"{expected_id}"' in serialized
        assert f'"status":"{expected_status.value}"' in serialized

    def test_manual_retry_response_model_empty_id(self):
        """An empty string is accepted as id."""
        expected_id = ""
        expected_status = StateStatusEnum.CREATED

        response_model = ManualRetryResponseModel(id=expected_id, status=expected_status)

        assert response_model.id == expected_id
        assert response_model.status == expected_status

    def test_manual_retry_response_model_long_id(self):
        """A 1000-character id is accepted."""
        expected_id = "a" * 1000  # Very long string
        expected_status = StateStatusEnum.CREATED

        response_model = ManualRetryResponseModel(id=expected_id, status=expected_status)

        assert response_model.id == expected_id
        assert response_model.status == expected_status
Loading
Loading