Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 0 additions & 93 deletions state-manager/app/controller/create_states.py

This file was deleted.

80 changes: 80 additions & 0 deletions state-manager/app/controller/trigger_graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from fastapi import HTTPException

from app.singletons.logs_manager import LogsManager
from app.models.trigger_model import TriggerGraphRequestModel, TriggerGraphResponseModel
from app.models.state_status_enum import StateStatusEnum
from app.models.db.state import State
from app.models.db.store import Store
from app.models.db.graph_template_model import GraphTemplate
from app.models.node_template_model import NodeTemplate
import uuid

logger = LogsManager().get_logger()

def check_required_store_keys(graph_template: GraphTemplate, store: dict[str, str]) -> None:
required_keys = set(graph_template.store_config.required_keys)
provided_keys = set(store.keys())

missing_keys = required_keys - provided_keys
if missing_keys:
raise HTTPException(status_code=400, detail=f"Missing store keys: {missing_keys}")

Comment thread
NiveditJain marked this conversation as resolved.
Comment thread
NiveditJain marked this conversation as resolved.

def construct_inputs(node: NodeTemplate, inputs: dict[str, str]) -> dict[str, str]:
return {key: inputs.get(key, value) for key, value in node.inputs.items()}

Comment thread
NiveditJain marked this conversation as resolved.

async def trigger_graph(namespace_name: str, graph_name: str, body: TriggerGraphRequestModel, x_exosphere_request_id: str) -> TriggerGraphResponseModel:
try:
run_id = str(uuid.uuid4())
logger.info(f"Triggering graph {graph_name} with run_id {run_id}", x_exosphere_request_id=x_exosphere_request_id)

try:
graph_template = await GraphTemplate.get(namespace_name, graph_name)
except ValueError as e:
logger.error(f"Graph template not found for namespace {namespace_name} and graph {graph_name}", x_exosphere_request_id=x_exosphere_request_id)
if "Graph template not found" in str(e):
raise HTTPException(status_code=404, detail=f"Graph template not found for namespace {namespace_name} and graph {graph_name}")
else:
raise e
Comment thread
NiveditJain marked this conversation as resolved.

if not graph_template.is_valid():
raise HTTPException(status_code=400, detail=f"Graph template is not valid")

check_required_store_keys(graph_template, body.store)

new_stores = [
Store(
run_id=run_id,
namespace=namespace_name,
graph_name=graph_name,
key=key,
value=value
) for key, value in body.store.items()
]

await Store.insert_many(new_stores)

root = graph_template.get_root_node()

new_state = State(
node_name=root.node_name,
namespace_name=namespace_name,
identifier=root.identifier,
graph_name=graph_name,
run_id=run_id,
status=StateStatusEnum.CREATED,
inputs=construct_inputs(root, body.inputs),
outputs={},
error=None
)
await new_state.insert()

return TriggerGraphResponseModel(
status=StateStatusEnum.CREATED,
run_id=run_id
)

