Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
8f7bcc1
Implement retry policy and enhance errored state handling
NiveditJain Aug 31, 2025
a00b97c
Merge remote-tracking branch 'origin/main' into retries
NiveditJain Aug 31, 2025
f6a3bb2
Refactor retry policy and errored state handling
NiveditJain Aug 31, 2025
66c794a
Add retry policy documentation and integrate into graph configuration
NiveditJain Aug 31, 2025
728b225
Enhance retry policy error handling and validation
NiveditJain Aug 31, 2025
ba13785
Update retry policy documentation and examples
NiveditJain Aug 31, 2025
6e02d50
Enhance retry policy implementation and documentation
NiveditJain Aug 31, 2025
64592d8
Enhance errored state handling with retry state management
NiveditJain Aug 31, 2025
33e75d3
Update state-manager/app/models/db/state.py
NiveditJain Aug 31, 2025
0c16bd7
Update docs/docs/exosphere/retry-policy.md
NiveditJain Aug 31, 2025
b0cabb0
Update docs/docs/exosphere/retry-policy.md
NiveditJain Aug 31, 2025
74da1b8
Update state-manager/app/controller/errored_state.py
NiveditJain Aug 31, 2025
06ee0a3
Update state-manager/app/controller/errored_state.py
NiveditJain Aug 31, 2025
fded75b
Update state-manager/app/models/retry_policy_model.py
NiveditJain Aug 31, 2025
e56d5f5
Update max_delay description in RetryPolicyModel to clarify behavior …
NiveditJain Aug 31, 2025
42efd68
Refine documentation for retry policy and errored state handling
NiveditJain Aug 31, 2025
4ec30ab
Enhance tests for errored state and upsert graph template
NiveditJain Aug 31, 2025
b1b85d1
Refactor test for RetryPolicyModel by removing unnecessary import
NiveditJain Aug 31, 2025
3806d26
Add comprehensive tests for errored state handling in graph templates
NiveditJain Aug 31, 2025
4048228
Refactor assertions in errored state tests for clarity
NiveditJain Aug 31, 2025
462cfde
Remove Kubernetes deployment steps from the publish workflow
NiveditJain Aug 31, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion state-manager/app/controller/errored_state.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import time

from app.models.errored_models import ErroredRequestModel, ErroredResponseModel
from fastapi import HTTPException, status
from beanie import PydanticObjectId

from app.models.db.state import State
from app.models.state_status_enum import StateStatusEnum
from app.singletons.logs_manager import LogsManager
from app.models.db.graph_template_model import GraphTemplate

logger = LogsManager().get_logger()

Expand All @@ -23,11 +26,35 @@ async def errored_state(namespace_name: str, state_id: PydanticObjectId, body: E
if state.status == StateStatusEnum.EXECUTED:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="State is already executed")

graph_template = await GraphTemplate.get(namespace_name, state.graph_name)
Comment thread
NiveditJain marked this conversation as resolved.
Outdated

retry_created = False

if state.retry_count < graph_template.retry_policy.max_retries:
retry_state = State(
node_name=state.node_name,
namespace_name=state.namespace_name,
identifier=state.identifier,
graph_name=state.graph_name,
run_id=state.run_id,
status=StateStatusEnum.CREATED,
inputs=state.inputs,
outputs={},
error=None,
parents=state.parents,
does_unites=state.does_unites,
enqueue_after= int(time.time() * 1000) + graph_template.retry_policy.compute_delay(state.retry_count + 1),
retry_count=state.retry_count + 1
)
retry_state = await retry_state.insert()
logger.info(f"Retry state {retry_state.id} created for state {state_id}", x_exosphere_request_id=x_exosphere_request_id)
retry_created = True

Comment thread
NiveditJain marked this conversation as resolved.
state.status = StateStatusEnum.ERRORED
state.error = body.error
await state.save()

return ErroredResponseModel(status=StateStatusEnum.ERRORED)
return ErroredResponseModel(status=StateStatusEnum.ERRORED, retry_created=retry_created)

