Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion state-manager/app/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class Settings(BaseModel):
state_manager_secret: str = Field(..., description="Secret key for API authentication")
secrets_encryption_key: str = Field(..., description="Key for encrypting secrets")
trigger_workers: int = Field(default=1, description="Number of workers to run the trigger cron")
node_timeout_minutes: int = Field(default=30, gt=0, description="Timeout in minutes for nodes stuck in QUEUED status")
Comment thread
agam1092005 marked this conversation as resolved.
Outdated

@classmethod
def from_env(cls) -> "Settings":
Expand All @@ -21,7 +22,8 @@ def from_env(cls) -> "Settings":
mongo_database_name=os.getenv("MONGO_DATABASE_NAME", "exosphere-state-manager"), # type: ignore
state_manager_secret=os.getenv("STATE_MANAGER_SECRET"), # type: ignore
secrets_encryption_key=os.getenv("SECRETS_ENCRYPTION_KEY"), # type: ignore
trigger_workers=int(os.getenv("TRIGGER_WORKERS", 1)) # type: ignore
trigger_workers=int(os.getenv("TRIGGER_WORKERS", 1)), # type: ignore
node_timeout_minutes=os.getenv("NODE_TIMEOUT_MINUTES", "30") # type: ignore
Comment thread
agam1092005 marked this conversation as resolved.
Outdated
)


Expand Down
8 changes: 6 additions & 2 deletions state-manager/app/controller/enqueue_states.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,21 @@


async def find_state(namespace_name: str, nodes: list[str]) -> State | None:
current_time_ms = int(time.time() * 1000)
data = await State.get_pymongo_collection().find_one_and_update(
{
"namespace_name": namespace_name,
"status": StateStatusEnum.CREATED,
"node_name": {
"$in": nodes
},
"enqueue_after": {"$lte": int(time.time() * 1000)}
"enqueue_after": {"$lte": current_time_ms}
},
{
"$set": {"status": StateStatusEnum.QUEUED}
"$set": {
"status": StateStatusEnum.QUEUED,
"queued_at": current_time_ms
}
},
return_document=ReturnDocument.AFTER
)
Comment on lines 20 to 52
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The query seems to be correct; however, how will we re-pick timed-out states?

Expand Down
10 changes: 10 additions & 0 deletions state-manager/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from .tasks.trigger_cron import trigger_cron
from .tasks.check_node_timeout import check_node_timeout

# Define models list
DOCUMENT_MODELS = [State, GraphTemplate, RegisteredNode, Store, Run, DatabaseTriggers]
Expand Down Expand Up @@ -76,6 +77,15 @@ async def lifespan(app: FastAPI):
max_instances=1,
id="every_minute_task"
)
scheduler.add_job(
check_node_timeout,
CronTrigger.from_crontab("* * * * *"),
replace_existing=True,
misfire_grace_time=60,
coalesce=True,
max_instances=1,
id="check_node_timeout_task"
)
Comment on lines +87 to +95
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not needed with db queries

scheduler.start()

# main logic of the server
Expand Down
8 changes: 8 additions & 0 deletions state-manager/app/models/db/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class State(BaseDatabaseModel):
retry_count: int = Field(default=0, description="Number of times the state has been retried")
fanout_id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Fanout ID of the state")
manual_retry_fanout_id: str = Field(default="", description="Fanout ID from a manual retry request, ensuring unique retries for unite nodes.")
queued_at: Optional[int] = Field(None, description="Unix time in milliseconds when the state was queued")

@before_event([Insert, Replace, Save])
def _generate_fingerprint(self):
Expand Down Expand Up @@ -102,5 +103,12 @@ class Settings:
("status", 1),
],
name="run_id_status_index"
),
IndexModel(
[
("status", 1),
("queued_at", 1),
],
name="timeout_query_index"
)
Comment thread
agam1092005 marked this conversation as resolved.
]
1 change: 1 addition & 0 deletions state-manager/app/models/state_status_enum.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class StateStatusEnum(str, Enum):
# Errored
ERRORED = 'ERRORED'
NEXT_CREATED_ERROR = 'NEXT_CREATED_ERROR'
TIMEDOUT = 'TIMEDOUT'

# Success
SUCCESS = 'SUCCESS'
Expand Down
36 changes: 36 additions & 0 deletions state-manager/app/tasks/check_node_timeout.py
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While this model of periodic jobs will work, it's unnecessary, as we can write a database query to figure out timed-out nodes. We probably do not need to set a TIMEDOUT status at all: if the status is QUEUED and current_time > timeout_at, we can infer the timeout directly.

Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import time
from app.models.db.state import State
from app.models.state_status_enum import StateStatusEnum
from app.singletons.logs_manager import LogsManager
from app.config.settings import get_settings

logger = LogsManager().get_logger()


async def check_node_timeout():
    """Scheduled task: mark long-stuck QUEUED states as TIMEDOUT.

    Computes a cutoff timestamp (now minus the configured
    ``node_timeout_minutes``) and, in a single atomic ``update_many``,
    flips every state that has been QUEUED since before that cutoff to
    TIMEDOUT with an explanatory error message.

    Never raises: any failure is logged and swallowed so the scheduler
    keeps running subsequent ticks.
    """
    try:
        settings = get_settings()
        # Cutoff in epoch milliseconds: anything queued at or before this
        # instant has exceeded the configured timeout window.
        timeout_threshold = int(time.time() * 1000) - settings.node_timeout_minutes * 60 * 1000

        logger.info(f"Checking for timed out nodes with threshold: {timeout_threshold}")

        # "$ne": None guards against documents where queued_at was
        # explicitly stored as null; "$lte" alone only matches numerics.
        outcome = await State.get_pymongo_collection().update_many(
            {
                "status": StateStatusEnum.QUEUED,
                "queued_at": {"$ne": None, "$lte": timeout_threshold},
            },
            {
                "$set": {
                    "status": StateStatusEnum.TIMEDOUT,
                    "error": f"Node execution timed out after {settings.node_timeout_minutes} minutes"
                }
            },
        )

        if outcome.modified_count:
            logger.info(f"Marked {outcome.modified_count} states as TIMEDOUT")

    except Exception:
        # Boundary handler for a cron job: log with traceback, do not crash.
        logger.error("Error checking node timeout", exc_info=True)
