Skip to content
Merged
3 changes: 2 additions & 1 deletion .github/.codespellignorewords
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ SLA
YML
SDK
S3
Kusto
Kusto
NotIn
Comment thread
NiveditJain marked this conversation as resolved.
52 changes: 52 additions & 0 deletions docs/docs/exosphere/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,58 @@ When a node has a `unites` configuration:
2. **State fingerprinting** ensures only one unites state is created per unique combination
3. **Dependency validation** ensures the unites node depends on the specified identifier

### Unites Strategy (Beta)

The `unites` keyword supports different strategies to control when the uniting node should execute. This feature is currently in **beta**.

#### Available Strategies

- **`ALL_SUCCESS`** (default): The uniting node executes only when all states with the specified identifier have reached `SUCCESS` status. If any state fails or is still processing, the uniting node will wait.

- **`ALL_DONE`**: The uniting node executes when all states with the specified identifier have completed their execution (reached `SUCCESS`, `ERRORED`, or any terminal status). This strategy allows the uniting node to proceed even if some states have failed.

Comment thread
NiveditJain marked this conversation as resolved.
#### Strategy Configuration

You can specify the strategy in your unites configuration:

Comment thread
NiveditJain marked this conversation as resolved.
```json hl_lines="22-26"
Comment thread
NiveditJain marked this conversation as resolved.
Outdated
{
"nodes": [
{
"node_name": "DataSplitterNode",
"identifier": "data_splitter",
"next_nodes": ["processor_1"]
},
{
"node_name": "DataProcessorNode",
"identifier": "processor_1",
"inputs":{
"x":"${{data_splitter.outputs.data_chunk}}"
},
"next_nodes": ["result_merger"]
},
{
"node_name": "ResultMergerNode",
"identifier": "result_merger",
"inputs":{
"x_processed":"${{processor_1.outputs.processed_data}}"
},
"unites": {
"identifier": "data_splitter",
"strategy": "ALL_SUCCESS"
},
"next_nodes": []
}
]
}
```

#### Use Cases

- **`ALL_SUCCESS`**: Use when you need all parallel processes to complete successfully before proceeding. Ideal for data processing workflows where partial failures are not acceptable.

- **`ALL_DONE`**: Use when you want to proceed with partial results or when you have error handling logic in the uniting node. Useful for scenarios where you want to aggregate results from successful processes while handling failures separately.

Comment thread
NiveditJain marked this conversation as resolved.
### Unites Example

```json hl_lines="22-24"
Expand Down
7 changes: 7 additions & 0 deletions state-manager/app/models/node_template_model.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
from pydantic import Field, BaseModel, field_validator
from typing import Any, Optional, List
from .dependent_string import DependentString
from enum import Enum


class UnitesStrategyEnum(str, Enum):
ALL_SUCCESS = "ALL_SUCCESS"
ALL_DONE = "ALL_DONE"


class Unites(BaseModel):
identifier: str = Field(..., description="Identifier of the node")
strategy: UnitesStrategyEnum = Field(default=UnitesStrategyEnum.ALL_SUCCESS, description="Strategy of the unites")


class NodeTemplate(BaseModel):
Expand Down
15 changes: 14 additions & 1 deletion state-manager/app/tasks/create_next_states.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from app.models.node_template_model import NodeTemplate
from app.models.db.registered_node import RegisteredNode
from app.models.dependent_string import DependentString
from app.models.node_template_model import UnitesStrategyEnum
from json_schema_to_pydantic import create_model
from pydantic import BaseModel
from typing import Type
Expand All @@ -30,7 +31,7 @@ async def check_unites_satisfied(namespace: str, graph_name: str, node_template:
if not unites_id:
raise ValueError(f"Unit identifier not found in parents: {node_template.unites.identifier}")
else:
if await State.find(
if node_template.unites.strategy == UnitesStrategyEnum.ALL_SUCCESS and await State.find(
State.namespace_name == namespace,
State.graph_name == graph_name,
NE(State.status, StateStatusEnum.SUCCESS),
Expand All @@ -39,6 +40,18 @@ async def check_unites_satisfied(namespace: str, graph_name: str, node_template:
}
).count() > 0:
return False

if node_template.unites.strategy == UnitesStrategyEnum.ALL_DONE and await State.find(
State.namespace_name == namespace,
State.graph_name == graph_name,
In(State.status, [StateStatusEnum.CREATED, StateStatusEnum.QUEUED, StateStatusEnum.EXECUTED]),
{
f"parents.{node_template.unites.identifier}": unites_id
}
).count() > 0:
return False
Comment thread
NiveditJain marked this conversation as resolved.
Outdated

Comment thread
NiveditJain marked this conversation as resolved.

return True


Expand Down
46 changes: 45 additions & 1 deletion state-manager/tests/unit/tasks/test_create_next_states.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
)
from app.models.dependent_string import Dependent, DependentString
from app.models.state_status_enum import StateStatusEnum
from app.models.node_template_model import NodeTemplate, Unites
from app.models.node_template_model import NodeTemplate, Unites, UnitesStrategyEnum
from pydantic import BaseModel


Expand Down Expand Up @@ -263,6 +263,50 @@ async def test_check_unites_satisfied_no_pending_states(self):

assert result is True

@pytest.mark.asyncio
async def test_check_unites_satisfied_all_done_strategy_pending_states(self):
"""Test when there are pending states for ALL_DONE strategy"""
node_template = NodeTemplate(
node_name="test_node",
identifier="test_id",
namespace="test",
inputs={},
next_nodes=None,
unites=Unites(identifier="parent1", strategy=UnitesStrategyEnum.ALL_DONE)
)
parents = {"parent1": PydanticObjectId()}

with patch('app.tasks.create_next_states.State') as mock_state:
mock_find = AsyncMock()
mock_find.count.return_value = 1 # Found pending states
mock_state.find.return_value = mock_find

result = await check_unites_satisfied("test_namespace", "test_graph", node_template, parents)

assert result is False

@pytest.mark.asyncio
async def test_check_unites_satisfied_all_done_strategy_no_pending_states(self):
"""Test when there are no pending states for ALL_DONE strategy"""
node_template = NodeTemplate(
node_name="test_node",
identifier="test_id",
namespace="test",
inputs={},
next_nodes=None,
unites=Unites(identifier="parent1", strategy=UnitesStrategyEnum.ALL_DONE)
)
parents = {"parent1": PydanticObjectId()}

with patch('app.tasks.create_next_states.State') as mock_state:
mock_find = AsyncMock()
mock_find.count.return_value = 0 # No pending states
mock_state.find.return_value = mock_find

result = await check_unites_satisfied("test_namespace", "test_graph", node_template, parents)

assert result is True

Comment thread
NiveditJain marked this conversation as resolved.

class TestValidateDependencies:
"""Test cases for validate_dependencies function"""
Expand Down