Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/docs/exosphere/create-graph.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ async def create_graph_template():
}

try:
# Create or update the graph template
# Create or update the graph template (with optional store, beta)
result = await state_manager.upsert_graph(
graph_name="my-workflow",
graph_nodes=graph_nodes,
Expand All @@ -186,6 +186,9 @@ async def create_graph_template():
"strategy": "EXPONENTIAL",
"backoff_factor": 2000,
"exponent": 2
},
store_config={ # beta
"ttl": 7200 # seconds to keep key/values
}
)
print("Graph template created successfully!")
Expand Down
20 changes: 7 additions & 13 deletions docs/docs/exosphere/trigger-graph.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,14 @@ The recommended way to trigger graphs is using the Exosphere Python SDK, which p
state_manager_uri=EXOSPHERE_STATE_MANAGER_URI,
key=EXOSPHERE_API_KEY
)

# Create trigger state
trigger_state = TriggerState(
identifier="data_loader", # Must match a node identifier in your graph
inputs={
"source": "/path/to/data.csv",
"format": "csv",
"batch_size": "1000"
}
)


try:
# Trigger the graph
result = await state_manager.trigger("my-graph", state=trigger_state)
# Trigger the graph with optional store (beta)
result = await state_manager.trigger(
"my-graph",
inputs={"user_id": "123"},
store={"cursor": "0"} # persisted across nodes (beta)
)
Comment thread
NiveditJain marked this conversation as resolved.
print(f"Graph triggered successfully!")
print(f"Run ID: {result['run_id']}")
return result
Expand Down
102 changes: 17 additions & 85 deletions python-sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ export EXOSPHERE_API_KEY="your-api-key"
- **Async Support**: Native async/await support for high-performance operations
- **Error Handling**: Built-in retry mechanisms and error recovery
- **Scalability**: Designed for high-volume batch processing and workflows
- **Graph Store (beta)**: Persist key-value pairs across nodes within a single run

Comment thread
NiveditJain marked this conversation as resolved.
Outdated
## Architecture

Expand Down Expand Up @@ -241,95 +242,26 @@ trigger_state = TriggerState(
}
)

# Trigger a single state
result = await state_manager.trigger("my-graph", state=trigger_state)

# Or trigger multiple states
trigger_states = [
TriggerState(identifier="trigger1", inputs={"key1": "value1"}),
TriggerState(identifier="trigger2", inputs={"key2": "value2"})
]

result = await state_manager.trigger("my-graph", states=trigger_states)
```

**Parameters:**
- `graph_name` (str): The name of the graph to trigger
- `state` (TriggerState, optional): A single trigger state
- `states` (list[TriggerState], optional): A list of trigger states

**Returns:**
- `dict`: The JSON response from the state manager API

**Raises:**
- `ValueError`: If neither `state` nor `states` is provided, if both are provided, or if `states` is an empty list
- `Exception`: If the API request fails with a non-200 status code

### TriggerState Class

The `TriggerState` class represents a trigger state for graph execution. It contains an identifier and a set of input parameters that will be passed to the graph when it is triggered.

#### Creating Trigger States

```python
from exospherehost import TriggerState