114 changes: 114 additions & 0 deletions state-manager/tests/unit/tasks/test_check_node_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import pytest
import time
from unittest.mock import AsyncMock, MagicMock, patch
from app.models.state_status_enum import StateStatusEnum


class TestCheckNodeTimeout:
    """Unit tests for app.tasks.check_node_timeout.check_node_timeout.

    Each test patches the module-level collaborators (State, get_settings,
    and where needed logger/time) BEFORE importing the task function, so the
    task resolves the mocked names at call time. The Mongo collection is a
    MagicMock whose async ``update_many`` is an AsyncMock, letting the tests
    inspect the exact filter/update documents the task submits.
    """

    @pytest.mark.asyncio
    async def test_check_node_timeout_marks_timed_out_states(self):
        """The task issues one update_many whose filter targets QUEUED states
        with a bounded queued_at, and whose update sets TIMEDOUT + error."""
        mock_collection = MagicMock()
        mock_result = MagicMock()
        mock_result.modified_count = 3
        mock_collection.update_many = AsyncMock(return_value=mock_result)

        with patch('app.tasks.check_node_timeout.State') as mock_state, \
             patch('app.tasks.check_node_timeout.get_settings') as mock_get_settings:

            # Import inside the patch context so the task binds to the mocks.
            from app.tasks.check_node_timeout import check_node_timeout

            mock_settings = MagicMock()
            mock_settings.node_timeout_minutes = 30
            mock_get_settings.return_value = mock_settings

            mock_state.get_pymongo_collection.return_value = mock_collection

            await check_node_timeout()

            mock_collection.update_many.assert_called_once()
            call_args = mock_collection.update_many.call_args

            # Positional args: (filter_document, update_document).
            query = call_args[0][0]
            update = call_args[0][1]

            assert query["status"] == StateStatusEnum.QUEUED
            assert "$ne" in query["queued_at"]
            assert "$lte" in query["queued_at"]

            assert update["$set"]["status"] == StateStatusEnum.TIMEDOUT
            assert "timed out after 30 minutes" in update["$set"]["error"]

    @pytest.mark.asyncio
    async def test_check_node_timeout_no_timed_out_states(self):
        """With zero modified documents the task still runs exactly one
        update_many and completes without raising."""
        mock_collection = MagicMock()
        mock_result = MagicMock()
        mock_result.modified_count = 0
        mock_collection.update_many = AsyncMock(return_value=mock_result)

        with patch('app.tasks.check_node_timeout.State') as mock_state, \
             patch('app.tasks.check_node_timeout.get_settings') as mock_get_settings:

            from app.tasks.check_node_timeout import check_node_timeout

            mock_settings = MagicMock()
            mock_settings.node_timeout_minutes = 30
            mock_get_settings.return_value = mock_settings

            mock_state.get_pymongo_collection.return_value = mock_collection

            await check_node_timeout()

            mock_collection.update_many.assert_called_once()

    @pytest.mark.asyncio
    async def test_check_node_timeout_handles_exception(self):
        """A database error must be swallowed and logged via logger.error,
        never propagated to the scheduler."""
        mock_collection = MagicMock()
        mock_collection.update_many = AsyncMock(side_effect=Exception("Database error"))

        with patch('app.tasks.check_node_timeout.State') as mock_state, \
             patch('app.tasks.check_node_timeout.get_settings') as mock_get_settings, \
             patch('app.tasks.check_node_timeout.logger') as mock_logger:

            from app.tasks.check_node_timeout import check_node_timeout

            mock_settings = MagicMock()
            mock_settings.node_timeout_minutes = 30
            mock_get_settings.return_value = mock_settings

            mock_state.get_pymongo_collection.return_value = mock_collection

            # Must not raise despite the AsyncMock side effect.
            await check_node_timeout()

            mock_logger.error.assert_called_once()
            error_message = mock_logger.error.call_args[0][0]
            assert "Error checking node timeout" in error_message

    @pytest.mark.asyncio
    async def test_check_node_timeout_calculates_correct_threshold(self):
        """The queued_at $lte bound equals now_ms minus the configured
        timeout converted to milliseconds (time is patched for determinism)."""
        mock_collection = MagicMock()
        mock_result = MagicMock()
        mock_result.modified_count = 0
        mock_collection.update_many = AsyncMock(return_value=mock_result)

        with patch('app.tasks.check_node_timeout.State') as mock_state, \
             patch('app.tasks.check_node_timeout.get_settings') as mock_get_settings, \
             patch('app.tasks.check_node_timeout.time') as mock_time:

            from app.tasks.check_node_timeout import check_node_timeout

            # Fixed epoch seconds; the task converts to milliseconds.
            mock_time.time.return_value = 1700000000

            mock_settings = MagicMock()
            mock_settings.node_timeout_minutes = 45
            mock_get_settings.return_value = mock_settings

            mock_state.get_pymongo_collection.return_value = mock_collection

            await check_node_timeout()

            call_args = mock_collection.update_many.call_args
            query = call_args[0][0]

            # 45 minutes expressed in ms, subtracted from the fixed "now".
            expected_threshold = (1700000000 * 1000) - (45 * 60 * 1000)
            assert query["queued_at"]["$lte"] == expected_threshold
Loading