Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 0 additions & 93 deletions state-manager/app/controller/create_states.py

This file was deleted.

85 changes: 85 additions & 0 deletions state-manager/app/controller/trigger_graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from fastapi import HTTPException

from app.singletons.logs_manager import LogsManager
from app.models.trigger_model import TriggerGraphRequestModel, TriggerGraphResponseModel
from app.models.state_status_enum import StateStatusEnum
from app.models.db.state import State
from app.models.db.store import Store
from app.models.db.graph_template_model import GraphTemplate
from app.models.node_template_model import NodeTemplate
import uuid

logger = LogsManager().get_logger()

def check_required_store_keys(graph_template: GraphTemplate, store: dict[str, str]) -> None:
errors = []
keys = set()

for secret in graph_template.secrets.keys():
if secret not in store.keys():
errors.append(f"Missing store key: {secret}")

Comment thread
NiveditJain marked this conversation as resolved.
Outdated
for key in store.keys():
if key in keys:
errors.append(f"Duplicate store keys: {key}")
keys.add(key)

if errors:
raise HTTPException(status_code=400, detail=f"Errors: {errors}")

Comment thread
NiveditJain marked this conversation as resolved.
Comment thread
NiveditJain marked this conversation as resolved.

def construct_inputs(node: NodeTemplate, inputs: dict[str, str]) -> dict[str, str]:
constructed_inputs = {}
for key, value in node.inputs.items():
if key in inputs.keys():
constructed_inputs[key] = inputs[key]
else:
constructed_inputs[key] = value
return constructed_inputs
Comment thread
NiveditJain marked this conversation as resolved.
Outdated
Comment thread
NiveditJain marked this conversation as resolved.
Outdated

Comment thread
NiveditJain marked this conversation as resolved.

async def trigger_graph(namespace_name: str, graph_name: str, body: TriggerGraphRequestModel, x_exosphere_request_id: str) -> TriggerGraphResponseModel:
try:
run_id = str(uuid.uuid4())
logger.info(f"Triggering graph {graph_name} with run_id {run_id}", x_exosphere_request_id=x_exosphere_request_id)

graph_template = await GraphTemplate.get(namespace_name, graph_name)

check_required_store_keys(graph_template, body.store)

new_stores = []
for key, value in body.store.items():
new_store = Store(
run_id=run_id,
namespace=namespace_name,
graph_name=graph_name,
key=key,
value=value
)
new_stores.append(new_store)
Comment thread
NiveditJain marked this conversation as resolved.
Outdated

await Store.insert_many(new_stores)
Comment thread
NiveditJain marked this conversation as resolved.
Outdated

root = graph_template.get_root_node()

new_state = State(
node_name=root.node_name,
namespace_name=namespace_name,
identifier=root.identifier,
graph_name=graph_name,
run_id=run_id,
status=StateStatusEnum.CREATED,
inputs=construct_inputs(root, body.inputs),
outputs={},
error=None
)
await new_state.insert()

return TriggerGraphResponseModel(
status=StateStatusEnum.CREATED,
run_id=run_id
)