except Exception as e:
logger.error(f"Error triggering graph {graph_name} for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
raise e
Comment thread
NiveditJain marked this conversation as resolved.
3 changes: 2 additions & 1 deletion state-manager/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from .models.db.state import State
from .models.db.graph_template_model import GraphTemplate
from .models.db.registered_node import RegisteredNode
from .models.db.store import Store

# injecting routes
from .routes import router
Expand All @@ -41,7 +42,7 @@ async def lifespan(app: FastAPI):
# initializing beanie
client = AsyncMongoClient(settings.mongo_uri)
db = client[settings.mongo_database_name]
await init_beanie(db, document_models=[State, GraphTemplate, RegisteredNode])
await init_beanie(db, document_models=[State, GraphTemplate, RegisteredNode, Store])
logger.info("beanie dbs initialized")

# initialize secret
Expand Down
39 changes: 0 additions & 39 deletions state-manager/app/models/create_models.py

This file was deleted.

36 changes: 27 additions & 9 deletions state-manager/app/models/db/graph_template_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from app.utils.encrypter import get_encrypter
from app.models.dependent_string import DependentString
from app.models.retry_policy_model import RetryPolicyModel
from app.models.store_config_model import StoreConfig

class GraphTemplate(BaseDatabaseModel):
name: str = Field(..., description="Name of the graph")
Expand All @@ -21,6 +22,7 @@ class GraphTemplate(BaseDatabaseModel):
validation_errors: List[str] = Field(default_factory=list, description="Validation errors of the graph")
secrets: Dict[str, str] = Field(default_factory=dict, description="Secrets of the graph")
retry_policy: RetryPolicyModel = Field(default_factory=RetryPolicyModel, description="Retry policy of the graph")
store_config: StoreConfig = Field(default_factory=StoreConfig, description="Store config of the graph")

_node_by_identifier: Dict[str, NodeTemplate] | None = PrivateAttr(default=None)
_parents_by_identifier: Dict[str, set[str]] | None = PrivateAttr(default=None) # type: ignore
Expand Down Expand Up @@ -119,26 +121,28 @@ def dfs(node_identifier: str, parents: set[str], path: set[str]) -> None:
@field_validator('name')
@classmethod
def validate_name(cls, v: str) -> str:
if v == "" or v is None:
trimmed_v = v.strip()
if trimmed_v == "" or trimmed_v is None:
Comment thread
NiveditJain marked this conversation as resolved.
raise ValueError("Name cannot be empty")
return v
return trimmed_v

@field_validator('namespace')
@classmethod
def validate_namespace(cls, v: str) -> str:
if v == "" or v is None:
trimmed_v = v.strip()
if trimmed_v == "" or trimmed_v is None:
Comment thread
NiveditJain marked this conversation as resolved.
raise ValueError("Namespace cannot be empty")
return v
return trimmed_v
Comment thread
NiveditJain marked this conversation as resolved.

@field_validator('secrets')
@classmethod
def validate_secrets(cls, v: Dict[str, str]) -> Dict[str, str]:
for secret_name, secret_value in v.items():
trimmed_v = {key.strip(): value.strip() for key, value in v.items()}
for secret_name, secret_value in trimmed_v.items():
if not secret_name or not secret_value:
raise ValueError("Secrets cannot be empty")
cls._validate_secret_value(secret_value)

return v
return trimmed_v

Comment thread
NiveditJain marked this conversation as resolved.
Outdated
@field_validator('nodes')
@classmethod
Expand Down Expand Up @@ -236,12 +240,26 @@ def verify_input_dependencies(self) -> Self:
continue

dependent_string = DependentString.create_dependent_string(input_value)
dependent_identifiers = set([identifier for identifier, _ in dependent_string.get_identifier_field()])
dependent_identifiers = set()
store_fields = set()

for key, field in dependent_string.get_identifier_field():
if key == "store":
store_fields.add(field)
else:
dependent_identifiers.add(key)

for identifier in dependent_identifiers:
if identifier not in self.get_parents_by_identifier(node.identifier):
if identifier == "store":
continue

elif identifier not in self.get_parents_by_identifier(node.identifier):
errors.append(f"Input {input_value} depends on {identifier} but {identifier} is not a parent of {node.identifier}")
Comment thread
NiveditJain marked this conversation as resolved.
Outdated

for field in store_fields:
if field not in self.store_config.required_keys and field not in self.store_config.default_values:
errors.append(f"Input {input_value} depends on {field} but {field} is not a required key or a default value")

except Exception as e:
errors.append(f"Error creating dependent string for input {input_value} check syntax string: {str(e)}")
if errors:
Expand Down
36 changes: 36 additions & 0 deletions state-manager/app/models/db/store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from beanie import Document
from pydantic import Field
from pymongo import IndexModel

class Store(Document):
run_id: str = Field(..., description="Run ID of the corresponding graph execution")
namespace: str = Field(..., description="Namespace of the graph")
graph_name: str = Field(..., description="Name of the graph")
key: str = Field(..., description="Key of the store")
value: str = Field(..., description="Value of the store")

Comment thread
NiveditJain marked this conversation as resolved.
class Settings:
indexes = [
IndexModel(
[
("run_id", 1),
("namespace", 1),
("graph_name", 1),
("key", 1),
],
unique=True,
name="uniq_run_id_namespace_graph_name_key",
)
]
Comment thread
NiveditJain marked this conversation as resolved.

@staticmethod
async def get_value(run_id: str, namespace: str, graph_name: str, key: str) -> str | None:
store = await Store.find_one(
Store.run_id == run_id,
Store.namespace == namespace,
Store.graph_name == graph_name,
Store.key == key,
)
if store is None:
return None
return store.value
Comment thread
NiveditJain marked this conversation as resolved.
9 changes: 6 additions & 3 deletions state-manager/app/models/dependent_string.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@ def create_dependent_string(syntax_string: str) -> "DependentString":
placeholder_content, tail = split.split("}}", 1)

parts = [p.strip() for p in placeholder_content.split(".")]
if len(parts) != 3 or parts[1] != "outputs":

if len(parts) == 3 and parts[1] == "outputs":
dependent_string.dependents[order] = Dependent(identifier=parts[0], field=parts[2], tail=tail)
elif len(parts) == 2 and parts[0] == "store":
dependent_string.dependents[order] = Dependent(identifier=parts[0], field=parts[1], tail=tail)
else:
raise ValueError(f"Invalid syntax string placeholder {placeholder_content} for: {syntax_string}")
Comment thread
NiveditJain marked this conversation as resolved.

dependent_string.dependents[order] = Dependent(identifier=parts[0], field=parts[2], tail=tail)

return dependent_string

def _build_mapping_key_to_dependent(self):
Expand Down
3 changes: 3 additions & 0 deletions state-manager/app/models/graph_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,21 @@
from datetime import datetime
from .graph_template_validation_status import GraphTemplateValidationStatus
from .retry_policy_model import RetryPolicyModel
from .store_config_model import StoreConfig


class UpsertGraphTemplateRequest(BaseModel):
secrets: Dict[str, str] = Field(..., description="Dictionary of secrets that are used while graph execution")
nodes: List[NodeTemplate] = Field(..., description="List of node templates that define the graph structure")
retry_policy: RetryPolicyModel = Field(default_factory=RetryPolicyModel, description="Retry policy of the graph")
store_config: StoreConfig = Field(default_factory=StoreConfig, description="Store config of the graph")


class UpsertGraphTemplateResponse(BaseModel):
nodes: List[NodeTemplate] = Field(..., description="List of node templates that define the graph structure")
secrets: Dict[str, bool] = Field(..., description="Dictionary of secrets that are used while graph execution")
retry_policy: RetryPolicyModel = Field(default_factory=RetryPolicyModel, description="Retry policy of the graph")
store_config: StoreConfig = Field(default_factory=StoreConfig, description="Store config of the graph")
created_at: datetime = Field(..., description="Timestamp when the graph template was created")
updated_at: datetime = Field(..., description="Timestamp when the graph template was last updated")
validation_status: GraphTemplateValidationStatus = Field(..., description="Current validation status of the graph template")
Expand Down
Loading
Loading