except Exception as e:
logger.error(f"Error errored state {state_id} for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id, error=e)
Expand Down
7 changes: 5 additions & 2 deletions state-manager/app/controller/upsert_graph_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ async def upsert_graph_template(namespace_name: str, graph_name: str, body: Upse
Set({
GraphTemplate.nodes: body.nodes, # type: ignore
GraphTemplate.validation_status: GraphTemplateValidationStatus.PENDING, # type: ignore
GraphTemplate.validation_errors: [] # type: ignore
GraphTemplate.validation_errors: [], # type: ignore
GraphTemplate.retry_policy: body.retry_policy # type: ignore
})
)

Expand All @@ -44,7 +45,8 @@ async def upsert_graph_template(namespace_name: str, graph_name: str, body: Upse
namespace=namespace_name,
nodes=body.nodes,
validation_status=GraphTemplateValidationStatus.PENDING,
validation_errors=[]
validation_errors=[],
retry_policy=body.retry_policy
).set_secrets(body.secrets)
)
except ValueError as e:
Expand All @@ -58,6 +60,7 @@ async def upsert_graph_template(namespace_name: str, graph_name: str, body: Upse
validation_status=graph_template.validation_status,
validation_errors=graph_template.validation_errors,
secrets={secret_name: True for secret_name in graph_template.get_secrets().keys()},
retry_policy=graph_template.retry_policy,
created_at=graph_template.created_at,
updated_at=graph_template.updated_at
)
Expand Down
3 changes: 2 additions & 1 deletion state-manager/app/models/db/graph_template_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from ..node_template_model import NodeTemplate
from app.utils.encrypter import get_encrypter
from app.models.dependent_string import DependentString

from app.models.retry_policy_model import RetryPolicyModel

class GraphTemplate(BaseDatabaseModel):
name: str = Field(..., description="Name of the graph")
Expand All @@ -20,6 +20,7 @@ class GraphTemplate(BaseDatabaseModel):
validation_status: GraphTemplateValidationStatus = Field(..., description="Validation status of the graph")
validation_errors: List[str] = Field(default_factory=list, description="Validation errors of the graph")
secrets: Dict[str, str] = Field(default_factory=dict, description="Secrets of the graph")
retry_policy: RetryPolicyModel = Field(default_factory=RetryPolicyModel, description="Retry policy of the graph")

_node_by_identifier: Dict[str, NodeTemplate] | None = PrivateAttr(default=None)
_parents_by_identifier: Dict[str, set[str]] | None = PrivateAttr(default=None) # type: ignore
Expand Down
2 changes: 2 additions & 0 deletions state-manager/app/models/db/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class State(BaseDatabaseModel):
does_unites: bool = Field(default=False, description="Whether this state unites other states")
state_fingerprint: str = Field(default="", description="Fingerprint of the state")
enqueue_after: int = Field(default_factory=lambda: int(time.time() * 1000), gt=0, description="Unix time in milliseconds after which the state should be enqueued")
retry_count: int = Field(default=0, description="Number of times the state has been retried")
Comment thread
NiveditJain marked this conversation as resolved.