except Exception as e:
logger.error(f"Error triggering graph {graph_name} for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
raise e
Comment thread
NiveditJain marked this conversation as resolved.
3 changes: 2 additions & 1 deletion state-manager/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from .models.db.state import State
from .models.db.graph_template_model import GraphTemplate
from .models.db.registered_node import RegisteredNode
from .models.db.store import Store

# injecting routes
from .routes import router
Expand All @@ -41,7 +42,7 @@ async def lifespan(app: FastAPI):
# initializing beanie
client = AsyncMongoClient(settings.mongo_uri)
db = client[settings.mongo_database_name]
await init_beanie(db, document_models=[State, GraphTemplate, RegisteredNode])
await init_beanie(db, document_models=[State, GraphTemplate, RegisteredNode, Store])
logger.info("beanie dbs initialized")

# initialize secret
Expand Down
39 changes: 0 additions & 39 deletions state-manager/app/models/create_models.py

This file was deleted.

12 changes: 11 additions & 1 deletion state-manager/app/models/db/graph_template_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from app.utils.encrypter import get_encrypter
from app.models.dependent_string import DependentString
from app.models.retry_policy_model import RetryPolicyModel
from app.models.store_config_model import StoreConfig

class GraphTemplate(BaseDatabaseModel):
name: str = Field(..., description="Name of the graph")
Expand All @@ -21,6 +22,7 @@ class GraphTemplate(BaseDatabaseModel):
validation_errors: List[str] = Field(default_factory=list, description="Validation errors of the graph")
secrets: Dict[str, str] = Field(default_factory=dict, description="Secrets of the graph")
retry_policy: RetryPolicyModel = Field(default_factory=RetryPolicyModel, description="Retry policy of the graph")
store_config: StoreConfig = Field(default_factory=StoreConfig, description="Store config of the graph")

_node_by_identifier: Dict[str, NodeTemplate] | None = PrivateAttr(default=None)
_parents_by_identifier: Dict[str, set[str]] | None = PrivateAttr(default=None) # type: ignore
Expand Down Expand Up @@ -237,11 +239,19 @@ def verify_input_dependencies(self) -> Self:

dependent_string = DependentString.create_dependent_string(input_value)
dependent_identifiers = set([identifier for identifier, _ in dependent_string.get_identifier_field()])
store_fields = set([field for identifier, field in dependent_string.get_identifier_field() if identifier == "store"])
Comment thread
NiveditJain marked this conversation as resolved.
Outdated

for identifier in dependent_identifiers:
if identifier not in self.get_parents_by_identifier(node.identifier):
if identifier == "store":
continue

elif identifier not in self.get_parents_by_identifier(node.identifier):
errors.append(f"Input {input_value} depends on {identifier} but {identifier} is not a parent of {node.identifier}")
Comment thread
NiveditJain marked this conversation as resolved.
Outdated

for field in store_fields:
if field not in self.store_config.required_keys and field not in self.store_config.default_values:
errors.append(f"Input {input_value} depends on {field} but {field} is not a required key or a default value")

Comment thread
NiveditJain marked this conversation as resolved.
except Exception as e:
errors.append(f"Error creating dependent string for input {input_value} check syntax string: {str(e)}")
if errors:
Expand Down
36 changes: 36 additions & 0 deletions state-manager/app/models/db/store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from beanie import Document
from pydantic import Field
from pymongo import IndexModel

class Store(Document):
run_id: str = Field(..., description="Run ID of the corresponding graph execution")
namespace: str = Field(..., description="Namespace of the graph")
graph_name: str = Field(..., description="Name of the graph")
key: str = Field(..., description="Key of the store")
value: str = Field(..., description="Value of the store")

Comment thread
NiveditJain marked this conversation as resolved.
class Settings:
indexes = [
IndexModel(
[
("run_id", 1),
("namespace", 1),
("graph_name", 1),
("key", 1),
],
unique=True,
name="uniq_run_id_namespace_graph_name_key",
)
]
Comment thread
NiveditJain marked this conversation as resolved.

@staticmethod
async def get_value(run_id: str, namespace: str, graph_name: str, key: str) -> str | None:
store = await Store.find_one(
Store.run_id == run_id,
Store.namespace == namespace,
Store.graph_name == graph_name,
Store.key == key,
)
if store is None:
return None
return store.value
Comment thread
NiveditJain marked this conversation as resolved.
9 changes: 6 additions & 3 deletions state-manager/app/models/dependent_string.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@ def create_dependent_string(syntax_string: str) -> "DependentString":
placeholder_content, tail = split.split("}}", 1)

parts = [p.strip() for p in placeholder_content.split(".")]
if len(parts) != 3 or parts[1] != "outputs":

if len(parts) == 3 and parts[1] == "outputs":
dependent_string.dependents[order] = Dependent(identifier=parts[0], field=parts[2], tail=tail)
elif len(parts) == 2 and parts[0] == "store":
dependent_string.dependents[order] = Dependent(identifier=parts[0], field=parts[1], tail=tail)
else:
raise ValueError(f"Invalid syntax string placeholder {placeholder_content} for: {syntax_string}")
Comment thread
NiveditJain marked this conversation as resolved.

dependent_string.dependents[order] = Dependent(identifier=parts[0], field=parts[2], tail=tail)

return dependent_string

def _build_mapping_key_to_dependent(self):
Expand Down
3 changes: 3 additions & 0 deletions state-manager/app/models/graph_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,21 @@
from datetime import datetime
from .graph_template_validation_status import GraphTemplateValidationStatus
from .retry_policy_model import RetryPolicyModel
from .store_config_model import StoreConfig


class UpsertGraphTemplateRequest(BaseModel):
secrets: Dict[str, str] = Field(..., description="Dictionary of secrets that are used while graph execution")
nodes: List[NodeTemplate] = Field(..., description="List of node templates that define the graph structure")
retry_policy: RetryPolicyModel = Field(default_factory=RetryPolicyModel, description="Retry policy of the graph")
store_config: StoreConfig = Field(default_factory=StoreConfig, description="Store config of the graph")


class UpsertGraphTemplateResponse(BaseModel):
nodes: List[NodeTemplate] = Field(..., description="List of node templates that define the graph structure")
secrets: Dict[str, bool] = Field(..., description="Dictionary of secrets that are used while graph execution")
retry_policy: RetryPolicyModel = Field(default_factory=RetryPolicyModel, description="Retry policy of the graph")
store_config: StoreConfig = Field(default_factory=StoreConfig, description="Store config of the graph")
created_at: datetime = Field(..., description="Timestamp when the graph template was created")
updated_at: datetime = Field(..., description="Timestamp when the graph template was last updated")
validation_status: GraphTemplateValidationStatus = Field(..., description="Current validation status of the graph template")
Expand Down
2 changes: 2 additions & 0 deletions state-manager/app/models/node_template_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ def validate_node_name(cls, v: str) -> str:
def validate_identifier(cls, v: str) -> str:
if v == "" or v is None:
raise ValueError("Node identifier cannot be empty")
elif v.strip() == "store":
raise ValueError("Node identifier cannot be reserved word 'store'")
return v
Comment thread
NiveditJain marked this conversation as resolved.
Outdated

@field_validator('next_nodes')
Expand Down
20 changes: 20 additions & 0 deletions state-manager/app/models/store_config_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from pydantic import BaseModel, Field, field_validator

class StoreConfig(BaseModel):
required_keys: list[str] = Field(default_factory=list, description="Required keys of the store")
default_values: dict[str, str] = Field(default_factory=dict, description="Default values of the store")

@field_validator("required_keys")
def validate_required_keys(cls, v: list[str]) -> list[str]:
errors = []
keys = set()
for key in v:
if key == "" or key is None:
errors.append("Key cannot be empty")
elif key in keys:
errors.append(f"Key {key} is duplicated")
else:
keys.add(key)
if len(errors) > 0:
raise ValueError("\n".join(errors))
Comment thread
NiveditJain marked this conversation as resolved.
Outdated
return v
10 changes: 10 additions & 0 deletions state-manager/app/models/trigger_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from pydantic import BaseModel, Field
from .state_status_enum import StateStatusEnum

class TriggerGraphRequestModel(BaseModel):
store: dict[str, str] = Field(default_factory=dict, description="Store for the runtime")
inputs: dict[str, str] = Field(default_factory=dict, description="Inputs for the graph execution")
Comment thread
NiveditJain marked this conversation as resolved.

class TriggerGraphResponseModel(BaseModel):
status: StateStatusEnum = Field(..., description="Status of the states")
run_id: str = Field(..., description="Unique run ID generated for this graph execution")
Loading
Loading