-
Notifications
You must be signed in to change notification settings - Fork 43
Enhancement/cancel triggers #505
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 4 commits
ba9f69e
4c478eb
14097f6
569e31f
a94133d
03c460f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,68 @@ | ||
| """ | ||
| Controller for cancelling pending triggers for a graph | ||
| """ | ||
| import asyncio | ||
| from app.models.cancel_trigger_models import CancelTriggerResponse | ||
| from app.models.db.trigger import DatabaseTriggers | ||
| from app.models.trigger_models import TriggerStatusEnum | ||
| from app.singletons.logs_manager import LogsManager | ||
| from app.config.settings import get_settings | ||
| from app.tasks.trigger_cron import mark_as_cancelled | ||
| from beanie.operators import In | ||
|
|
||
| logger = LogsManager().get_logger() | ||
|
|
||
| async def cancel_triggers(namespace_name: str, graph_name: str, x_exosphere_request_id: str) -> CancelTriggerResponse: | ||
| """ | ||
| Cancel all pending or triggering triggers for a specific graph | ||
|
|
||
| Args: | ||
| namespace_name: The namespace of the graph | ||
| graph_name: The name of the graph | ||
| x_exosphere_request_id: Request ID for logging | ||
|
|
||
| Returns: | ||
| CancelTriggerResponse with cancellation details | ||
| """ | ||
| try: | ||
| logger.info(f"Request to cancel triggers for graph {graph_name} in namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id) | ||
|
|
||
| # Find all PENDING or TRIGGERING triggers for this graph | ||
| triggers = await DatabaseTriggers.find( | ||
| DatabaseTriggers.namespace == namespace_name, | ||
| DatabaseTriggers.graph_name == graph_name, | ||
| In(DatabaseTriggers.trigger_status, [TriggerStatusEnum.PENDING, TriggerStatusEnum.TRIGGERING]) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If a Database trigger is in TRIGGERING state, I don't think we should cancel in that state. Why? Because it might be immediately picked by the process to move to TRIGGERED. This will create an inconsistent state where actually the job is Triggered but it shows TRIGGERING. |
||
| ).to_list() | ||
|
|
||
| if not triggers: | ||
| logger.info(f"No pending triggers found for graph {graph_name} in namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id) | ||
| return CancelTriggerResponse( | ||
| namespace=namespace_name, | ||
| graph_name=graph_name, | ||
| cancelled_count=0, | ||
| message="No pending triggers found to cancel" | ||
| ) | ||
|
|
||
| # Get retention hours from settings | ||
| settings = get_settings() | ||
| retention_hours = settings.trigger_retention_hours | ||
|
Comment on lines
+46
to
+48
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we need this here? |
||
|
|
||
| # Cancel each trigger concurrently | ||
| cancelled_count = len(triggers) | ||
| cancellation_tasks = [mark_as_cancelled(trigger, retention_hours) for trigger in triggers] | ||
| if cancellation_tasks: | ||
| await asyncio.gather(*cancellation_tasks) | ||
|
NiveditJain marked this conversation as resolved.
Outdated
|
||
|
|
||
| logger.info(f"Cancelled {cancelled_count} triggers for graph {graph_name} in namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id) | ||
|
|
||
| return CancelTriggerResponse( | ||
| namespace=namespace_name, | ||
| graph_name=graph_name, | ||
| cancelled_count=cancelled_count, | ||
| message=f"Successfully cancelled {cancelled_count} trigger(s)" | ||
| ) | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Error cancelling triggers for graph {graph_name} in namespace {namespace_name}: {str(e)}", x_exosphere_request_id=x_exosphere_request_id) | ||
| raise | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,9 @@ | ||
| from pydantic import BaseModel, Field | ||
|
|
||
|
|
||
| class CancelTriggerResponse(BaseModel): | ||
| namespace: str = Field(..., description="Namespace of the cancelled triggers") | ||
| graph_name: str = Field(..., description="Name of the graph") | ||
| cancelled_count: int = Field(..., description="Number of triggers that were cancelled") | ||
| message: str = Field(..., description="Human-readable message describing the result") | ||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -98,4 +98,15 @@ async def trigger_cron(): | |
| cron_time = datetime.now() | ||
| settings = get_settings() | ||
| logger.info(f"starting trigger_cron: {cron_time}") | ||
| await asyncio.gather(*[handle_trigger(cron_time, settings.trigger_retention_hours) for _ in range(settings.trigger_workers)]) | ||
| await asyncio.gather(*[handle_trigger(cron_time, settings.trigger_retention_hours) for _ in range(settings.trigger_workers)]) | ||
|
|
||
| async def mark_as_cancelled(trigger: DatabaseTriggers, retention_hours: int): | ||
| expires_at = datetime.now(timezone.utc) + timedelta(hours=retention_hours) | ||
|
|
||
| await DatabaseTriggers.get_pymongo_collection().update_one( | ||
| {"_id": trigger.id}, | ||
| {"$set": { | ||
| "trigger_status": TriggerStatusEnum.CANCELLED, | ||
| "expires_at": expires_at | ||
| }} | ||
| ) | ||
|
Comment on lines
+103
to
+112
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we need this here? |
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will not recommend importing from tasks in controller. This might lead to confusing dependencies going forward.