Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
c9d18d7
Add prune and re-enqueue signal functionality
NiveditJain Aug 30, 2025
e6367af
Fix typos in logging messages and comments
NiveditJain Aug 30, 2025
bd0a898
Update state-manager/app/models/signal_models.py
NiveditJain Aug 30, 2025
fc9c14c
Update state-manager/app/controller/re_queue_after_singal.py
NiveditJain Aug 30, 2025
4cc63fe
Update state-manager/app/controller/re_queue_after_singal.py
NiveditJain Aug 30, 2025
b6127ff
Update state-manager/app/controller/prune_signal.py
NiveditJain Aug 30, 2025
87870b0
Refactor re-queue after signal functionality and update state model
NiveditJain Aug 30, 2025
94d740f
Merge branch 'signals' of https://github.com/NiveditJain/exospherehos…
NiveditJain Aug 30, 2025
c689a8e
Added import for time module in re_queue_after_signal.py to support t…
NiveditJain Aug 30, 2025
55a4754
Add unit tests for prune and re-enqueue signal functionality
NiveditJain Aug 30, 2025
e5d61d8
Refactor test imports for prune and re-enqueue signal unit tests
NiveditJain Aug 30, 2025
d84b6ab
Implement prune and requeue signal functionality
NiveditJain Aug 30, 2025
ced39b9
Add tests for PruneSingal and ReQueueAfterSingal functionality
NiveditJain Aug 30, 2025
b018b03
Fix signal naming inconsistencies and enhance signal functionality
NiveditJain Aug 30, 2025
5d66297
Correct signal naming in tests and enhance exception handling
NiveditJain Aug 30, 2025
ef84f0c
Enhance validation in ReEnqueueAfterRequestModel tests
NiveditJain Aug 30, 2025
74f51a6
Add Signals documentation and update navigation in mkdocs.yml
NiveditJain Aug 30, 2025
c13b530
Update prune_signal status check to validate against QUEUED state
NiveditJain Aug 30, 2025
49e2eb4
fixed all failing tests
NiveditJain Aug 30, 2025
ea760a1
namespace check would be added as a seprate unit later to take care o…
NiveditJain Aug 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion state-manager/app/controller/enqueue_states.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import time

