Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion state-manager/app/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

class Settings(BaseModel):
"""Application settings loaded from environment variables."""

# MongoDB Configuration
mongo_uri: str = Field(..., description="MongoDB connection URI" )
mongo_database_name: str = Field(default="exosphere-state-manager", description="MongoDB database name")
Expand Down
102 changes: 76 additions & 26 deletions state-manager/app/controller/upsert_graph_template.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,45 @@
from datetime import datetime, timedelta, timezone

from app.singletons.logs_manager import LogsManager
from app.models.graph_models import UpsertGraphTemplateRequest, UpsertGraphTemplateResponse
from app.models.db.graph_template_model import GraphTemplate
from app.models.graph_template_validation_status import GraphTemplateValidationStatus
from app.tasks.verify_graph import verify_graph
from app.models.db.trigger import DatabaseTriggers
from app.models.trigger_models import TriggerStatusEnum, TriggerTypeEnum
from beanie.operators import In
from app.config.settings import get_settings

from fastapi import BackgroundTasks, HTTPException

logger = LogsManager().get_logger()
settings = get_settings()
Comment thread
Brijesh-Thakkar marked this conversation as resolved.
Outdated

async def upsert_graph_template(namespace_name: str, graph_name: str, body: UpsertGraphTemplateRequest, x_exosphere_request_id: str, background_tasks: BackgroundTasks) -> UpsertGraphTemplateResponse:
try:

async def upsert_graph_template(
namespace_name: str,
graph_name: str,
body: UpsertGraphTemplateRequest,
x_exosphere_request_id: str,
background_tasks: BackgroundTasks,
) -> UpsertGraphTemplateResponse:
try:
old_triggers = []

graph_template = await GraphTemplate.find_one(
GraphTemplate.name == graph_name,
GraphTemplate.namespace == namespace_name
GraphTemplate.namespace == namespace_name,
)

try:
if graph_template:
logger.info(
"Graph template already exists in namespace", graph_template=graph_template,
"Graph template already exists in namespace",
graph_template=graph_template,
namespace_name=namespace_name,
x_exosphere_request_id=x_exosphere_request_id)
x_exosphere_request_id=x_exosphere_request_id,
)
old_triggers = graph_template.triggers

graph_template.set_secrets(body.secrets)
graph_template.validation_status = GraphTemplateValidationStatus.PENDING
graph_template.validation_errors = []
Expand All @@ -37,14 +48,15 @@ async def upsert_graph_template(namespace_name: str, graph_name: str, body: Upse
graph_template.nodes = body.nodes
graph_template.triggers = body.triggers
await graph_template.save()

else:
logger.info(
"Graph template does not exist in namespace",
namespace_name=namespace_name,
graph_name=graph_name,
x_exosphere_request_id=x_exosphere_request_id)

x_exosphere_request_id=x_exosphere_request_id,
)