@before_event([Insert, Replace, Save])
def _generate_fingerprint(self):
Expand All @@ -37,6 +38,7 @@ def _generate_fingerprint(self):
"identifier": self.identifier,
"graph_name": self.graph_name,
"run_id": self.run_id,
"retry_count": self.retry_count,
"parents": {k: str(v) for k, v in self.parents.items()},
}
payload = json.dumps(
Expand Down
3 changes: 2 additions & 1 deletion state-manager/app/models/errored_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ class ErroredRequestModel(BaseModel):


class ErroredResponseModel(BaseModel):
status: StateStatusEnum = Field(..., description="Status of the state")
status: StateStatusEnum = Field(..., description="Status of the state")
retry_created: bool = Field(default=False, description="Whether a retry state was created")
3 changes: 3 additions & 0 deletions state-manager/app/models/graph_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@
from typing import Dict, List, Optional
from datetime import datetime
from .graph_template_validation_status import GraphTemplateValidationStatus
from .retry_policy_model import RetryPolicyModel


class UpsertGraphTemplateRequest(BaseModel):
secrets: Dict[str, str] = Field(..., description="Dictionary of secrets that are used while graph execution")
nodes: List[NodeTemplate] = Field(..., description="List of node templates that define the graph structure")
retry_policy: RetryPolicyModel = Field(default_factory=RetryPolicyModel, description="Retry policy of the graph")


class UpsertGraphTemplateResponse(BaseModel):
nodes: List[NodeTemplate] = Field(..., description="List of node templates that define the graph structure")
secrets: Dict[str, bool] = Field(..., description="Dictionary of secrets that are used while graph execution")
retry_policy: RetryPolicyModel = Field(default_factory=RetryPolicyModel, description="Retry policy of the graph")
created_at: datetime = Field(..., description="Timestamp when the graph template was created")
updated_at: datetime = Field(..., description="Timestamp when the graph template was last updated")
validation_status: GraphTemplateValidationStatus = Field(..., description="Current validation status of the graph template")
Expand Down
59 changes: 59 additions & 0 deletions state-manager/app/models/retry_policy_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from pydantic import BaseModel, Field
from enum import Enum
import random

class RetryStrategy(str, Enum):
EXPONENTIAL = "EXPONENTIAL"
EXPONENTIAL_FULL_JITTER = "EXPONENTIAL_FULL_JITTER"
EXPONENTIAL_EQUAL_JITTER = "EXPONENTIAL_EQUAL_JITTER"

LINEAR = "LINEAR"
LINEAR_FULL_JITTER = "LINEAR_FULL_JITTER"
LINEAR_EQUAL_JITTER = "LINEAR_EQUAL_JITTER"

FIXED = "FIXED"
FIXED_FULL_JITTER = "FIXED_FULL_JITTER"
FIXED_EQUAL_JITTER = "FIXED_EQUAL_JITTER"

class RetryPolicyModel(BaseModel):
Comment thread
NiveditJain marked this conversation as resolved.
max_retries: int = Field(default=3, description="The maximum number of retries", ge=0)
strategy: RetryStrategy = Field(default=RetryStrategy.EXPONENTIAL, description="The method of retry")
backoff_factor: int = Field(default=2000, description="The backoff factor in milliseconds (default: 2000 = 2 seconds)", gt=0)
exponent: int = Field(default=2, description="The exponent for the exponential retry strategy", gt=0)

Comment thread
NiveditJain marked this conversation as resolved.
def compute_delay(self, retry_count: int) -> int:
Comment thread
NiveditJain marked this conversation as resolved.
if self.strategy == RetryStrategy.EXPONENTIAL:
return (self.backoff_factor * (self.exponent ** retry_count))

elif self.strategy == RetryStrategy.EXPONENTIAL_FULL_JITTER:
base = self.backoff_factor * (self.exponent ** retry_count)
return int(random.uniform(0, base))

elif self.strategy == RetryStrategy.EXPONENTIAL_EQUAL_JITTER:
base = self.backoff_factor * (self.exponent ** retry_count)
return int(base/2 + random.uniform(0, base / 2))

Comment thread
NiveditJain marked this conversation as resolved.
elif self.strategy == RetryStrategy.LINEAR:
return (self.backoff_factor * retry_count)

elif self.strategy == RetryStrategy.LINEAR_FULL_JITTER:
base = self.backoff_factor * retry_count
return int(random.uniform(0, base))

elif self.strategy == RetryStrategy.LINEAR_EQUAL_JITTER:
base = self.backoff_factor * retry_count
return int(base/2 + random.uniform(0, base / 2))

elif self.strategy == RetryStrategy.FIXED:
return self.backoff_factor

Comment thread
NiveditJain marked this conversation as resolved.
elif self.strategy == RetryStrategy.FIXED_FULL_JITTER:
base = self.backoff_factor
return int(random.uniform(0, base))

elif self.strategy == RetryStrategy.FIXED_EQUAL_JITTER:
base = self.backoff_factor
return int(base/2 + random.uniform(0, base / 2))

else:
raise Exception("Invalid retry strategy")
Comment thread
NiveditJain marked this conversation as resolved.
Outdated
Loading