from ..models.enqueue_request import EnqueueRequestModel
from ..models.enqueue_response import EnqueueResponseModel, StateModel
Expand All @@ -18,7 +19,8 @@ async def find_state(namespace_name: str, nodes: list[str]) -> State | None:
"status": StateStatusEnum.CREATED,
"node_name": {
"$in": nodes
}
},
"enqueue_after": {"$lte": int(time.time() * 1000)}
Comment thread
NiveditJain marked this conversation as resolved.
},
{
"$set": {"status": StateStatusEnum.QUEUED}
Expand Down
32 changes: 32 additions & 0 deletions state-manager/app/controller/prune_signal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from app.models.signal_models import PruneRequestModel, SignalResponseModel
from fastapi import HTTPException, status
from beanie import PydanticObjectId

from app.models.db.state import State
from app.models.state_status_enum import StateStatusEnum
from app.singletons.logs_manager import LogsManager

logger = LogsManager().get_logger()

async def prune_signal(namespace_name: str, state_id: PydanticObjectId, body: PruneRequestModel, x_exosphere_request_id: str) -> SignalResponseModel:
    """Mark a queued state as PRUNED and store the data sent with the signal.

    Args:
        namespace_name: Namespace from the request path. Only used in log
            messages here; the lookup is by state id alone.
        state_id: Identifier of the state to prune.
        body: Prune request whose ``data`` is written onto the state.
        x_exosphere_request_id: Correlation id attached to every log line.

    Returns:
        A SignalResponseModel with status PRUNED and the state's current
        ``enqueue_after`` value.

    Raises:
        HTTPException: 404 when no state has the given id, 400 when the
            state is not in QUEUED status.
    """
    try:
        logger.info(f"Recieved prune signal for state {state_id} for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)

        state = await State.find_one(State.id == state_id)
        if not state:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="State not found")

        # A state is flipped from CREATED to QUEUED when it is handed out by
        # the enqueue step, so a prune signal can only legitimately refer to
        # a QUEUED state. Checking for CREATED rejected every such state.
        if state.status != StateStatusEnum.QUEUED:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="State is not queued")

        state.status = StateStatusEnum.PRUNED
        state.data = body.data
        await state.save()

        return SignalResponseModel(status=StateStatusEnum.PRUNED, enqueue_after=state.enqueue_after)

    except Exception as e:
        # Log for traceability, then let the original exception (including
        # the HTTPExceptions raised above) propagate unchanged.
        logger.error(f"Error pruning state {state_id} for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id, error=e)
        raise
Comment thread
NiveditJain marked this conversation as resolved.
32 changes: 32 additions & 0 deletions state-manager/app/controller/re_queue_after_singal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from app.models.signal_models import ReEnqueueAfterRequestModel, SignalResponseModel
from fastapi import HTTPException, status
from beanie import PydanticObjectId

from app.models.db.state import State
from app.models.state_status_enum import StateStatusEnum
from app.singletons.logs_manager import LogsManager

logger = LogsManager().get_logger()

async def re_queue_after_signal(namespace_name: str, state_id: PydanticObjectId, body: ReEnqueueAfterRequestModel, x_exosphere_request_id: str) -> SignalResponseModel:
    """Put a state back into CREATED and postpone when it may be enqueued.

    ``body.enqueue_after`` is treated as a delay in milliseconds, counted
    from the moment the signal is handled.

    Args:
        namespace_name: Namespace from the request path. Only used in log
            messages here; the lookup is by state id alone.
        state_id: Identifier of the state to re-queue.
        body: Request carrying the delay (milliseconds) in ``enqueue_after``.
        x_exosphere_request_id: Correlation id attached to every log line.

    Returns:
        A SignalResponseModel with status CREATED and the new absolute
        ``enqueue_after`` timestamp (Unix time in milliseconds).

    Raises:
        HTTPException: 404 when no state has the given id, 400 when the
            state is neither CREATED nor QUEUED.
    """
    # Local import: this module does not import ``time`` at file level.
    import time

    try:
        logger.info(f"Recieved re-queue after signal for state {state_id} for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)

        state = await State.find_one(State.id == state_id)
        if not state:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="State not found")

        # Accept QUEUED as well as CREATED. A state that was already handed
        # out is QUEUED, and with a CREATED-only guard it could never be
        # re-queued (the status reset below would be a no-op).
        if state.status not in (StateStatusEnum.CREATED, StateStatusEnum.QUEUED):
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="State is not created or queued")

        state.status = StateStatusEnum.CREATED
        # Count the delay from now. The stored enqueue_after is already in
        # the past for a state that was handed out, so adding the delay to it
        # could yield a timestamp that has already elapsed.
        state.enqueue_after = int(time.time() * 1000) + body.enqueue_after
        await state.save()

        return SignalResponseModel(status=StateStatusEnum.CREATED, enqueue_after=state.enqueue_after)

    except Exception as e:
        # Log for traceability, then let the original exception (including
        # the HTTPExceptions raised above) propagate unchanged.
        logger.error(f"Error re-queueing state {state_id} for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id, error=e)
        raise
28 changes: 27 additions & 1 deletion state-manager/app/models/db/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from typing import Any, Optional
import hashlib
import json

import time

class State(BaseDatabaseModel):
node_name: str = Field(..., description="Name of the node of the state")
Expand All @@ -18,10 +18,12 @@ class State(BaseDatabaseModel):
status: StateStatusEnum = Field(..., description="Status of the state")
inputs: dict[str, Any] = Field(..., description="Inputs of the state")
outputs: dict[str, Any] = Field(..., description="Outputs of the state")
data: dict[str, Any] = Field(default_factory=dict, description="Data of the state")
error: Optional[str] = Field(None, description="Error message")
parents: dict[str, PydanticObjectId] = Field(default_factory=dict, description="Parents of the state")
does_unites: bool = Field(default=False, description="Whether this state unites other states")
state_fingerprint: str = Field(default="", description="Fingerprint of the state")
enqueue_after: int = Field(default_factory=lambda: int(time.time() * 1000), description="Unix time in milliseconds after which the state should be enqueued")
Comment thread
NiveditJain marked this conversation as resolved.
Outdated
Comment thread
NiveditJain marked this conversation as resolved.
Outdated

@before_event([Insert, Replace, Save])
def _generate_fingerprint(self):
Expand Down Expand Up @@ -65,5 +67,29 @@ class Settings:
partialFilterExpression={
"does_unites": True
}
),
IndexModel(
[
("enqueue_after", 1)
],
name="idx_enqueue_after"
),
IndexModel(
[
("status", 1)
],
name="idx_status"
),
IndexModel(
[
("namespace_name", 1),
],
name="idx_namespace_name"
),
IndexModel(
[
("node_name", 1),
],
name="idx_node_name"
)
Comment thread
NiveditJain marked this conversation as resolved.
]
14 changes: 14 additions & 0 deletions state-manager/app/models/signal_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from pydantic import BaseModel, Field
from .state_status_enum import StateStatusEnum
from typing import Any


class SignalResponseModel(BaseModel):
    # Response body shared by the prune and re-enqueue-after signal endpoints.
    # A `#` comment is used instead of a docstring so the generated schema
    # description stays exactly as before.
    enqueue_after: int = Field(
        ...,
        description="Unix time in milliseconds after which the state should be re-enqueued",
    )
    status: StateStatusEnum = Field(
        ...,
        description="Status of the state",
    )
Comment thread
NiveditJain marked this conversation as resolved.