graph_template = await GraphTemplate.insert(
GraphTemplate(
name=graph_name,
Expand All @@ -54,35 +66,73 @@ async def upsert_graph_template(namespace_name: str, graph_name: str, body: Upse
validation_errors=[],
retry_policy=body.retry_policy,
store_config=body.store_config,
triggers=body.triggers
triggers=body.triggers,
).set_secrets(body.secrets)
)
except ValueError as e:
logger.error("Error validating graph template", error=e, x_exosphere_request_id=x_exosphere_request_id)
raise HTTPException(status_code=400, detail=f"Error validating graph template: {str(e)}")

logger.error(
"Error validating graph template",
error=e,
x_exosphere_request_id=x_exosphere_request_id,
)
raise HTTPException(
status_code=400,
detail=f"Error validating graph template: {str(e)}",
)

# Previously:
# await DatabaseTriggers.find(...).delete_many()
#
# Now: bulk update to mark matching CRON triggers as CANCELLED
# and set expires_at so TTL can clean them up later.
if len(old_triggers) > 0:
await DatabaseTriggers.find(
DatabaseTriggers.graph_name == graph_name,
DatabaseTriggers.trigger_status == TriggerStatusEnum.PENDING,
DatabaseTriggers.type == TriggerTypeEnum.CRON,
In(DatabaseTriggers.expression, [trigger.value["expression"] for trigger in old_triggers if trigger.type == TriggerTypeEnum.CRON])
).delete_many()
cron_expressions = [
trigger.value["expression"]
for trigger in old_triggers
if trigger.type == TriggerTypeEnum.CRON
]

if cron_expressions:
expires_at = datetime.now(timezone.utc) + timedelta(
hours=settings.trigger_retention_hours
)

await DatabaseTriggers.get_pymongo_collection().update_many(
{
"graph_name": graph_name,
"trigger_status": TriggerStatusEnum.PENDING.value,
"type": TriggerTypeEnum.CRON.value,
"expression": {"$in": cron_expressions},
},
{
"$set": {
"trigger_status": TriggerStatusEnum.CANCELLED.value,
"expires_at": expires_at,
}
},
)
Comment thread
Brijesh-Thakkar marked this conversation as resolved.

background_tasks.add_task(verify_graph, graph_template)

return UpsertGraphTemplateResponse(
nodes=graph_template.nodes,
validation_status=graph_template.validation_status,
validation_errors=graph_template.validation_errors,
secrets={secret_name: True for secret_name in graph_template.get_secrets().keys()},
secrets={
secret_name: True
for secret_name in graph_template.get_secrets().keys()
},
retry_policy=graph_template.retry_policy,
store_config=graph_template.store_config,
triggers=graph_template.triggers,
created_at=graph_template.created_at,
updated_at=graph_template.updated_at
updated_at=graph_template.updated_at,
)

except Exception as e:
logger.error("Error upserting graph template", error=e, x_exosphere_request_id=x_exosphere_request_id)
raise e
logger.error(
"Error upserting graph template",
error=e,
x_exosphere_request_id=x_exosphere_request_id,
)
raise e
3 changes: 2 additions & 1 deletion state-manager/app/models/db/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class Settings:
"trigger_status": {
"$in": [
TriggerStatusEnum.TRIGGERED,
TriggerStatusEnum.FAILED
TriggerStatusEnum.FAILED,
TriggerStatusEnum.CANCELLED
]
}
}
Expand Down
45 changes: 36 additions & 9 deletions state-manager/app/tasks/init_tasks.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,47 @@
# tasks to run when the server starts
from datetime import datetime, timedelta, timezone
import asyncio

from app.config.settings import get_settings
from app.models.db.trigger import DatabaseTriggers
from app.models.trigger_models import TriggerStatusEnum
import asyncio
from app.singletons.logs_manager import LogsManager

logger = LogsManager().get_logger()


async def delete_old_triggers():
Comment thread
Brijesh-Thakkar marked this conversation as resolved.
Outdated
await DatabaseTriggers.get_pymongo_collection().delete_many(
settings = get_settings()
retention_hours = settings.trigger_retention_hours
expires_at = datetime.now(timezone.utc) + timedelta(hours=retention_hours)

# Use the same filter used before by delete_many()
filter_query = {
"trigger_status": {
"$in": [TriggerStatusEnum.TRIGGERED, TriggerStatusEnum.FAILED]
},
"expires_at": None
}

logger.info(
f"Init task marking triggers CANCELLED for filter={filter_query}, "
f"expires_at={expires_at.isoformat()}"
)

await DatabaseTriggers.get_pymongo_collection().update_many(
filter_query,
{
"trigger_status": {
"$in": [TriggerStatusEnum.TRIGGERED, TriggerStatusEnum.FAILED]
},
"expires_at": None
}
"$set": {
"trigger_status": TriggerStatusEnum.CANCELLED,
"expires_at": expires_at,
}
Comment thread
Brijesh-Thakkar marked this conversation as resolved.
},
)
Comment thread
Brijesh-Thakkar marked this conversation as resolved.
Comment thread
Brijesh-Thakkar marked this conversation as resolved.
Outdated


async def init_tasks():
await asyncio.gather(
*[
delete_old_triggers()
])
delete_old_triggers(),
]
)
16 changes: 16 additions & 0 deletions state-manager/app/tasks/trigger_cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,22 @@ async def mark_as_failed(trigger: DatabaseTriggers, retention_hours: int):
}}
)

async def mark_as_cancelled(trigger: DatabaseTriggers, retention_hours: int):
"""
Mark a trigger as CANCELLED and set expires_at so MongoDB TTL will remove it
after `retention_hours`.
"""
expires_at = datetime.now(timezone.utc) + timedelta(hours=retention_hours)

await DatabaseTriggers.get_pymongo_collection().update_one(
{"_id": trigger.id},
{"$set": {
"trigger_status": TriggerStatusEnum.CANCELLED,
"expires_at": expires_at
}}
Comment thread
Brijesh-Thakkar marked this conversation as resolved.
)
Comment thread
Brijesh-Thakkar marked this conversation as resolved.


Comment thread
Brijesh-Thakkar marked this conversation as resolved.
async def create_next_triggers(trigger: DatabaseTriggers, cron_time: datetime, retention_hours: int):
assert trigger.expression is not None
iter = croniter.croniter(trigger.expression, trigger.trigger_time)
Expand Down