Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions state-manager/app/controller/cancel_triggers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""
Controller for cancelling pending triggers for a graph
"""
import asyncio
from app.models.cancel_trigger_models import CancelTriggerResponse
from app.models.db.trigger import DatabaseTriggers
from app.models.trigger_models import TriggerStatusEnum
from app.singletons.logs_manager import LogsManager
from app.config.settings import get_settings
from app.tasks.trigger_cron import mark_as_cancelled
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will not recommend importing from tasks in controller. This might lead to confusing dependencies going forward.

from beanie.operators import In

logger = LogsManager().get_logger()

async def cancel_triggers(namespace_name: str, graph_name: str, x_exosphere_request_id: str) -> CancelTriggerResponse:
"""
Cancel all pending or triggering triggers for a specific graph

Args:
namespace_name: The namespace of the graph
graph_name: The name of the graph
x_exosphere_request_id: Request ID for logging

Returns:
CancelTriggerResponse with cancellation details
"""
try:
logger.info(f"Request to cancel triggers for graph {graph_name} in namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)

# Find all PENDING or TRIGGERING triggers for this graph
triggers = await DatabaseTriggers.find(
DatabaseTriggers.namespace == namespace_name,
DatabaseTriggers.graph_name == graph_name,
In(DatabaseTriggers.trigger_status, [TriggerStatusEnum.PENDING, TriggerStatusEnum.TRIGGERING])
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a Database trigger is in TRIGGERING state, I don't think we should cancel in that state. Why? Because it might be immediately picked by the process to move to TRIGGERED. This will create an inconsistent state where actually the job is Triggered but it shows TRIGGERING.

).to_list()

if not triggers:
logger.info(f"No pending triggers found for graph {graph_name} in namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
return CancelTriggerResponse(
namespace=namespace_name,
graph_name=graph_name,
cancelled_count=0,
message="No pending triggers found to cancel"
)

# Get retention hours from settings
settings = get_settings()
retention_hours = settings.trigger_retention_hours
Comment on lines +46 to +48
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this here?


# Cancel each trigger concurrently
cancelled_count = len(triggers)
cancellation_tasks = [mark_as_cancelled(trigger, retention_hours) for trigger in triggers]
if cancellation_tasks:
await asyncio.gather(*cancellation_tasks)
Comment thread
NiveditJain marked this conversation as resolved.
Outdated

logger.info(f"Cancelled {cancelled_count} triggers for graph {graph_name} in namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)

return CancelTriggerResponse(
namespace=namespace_name,
graph_name=graph_name,
cancelled_count=cancelled_count,
message=f"Successfully cancelled {cancelled_count} trigger(s)"
)

except Exception as e:
logger.error(f"Error cancelling triggers for graph {graph_name} in namespace {namespace_name}: {str(e)}", x_exosphere_request_id=x_exosphere_request_id)
raise

9 changes: 9 additions & 0 deletions state-manager/app/models/cancel_trigger_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from pydantic import BaseModel, Field


class CancelTriggerResponse(BaseModel):
namespace: str = Field(..., description="Namespace of the cancelled triggers")
graph_name: str = Field(..., description="Name of the graph")
cancelled_count: int = Field(..., description="Number of triggers that were cancelled")
message: str = Field(..., description="Human-readable message describing the result")

3 changes: 2 additions & 1 deletion state-manager/app/models/db/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class Settings:
"trigger_status": {
"$in": [
TriggerStatusEnum.TRIGGERED,
TriggerStatusEnum.FAILED
TriggerStatusEnum.FAILED,
TriggerStatusEnum.CANCELLED
]
}
}
Expand Down
23 changes: 23 additions & 0 deletions state-manager/app/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@
from .models.manual_retry import ManualRetryRequestModel, ManualRetryResponseModel
from .controller.manual_retry_state import manual_retry_state

# cancel_triggers
from .models.cancel_trigger_models import CancelTriggerResponse
from .controller.cancel_triggers import cancel_triggers


logger = LogsManager().get_logger()

Expand Down Expand Up @@ -237,6 +241,25 @@ async def get_graph_template(namespace_name: str, graph_name: str, request: Requ
return await get_graph_template_controller(namespace_name, graph_name, x_exosphere_request_id)


@router.delete(
"/graph/{graph_name}/triggers",
response_model=CancelTriggerResponse,
status_code=status.HTTP_200_OK,
response_description="Triggers cancelled successfully",
tags=["graph"]
)
async def cancel_triggers_route(namespace_name: str, graph_name: str, request: Request, api_key: str = Depends(check_api_key)):
x_exosphere_request_id = getattr(request.state, "x_exosphere_request_id", str(uuid4()))

if api_key:
logger.info(f"API key is valid for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
else:
logger.error(f"API key is invalid for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key")
Comment thread
NiveditJain marked this conversation as resolved.

return await cancel_triggers(namespace_name, graph_name, x_exosphere_request_id)


@router.put(
"/nodes/",
response_model=RegisterNodesResponseModel,
Expand Down
13 changes: 12 additions & 1 deletion state-manager/app/tasks/trigger_cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,15 @@ async def trigger_cron():
cron_time = datetime.now()
settings = get_settings()
logger.info(f"starting trigger_cron: {cron_time}")
await asyncio.gather(*[handle_trigger(cron_time, settings.trigger_retention_hours) for _ in range(settings.trigger_workers)])
await asyncio.gather(*[handle_trigger(cron_time, settings.trigger_retention_hours) for _ in range(settings.trigger_workers)])

async def mark_as_cancelled(trigger: DatabaseTriggers, retention_hours: int):
expires_at = datetime.now(timezone.utc) + timedelta(hours=retention_hours)

await DatabaseTriggers.get_pymongo_collection().update_one(
{"_id": trigger.id},
{"$set": {
"trigger_status": TriggerStatusEnum.CANCELLED,
"expires_at": expires_at
}}
)
Comment on lines +103 to +112
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this here?

Loading