Comment thread
NiveditJain marked this conversation as resolved.
class PruneRequestModel(BaseModel):
    # Request body of the prune signal; `data` is copied onto the state
    # by the prune controller.
    data: dict[str, Any] = Field(
        ...,
        description="Data of the state",
    )

Comment thread
NiveditJain marked this conversation as resolved.
class ReEnqueueAfterRequestModel(BaseModel):
    # Request body of the re-enqueue-after signal.
    # The controller adds this value to a timestamp, so it is a delay in
    # milliseconds, not an absolute Unix time as the old description said.
    # Negative delays are rejected (ge=0) because they would move the
    # enqueue time backwards.
    enqueue_after: int = Field(
        ...,
        ge=0,
        description="Delay in milliseconds to wait before the state is re-enqueued",
    )
Comment thread
NiveditJain marked this conversation as resolved.
Outdated
4 changes: 1 addition & 3 deletions state-manager/app/models/state_status_enum.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@ class StateStatusEnum(str, Enum):
CREATED = 'CREATED'
QUEUED = 'QUEUED'
EXECUTED = 'EXECUTED'
NEXT_CREATED = 'NEXT_CREATED'
RETRY_CREATED = 'RETRY_CREATED'
TIMEDOUT = 'TIMEDOUT'
ERRORED = 'ERRORED'
CANCELLED = 'CANCELLED'
SUCCESS = 'SUCCESS'
NEXT_CREATED_ERROR = 'NEXT_CREATED_ERROR'
PRUNED = 'PRUNED'
Comment thread
NiveditJain marked this conversation as resolved.
46 changes: 46 additions & 0 deletions state-manager/app/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@
from .models.graph_structure_models import GraphStructureResponse
from .controller.get_graph_structure import get_graph_structure

### signals
from .models.signal_models import SignalResponseModel
from .models.signal_models import PruneRequestModel
from .controller.prune_signal import prune_signal
from .models.signal_models import ReEnqueueAfterRequestModel
from .controller.re_queue_after_singal import re_queue_after_signal

Comment thread
NiveditJain marked this conversation as resolved.

logger = LogsManager().get_logger()

router = APIRouter(prefix="/v0/namespace/{namespace_name}")
Expand Down Expand Up @@ -145,6 +153,44 @@ async def errored_state_route(namespace_name: str, state_id: str, body: ErroredR
return await errored_state(namespace_name, PydanticObjectId(state_id), body, x_exosphere_request_id)


# POST /states/{state_id}/prune
# Validates the API key, then delegates to prune_signal with the path's
# state id converted to a PydanticObjectId. Responds with SignalResponseModel.
# Raises HTTPException 401 when the resolved api_key is falsy.
@router.post(
    "/states/{state_id}/prune",
    response_model=SignalResponseModel,
    status_code=status.HTTP_200_OK,
    response_description="State pruned successfully",
    tags=["state"]
)
async def prune_state_route(namespace_name: str, state_id: str, body: PruneRequestModel, request: Request, api_key: str = Depends(check_api_key)):
    # Reuse the request id stored on request.state when present; otherwise
    # fall back to a fresh UUID so log lines can still be correlated.
    x_exosphere_request_id = getattr(request.state, "x_exosphere_request_id", str(uuid4()))

    # Guard clause: reject the request before touching any state.
    if not api_key:
        logger.error(f"API key is invalid for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key")

    logger.info(f"API key is valid for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)

    return await prune_signal(namespace_name, PydanticObjectId(state_id), body, x_exosphere_request_id)

Comment thread
NiveditJain marked this conversation as resolved.

# POST /states/{state_id}/re-enqueue-after
# Validates the API key, then delegates to re_queue_after_signal with the
# path's state id converted to a PydanticObjectId. Responds with
# SignalResponseModel. Raises HTTPException 401 when the resolved api_key
# is falsy.
@router.post(
    "/states/{state_id}/re-enqueue-after",
    response_model=SignalResponseModel,
    status_code=status.HTTP_200_OK,
    response_description="State re-enqueued successfully",
    tags=["state"]
)
async def re_enqueue_after_state_route(namespace_name: str, state_id: str, body: ReEnqueueAfterRequestModel, request: Request, api_key: str = Depends(check_api_key)):
    # Reuse the request id stored on request.state when present; otherwise
    # fall back to a fresh UUID so log lines can still be correlated.
    x_exosphere_request_id = getattr(request.state, "x_exosphere_request_id", str(uuid4()))

    # Guard clause: reject the request before touching any state.
    if not api_key:
        logger.error(f"API key is invalid for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key")

    logger.info(f"API key is valid for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)

    return await re_queue_after_signal(namespace_name, PydanticObjectId(state_id), body, x_exosphere_request_id)
Comment thread
NiveditJain marked this conversation as resolved.


@router.put(
"/graph/{graph_name}",
response_model=UpsertGraphTemplateResponse,
Expand Down
Loading