Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions state-manager/app/controller/manul_retry_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from pymongo.errors import DuplicateKeyError
from app.models.manual_retry import ManualRetryRequestModel, ManualRetryResponseModel
from beanie import PydanticObjectId
from app.singletons.logs_manager import LogsManager
from app.models.state_status_enum import StateStatusEnum
from fastapi import HTTPException, status
from app.models.db.state import State


logger = LogsManager().get_logger()

async def manual_retry_state(namespace_name: str, state_id: PydanticObjectId, body: ManualRetryRequestModel, x_exosphere_request_id: str):
try:
logger.info(f"Manual retry state {state_id} for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)

state = await State.find_one(State.id == state_id)
if not state:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="State not found")
Comment thread
NiveditJain marked this conversation as resolved.
Comment thread
NiveditJain marked this conversation as resolved.
Outdated

try:
retry_state = State(
node_name=state.node_name,
namespace_name=state.namespace_name,
identifier=state.identifier,
graph_name=state.graph_name,
run_id=state.run_id,
status=StateStatusEnum.CREATED,
inputs=state.inputs,
outputs={},
error=None,
parents=state.parents,
does_unites=state.does_unites,
fanout_id=body.fanout_id # this will ensure that multiple unwanted retries are not formed because of index in database
)
retry_state = await retry_state.insert()
Comment thread
NiveditJain marked this conversation as resolved.
logger.info(f"Retry state {retry_state.id} created for state {state_id}", x_exosphere_request_id=x_exosphere_request_id)

state.status = StateStatusEnum.RETRY_CREATED
await state.save()

return ManualRetryResponseModel(id=str(retry_state.id), status=retry_state.status)
except DuplicateKeyError:
logger.info(f"Duplicate retry state detected for state {state_id}. A retry state with the same unique key already exists.", x_exosphere_request_id=x_exosphere_request_id)
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Duplicate retry state detected")

Comment thread
NiveditJain marked this conversation as resolved.

except Exception as e:
logger.error(f"Error manual retry state {state_id} for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
raise e
Comment thread
NiveditJain marked this conversation as resolved.
Outdated
Comment thread
NiveditJain marked this conversation as resolved.
Outdated
11 changes: 11 additions & 0 deletions state-manager/app/models/manual_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from pydantic import BaseModel, Field
from .state_status_enum import StateStatusEnum


class ManualRetryRequestModel(BaseModel):
fanout_id: str = Field(..., description="Fanout ID of the state")

Comment thread
NiveditJain marked this conversation as resolved.

class ManualRetryResponseModel(BaseModel):
id: str = Field(..., description="ID of the state")
status: StateStatusEnum = Field(..., description="Status of the state")
22 changes: 22 additions & 0 deletions state-manager/app/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@
from .models.signal_models import ReEnqueueAfterRequestModel
from .controller.re_queue_after_signal import re_queue_after_signal

# manual_retry
from .models.manual_retry import ManualRetryRequestModel, ManualRetryResponseModel
from .controller.manul_retry_state import manual_retry_state
Comment thread
NiveditJain marked this conversation as resolved.
Outdated

Comment thread
NiveditJain marked this conversation as resolved.

logger = LogsManager().get_logger()

Expand Down Expand Up @@ -176,6 +180,24 @@ async def re_enqueue_after_state_route(namespace_name: str, state_id: str, body:

return await re_queue_after_signal(namespace_name, PydanticObjectId(state_id), body, x_exosphere_request_id)

@router.post(
"state/{state_id}/manual-retry",
Comment thread
NiveditJain marked this conversation as resolved.
Outdated
response_model=ManualRetryResponseModel,
status_code=status.HTTP_200_OK,
response_description="State manual retry successfully",
tags=["state"]
)
Comment thread
NiveditJain marked this conversation as resolved.
async def manual_retry_state_route(namespace_name: str, state_id: str, body: ManualRetryRequestModel, request: Request, api_key: str = Depends(check_api_key)):
x_exosphere_request_id = getattr(request.state, "x_exosphere_request_id", str(uuid4()))

if api_key:
logger.info(f"API key is valid for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
else:
logger.error(f"API key is invalid for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key")
Comment thread
NiveditJain marked this conversation as resolved.

return await manual_retry_state(namespace_name, PydanticObjectId(state_id), body, x_exosphere_request_id)
Comment thread
coderabbitai[bot] marked this conversation as resolved.


@router.put(
"/graph/{graph_name}",
Expand Down
Loading