-
Notifications
You must be signed in to change notification settings - Fork 42
feat: add node-level timeouts to prevent stuck queued states #462
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 6 commits
735cd6d
4303a3d
8634281
4ee52fd
4be13c9
9839bc6
de475b8
2c7f3c1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -13,6 +13,7 @@ class Settings(BaseModel): | |||||||||
| state_manager_secret: str = Field(..., description="Secret key for API authentication") | ||||||||||
| secrets_encryption_key: str = Field(..., description="Key for encrypting secrets") | ||||||||||
| trigger_workers: int = Field(default=1, description="Number of workers to run the trigger cron") | ||||||||||
| node_timeout_minutes: int = Field(default=30, gt=0, description="Timeout in minutes for nodes stuck in QUEUED status") | ||||||||||
| trigger_retention_hours: int = Field(default=720, description="Number of hours to retain completed/failed triggers before cleanup") | ||||||||||
|
|
||||||||||
| @classmethod | ||||||||||
|
|
@@ -22,6 +23,7 @@ def from_env(cls) -> "Settings": | |||||||||
| mongo_database_name=os.getenv("MONGO_DATABASE_NAME", "exosphere-state-manager"), # type: ignore | ||||||||||
| state_manager_secret=os.getenv("STATE_MANAGER_SECRET"), # type: ignore | ||||||||||
| secrets_encryption_key=os.getenv("SECRETS_ENCRYPTION_KEY"), # type: ignore | ||||||||||
| node_timeout_minutes=os.getenv("NODE_TIMEOUT_MINUTES", "30") # type: ignore | ||||||||||
| trigger_workers=int(os.getenv("TRIGGER_WORKERS", 1)), # type: ignore | ||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Critical syntax error: missing comma after line 26. Line 26 is missing a trailing comma, which will cause a Python syntax error when parsing this constructor call. Additionally, line 27 uses an explicit `int()` conversion, whereas line 26 passes the raw string and leaves the conversion to the model's field validation. Apply this diff to fix both issues:
- node_timeout_minutes=os.getenv("NODE_TIMEOUT_MINUTES", "30") # type: ignore
- trigger_workers=int(os.getenv("TRIGGER_WORKERS", 1)), # type: ignore
+ node_timeout_minutes=os.getenv("NODE_TIMEOUT_MINUTES", "30"), # type: ignore
+ trigger_workers=os.getenv("TRIGGER_WORKERS", "1"), # type: ignore
📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||
| trigger_retention_hours=int(os.getenv("TRIGGER_RETENTION_HOURS", 720)) # type: ignore | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,24 +7,81 @@ | |
| from ..models.state_status_enum import StateStatusEnum | ||
|
|
||
| from app.singletons.logs_manager import LogsManager | ||
| from app.config.settings import get_settings | ||
| from pymongo import ReturnDocument | ||
|
|
||
| logger = LogsManager().get_logger() | ||
|
|
||
|
|
||
| async def find_state(namespace_name: str, nodes: list[str]) -> State | None: | ||
| current_time_ms = int(time.time() * 1000) | ||
| settings = get_settings() | ||
|
|
||
| # Use pipeline to calculate timeout_at based on state-specific or global timeout | ||
| pipeline = [ | ||
| { | ||
| "$match": { | ||
| "namespace_name": namespace_name, | ||
| "status": StateStatusEnum.CREATED, | ||
| "node_name": {"$in": nodes}, | ||
| "enqueue_after": {"$lte": current_time_ms} | ||
| } | ||
| }, | ||
| { | ||
| "$addFields": { | ||
| "status": StateStatusEnum.QUEUED, | ||
| "queued_at": current_time_ms, | ||
| "timeout_at": { | ||
| "$add": [ | ||
| current_time_ms, | ||
| { | ||
| "$multiply": [ | ||
| { | ||
| "$ifNull": [ | ||
| "$timeout_minutes", | ||
| settings.node_timeout_minutes | ||
| ] | ||
| }, | ||
| 60000 # Convert minutes to milliseconds | ||
| ] | ||
| } | ||
| ] | ||
| } | ||
| } | ||
| } | ||
| ] | ||
|
|
||
| data = await State.get_pymongo_collection().find_one_and_update( | ||
| { | ||
| "namespace_name": namespace_name, | ||
| "status": StateStatusEnum.CREATED, | ||
| "node_name": { | ||
| "$in": nodes | ||
| }, | ||
| "enqueue_after": {"$lte": int(time.time() * 1000)} | ||
| }, | ||
| { | ||
| "$set": {"status": StateStatusEnum.QUEUED} | ||
| "node_name": {"$in": nodes}, | ||
| "enqueue_after": {"$lte": current_time_ms} | ||
| }, | ||
| [ | ||
| { | ||
| "$set": { | ||
| "status": StateStatusEnum.QUEUED, | ||
| "queued_at": current_time_ms, | ||
| "timeout_at": { | ||
| "$add": [ | ||
| current_time_ms, | ||
| { | ||
| "$multiply": [ | ||
| { | ||
| "$ifNull": [ | ||
| "$timeout_minutes", | ||
| settings.node_timeout_minutes | ||
| ] | ||
| }, | ||
| 60000 # Convert minutes to milliseconds | ||
| ] | ||
| } | ||
| ] | ||
| } | ||
| } | ||
| } | ||
| ], | ||
| return_document=ReturnDocument.AFTER | ||
| ) | ||
|
Comment on lines
20
to
52
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The query seems to be correct; however, how will we re-pick timed-out states? |
||
| return State(**data) if data else None | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,6 +38,7 @@ | |
| from apscheduler.schedulers.asyncio import AsyncIOScheduler | ||
| from apscheduler.triggers.cron import CronTrigger | ||
| from .tasks.trigger_cron import trigger_cron | ||
| from .tasks.check_node_timeout import check_node_timeout | ||
|
|
||
| # init tasks | ||
| from .tasks.init_tasks import init_tasks | ||
|
|
@@ -83,6 +84,15 @@ async def lifespan(app: FastAPI): | |
| max_instances=1, | ||
| id="every_minute_task" | ||
| ) | ||
| scheduler.add_job( | ||
| check_node_timeout, | ||
| CronTrigger.from_crontab("* * * * *"), | ||
| replace_existing=True, | ||
| misfire_grace_time=60, | ||
| coalesce=True, | ||
| max_instances=1, | ||
| id="check_node_timeout_task" | ||
| ) | ||
|
Comment on lines
+87
to
+95
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not needed if we use DB queries. |
||
| scheduler.start() | ||
|
|
||
| # main logic of the server | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. While this model of periodic jobs will work, it's unnecessary, as we can write a database query to figure out timed-out nodes. We probably do not need to set the status to timed-out at all: if the status is Queued and current_time > timeout_at, we can figure it out from that alone. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| import time | ||
| from app.models.db.state import State | ||
| from app.models.state_status_enum import StateStatusEnum | ||
| from app.singletons.logs_manager import LogsManager | ||
|
|
||
| logger = LogsManager().get_logger() | ||
|
|
||
|
|
||
| async def check_node_timeout(): | ||
| try: | ||
| current_time_ms = int(time.time() * 1000) | ||
|
|
||
| logger.info(f"Checking for timed out nodes at {current_time_ms}") | ||
|
|
||
| # Use database query to find and update timed out states in one operation | ||
| result = await State.get_pymongo_collection().update_many( | ||
| { | ||
| "status": StateStatusEnum.QUEUED, | ||
| "timeout_at": {"$ne": None, "$lte": current_time_ms} | ||
| }, | ||
| { | ||
| "$set": { | ||
| "status": StateStatusEnum.TIMEDOUT, | ||
| "error": "Node execution timed out" | ||
| } | ||
| } | ||
| ) | ||
|
Comment on lines
+16
to
+27
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain
Verify retry/recovery mechanism for TIMEDOUT states. The implementation correctly marks states as `TIMEDOUT`. Run the following script to check if there's a retry mechanism for TIMEDOUT states:
🏁 Script executed:
#!/bin/bash
# Description: Search for retry/recovery logic for TIMEDOUT states
# Search for TIMEDOUT status handling
rg -n "TIMEDOUT" --type=py -C 5 | rg -i "retry|recover|manual"
# Search for state status transitions from TIMEDOUT
ast-grep --pattern $'StateStatusEnum.TIMEDOUT'
# Check if TIMEDOUT states can be manually retried or recovered
rg -n "manual_retry|retry_state" --type=py -A 10 | rg -i "timedout"
Length of output: 484
Implement retry or recovery for TIMEDOUT states. I didn’t find any logic that re-queues or retries tasks once their status is set to `TIMEDOUT`.
🤖 Prompt for AI Agents |
||
|
|
||
| if result.modified_count > 0: | ||
| logger.info(f"Marked {result.modified_count} states as TIMEDOUT") | ||
|
|
||
| except Exception: | ||
| logger.error("Error checking node timeout", exc_info=True) | ||
Uh oh!
There was an error while loading. Please reload this page.