# Basic trigger state
trigger_state = TriggerState(
identifier="data-processing",
# Trigger the graph (beta store support)
result = await state_manager.trigger(
"my-graph",
inputs={
"file_path": "/path/to/data.csv",
"batch_size": "1000",
"priority": "high"
}
)

# Trigger state with complex data (serialized as JSON)
import json

complex_data = {
"filters": ["active", "verified"],
"date_range": {"start": "2024-01-01", "end": "2024-01-31"},
"options": {"include_metadata": True, "format": "json"}
}

trigger_state = TriggerState(
identifier="complex-processing",
inputs={
"config": json.dumps(complex_data),
"user_id": "12345"
"user_id": "12345",
"session_token": "abc123def456"
},
store={
"cursor": "0" # persisted across nodes (beta)
}
)
Comment thread
NiveditJain marked this conversation as resolved.
```

**Attributes:**
- `identifier` (str): A unique identifier for this trigger state. Used to distinguish between different trigger states and may be used by the graph to determine how to process the trigger
- `inputs` (dict[str, str]): A dictionary of input parameters that will be passed to the graph. The keys are parameter names and values are parameter values, both as strings

## Integration with ExosphereHost Platform

The Python SDK integrates seamlessly with the ExosphereHost platform, providing:

- **Performance**: Optimized execution with intelligent resource allocation and parallel processing
- **Reliability**: Built-in fault tolerance, automatic recovery, and failover capabilities
- **Scalability**: Automatic scaling based on workload demands
- **Monitoring**: Integrated logging and monitoring capabilities

## Documentation

For more detailed information, visit our [documentation](https://docs.exosphere.host).

## Contributing

We welcome contributions! Please see our [contributing guidelines](https://github.com/exospherehost/exospherehost/blob/main/CONTRIBUTING.md) for details.

## Support

For support and questions:
- **Email**: [nivedit@exosphere.host](mailto:nivedit@exosphere.host)
- **Documentation**: [https://docs.exosphere.host](https://docs.exosphere.host)
- **GitHub Issues**: [https://github.com/exospherehost/exospherehost/issues](https://github.com/exospherehost/exospherehost/issues)
**Parameters:**
- `graph_name` (str): Name of the graph to execute
- `inputs` (dict[str, str] | None): Key/value inputs for the first node (strings only)
- `store` (dict[str, str] | None): Graph-level key/value store (beta) persisted across nodes

## License
**Returns:**
- `dict`: JSON payload from the state manager

This Python SDK is licensed under the MIT License. The main ExosphereHost project is licensed under the Elastic License 2.0.
**Raises:**
- `Exception`: If the HTTP request fails
Comment thread
NiveditJain marked this conversation as resolved.
2 changes: 1 addition & 1 deletion python-sdk/exospherehost/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "0.0.2b2"
version = "0.0.2b3"
114 changes: 51 additions & 63 deletions python-sdk/exospherehost/statemanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,60 +67,49 @@ def _get_upsert_graph_endpoint(self, graph_name: str):
def _get_get_graph_endpoint(self, graph_name: str):
return f"{self._state_manager_uri}/{self._state_manager_version}/namespace/{self._namespace}/graph/{graph_name}"

async def trigger(self, graph_name: str, state: TriggerState | None = None, states: list[TriggerState] | None = None):
async def trigger(self, graph_name: str, inputs: dict[str, str] | None = None, store: dict[str, str] | None = None):
"""
Trigger a graph execution with one or more trigger states.
Trigger execution of a graph.

This method sends trigger states to the specified graph endpoint to initiate
graph execution. It accepts either a single trigger state or a list of trigger
states, but not both simultaneously.
Beta: This method now supports an optional **store** parameter that lets you
pass a key-value map that is persisted for the lifetime of the graph run. All
keys **and** values must be strings in the current beta release – the schema
may change in future versions.

Args:
graph_name (str): The name of the graph to trigger execution for.
state (TriggerState | None, optional): A single trigger state to send.
Must be provided if `states` is None.
states (list[TriggerState] | None, optional): A list of trigger states to send.
Must be provided if `state` is None. Cannot be an empty list.
graph_name (str): Name of the graph you want to run.
inputs (dict[str, str] | None): Optional inputs for the first node in the
graph. Strings only.
store (dict[str, str] | None): Optional key-value store that will be merged
into the graph-level store before execution (beta).

Returns:
dict: The JSON response from the state manager API containing the
result of the trigger operation.
dict: JSON payload returned by the state-manager API.

Raises:
ValueError: If neither `state` nor `states` is provided, if both are provided,
or if `states` is an empty list.
Exception: If the API request fails with a non-200 status code. The exception
message includes the HTTP status code and response text for debugging.
Exception: If the request fails.

Example:
```python
# Trigger with a single state
state = TriggerState(identifier="my-trigger", inputs={"key": "value"})
result = await state_manager.trigger("my-graph", state=state)

# Trigger with multiple states
states = [
TriggerState(identifier="trigger1", inputs={"key1": "value1"}),
TriggerState(identifier="trigger2", inputs={"key2": "value2"})
]
result = await state_manager.trigger("my-graph", states=states)
# Trigger with inputs only
await state_manager.trigger("my-graph", inputs={"user_id": "123"})

# Trigger with inputs **and** a beta store
await state_manager.trigger(
"my-graph",
inputs={"user_id": "123"},
store={"cursor": "0"} # beta
)
```
"""
if state is None and states is None:
raise ValueError("Either state or states must be provided")
if state is not None and states is not None:
raise ValueError("Only one of state or states must be provided")
if states is not None and len(states) == 0:
raise ValueError("States must be a non-empty list")
if inputs is None:
inputs = {}
if store is None:
store = {}

states_list = []
if state is not None:
states_list.append(state)
if states is not None:
states_list.extend(states)

body = {
"states": [state.model_dump() for state in states_list]
"inputs": inputs,
"store": store
}
Comment thread
NiveditJain marked this conversation as resolved.
headers = {
"x-api-key": self._key
Expand Down Expand Up @@ -167,43 +156,42 @@ async def get_graph(self, graph_name: str):
raise Exception(f"Failed to get graph: {response.status} {await response.text()}")
return await response.json()

async def upsert_graph(self, graph_name: str, graph_nodes: list[dict[str, Any]], secrets: dict[str, str], validation_timeout: int = 60, polling_interval: int = 1):
async def upsert_graph(self, graph_name: str, graph_nodes: list[dict[str, Any]], secrets: dict[str, str], retry_policy: dict[str, Any] | None = None, store_config: dict[str, Any] | None = None, validation_timeout: int = 60, polling_interval: int = 1):
"""
Create or update a graph in the state manager with validation.
Create or update a graph definition.

Beta: `store_config` is a new field that allows you to configure a
namespaced key-value store that lives for the duration of a graph run. The
feature is in beta and the shape of `store_config` may change.

This method sends a graph definition to the state manager API for creation
or update. After submission, it polls the API to wait for graph validation
to complete, ensuring the graph is properly configured before returning.
After submitting the graph, this helper polls the state-manager until the
graph has been validated (or the timeout is hit).

Args:
graph_name (str): The name of the graph to create or update.
graph_nodes (list[dict[str, Any]]): A list of node definitions that make up
the graph. Each node should contain the necessary configuration for
the graph execution engine.
secrets (dict[str, str]): A dictionary of secret values that will be
available to the graph during execution. Keys are secret names and
values are the secret values.
validation_timeout (int, optional): Maximum time in seconds to wait for
graph validation to complete. Defaults to 60.
polling_interval (int, optional): Time in seconds between validation
status checks. Defaults to 1.

graph_name (str): Graph identifier.
graph_nodes (list[dict[str, Any]]): Graph node list.
secrets (dict[str, str]): Secrets available to all nodes.
retry_policy (dict[str, Any] | None): Optional per-node retry policy.
store_config (dict[str, Any] | None): Beta configuration for the
graph-level store (schema is subject to change).
validation_timeout (int): Seconds to wait for validation (default 60).
polling_interval (int): Polling interval in seconds (default 1).

Returns:
dict: The JSON response from the state manager API containing the
validated graph information.

dict: Validated graph object returned by the API.

Raises:
Exception: If the API request fails with a non-201 status code, if graph
validation times out, or if validation fails. The exception message
includes relevant error details for debugging.
Exception: If validation fails or times out.
"""
Comment thread
NiveditJain marked this conversation as resolved.
endpoint = self._get_upsert_graph_endpoint(graph_name)
headers = {
"x-api-key": self._key
}
body = {
"secrets": secrets,
"nodes": graph_nodes
"nodes": graph_nodes,
"retry_policy": retry_policy,
"store_config": store_config
}
async with aiohttp.ClientSession() as session:
async with session.put(endpoint, json=body, headers=headers) as response: # type: ignore
Expand Down
Loading