Skip to content

Commit 2ac9725

Browse files
authored
Add basic workflow (#8)
1 parent a35a7a4 commit 2ac9725

File tree

6 files changed

+674
-251
lines changed

6 files changed

+674
-251
lines changed

basic/README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
### How to run
2+
3+
* start a iWF server following the [instructions](https://github.com/indeedeng/iwf#how-to-use)
4+
* build and run `main.py`
5+
* start a workflow: `http://localhost:8802/basic/start?workflowId=test-1108&inputNum=4`
6+
* watch in WebUI `http://localhost:8233/namespaces/default/workflows`

basic/basic_workflow.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
from iwf.command_request import CommandRequest, InternalChannelCommand, TimerCommand
2+
from iwf.command_results import CommandResults
3+
from iwf.communication import Communication
4+
from iwf.communication_schema import CommunicationSchema, CommunicationMethod
5+
from iwf.iwf_api.models import ChannelRequestStatus
6+
from iwf.persistence import Persistence
7+
from iwf.persistence_schema import PersistenceSchema, PersistenceField
8+
from iwf.rpc import rpc
9+
from iwf.state_decision import StateDecision
10+
from iwf.state_schema import StateSchema
11+
from iwf.workflow import ObjectWorkflow
12+
from iwf.workflow_context import WorkflowContext
13+
from iwf.workflow_state import WorkflowState
14+
15+
TEST_APPROVAL_KEY = "Approval"
16+
TEST_STRING_KEY = "TestString"
17+
18+
class BasicWorkflow(ObjectWorkflow):
19+
def get_workflow_states(self) -> StateSchema:
20+
return StateSchema.with_starting_state(
21+
BasicWorkflowState1(),
22+
BasicWorkflowState2())
23+
24+
25+
def get_persistence_schema(self) -> PersistenceSchema:
26+
return PersistenceSchema.create(
27+
PersistenceField.data_attribute_def(TEST_STRING_KEY, str),
28+
)
29+
30+
def get_communication_schema(self) -> CommunicationSchema:
31+
return CommunicationSchema.create(
32+
CommunicationMethod.internal_channel_def(TEST_APPROVAL_KEY, str)
33+
)
34+
35+
@rpc()
36+
def append_string(self, st: str, persistence: Persistence) -> str:
37+
current = persistence.get_data_attribute(TEST_STRING_KEY)
38+
if current is None:
39+
current = ""
40+
current = current + ", " + st
41+
persistence.set_data_attribute(TEST_STRING_KEY, current)
42+
return current
43+
44+
@rpc()
45+
def approve(self, communication: Communication):
46+
communication.publish_to_internal_channel(TEST_APPROVAL_KEY, "approved")
47+
48+
class BasicWorkflowState1(WorkflowState[int]):
49+
def execute(
50+
self,
51+
ctx: WorkflowContext,
52+
data: int,
53+
command_results: CommandResults,
54+
persistence: Persistence,
55+
communication: Communication,
56+
) -> StateDecision:
57+
output = data + 1
58+
return StateDecision.single_next_state(BasicWorkflowState2, output)
59+
60+
class BasicWorkflowState2(WorkflowState[int]):
61+
def wait_until(
62+
self,
63+
ctx: WorkflowContext,
64+
data: int,
65+
persistence: Persistence,
66+
communication: Communication,
67+
) -> CommandRequest:
68+
return CommandRequest.for_any_command_completed(
69+
InternalChannelCommand.by_name(TEST_APPROVAL_KEY),
70+
TimerCommand.by_seconds(data)
71+
)
72+
73+
def execute(
74+
self,
75+
ctx: WorkflowContext,
76+
data: int,
77+
command_results: CommandResults,
78+
persistence: Persistence,
79+
communication: Communication,
80+
) -> StateDecision:
81+
internal_channel_result = command_results.internal_channel_commands[0]
82+
if internal_channel_result.status == ChannelRequestStatus.RECEIVED:
83+
return StateDecision.graceful_complete_workflow(internal_channel_result.value)
84+
else:
85+
return StateDecision.single_next_state(BasicWorkflowState2, data)
86+
87+

basic/iwf_config.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
from iwf.client import Client
2+
from iwf.registry import Registry
3+
from iwf.worker_service import (
4+
WorkerService,
5+
)
6+
7+
from basic.basic_workflow import BasicWorkflow
8+
9+
registry = Registry()
10+
worker_service = WorkerService(registry)
11+
client = Client(registry, )
12+
13+
registry.add_workflow(BasicWorkflow())

basic/main.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import traceback
2+
3+
from flask import Flask, request
4+
from iwf.iwf_api.models import (
5+
WorkflowStateExecuteRequest,
6+
WorkflowStateWaitUntilRequest,
7+
WorkflowWorkerRpcRequest,
8+
)
9+
from iwf.worker_service import (
10+
WorkerService,
11+
)
12+
13+
from basic.basic_workflow import BasicWorkflow
14+
from basic.iwf_config import client, worker_service
15+
16+
flask_app = Flask(__name__)
17+
18+
19+
# http://localhost:8802/basic/start?workflowId=test-1108&inputNum=4
20+
@flask_app.route("/basic/start")
21+
def basic_start():
22+
workflow_id = request.args["workflowId"]
23+
input_num = request.args["inputNum"]
24+
25+
run_id = client.start_workflow(BasicWorkflow, workflow_id, 3600, int(input_num))
26+
return run_id
27+
28+
# http://localhost:8802/basic/appendString?workflowId=test-1108&str=test
29+
@flask_app.route("/basic/appendString")
30+
def basic_append_string():
31+
workflow_id = request.args["workflowId"]
32+
st = request.args["str"]
33+
client.invoke_rpc(workflow_id, BasicWorkflow.append_string, st)
34+
return st
35+
36+
# http://localhost:8802/basic/approve?workflowId=test-1108
37+
@flask_app.route("/basic/approve")
38+
def basic_approve():
39+
workflow_id = request.args["workflowId"]
40+
client.invoke_rpc(workflow_id, BasicWorkflow.approve)
41+
return "done"
42+
43+
@flask_app.route("/")
44+
def index():
45+
return "iwf workflow home"
46+
47+
48+
# below are iWF workflow worker APIs to be called by iWF server
49+
@flask_app.route(WorkerService.api_path_workflow_state_wait_until, methods=["POST"])
50+
def handle_wait_until():
51+
req = WorkflowStateWaitUntilRequest.from_dict(request.json)
52+
resp = worker_service.handle_workflow_state_wait_until(req)
53+
return resp.to_dict()
54+
55+
56+
@flask_app.route(WorkerService.api_path_workflow_state_execute, methods=["POST"])
57+
def handle_execute():
58+
req = WorkflowStateExecuteRequest.from_dict(request.json)
59+
resp = worker_service.handle_workflow_state_execute(req)
60+
return resp.to_dict()
61+
62+
63+
@flask_app.route(WorkerService.api_path_workflow_worker_rpc, methods=["POST"])
64+
def handle_rpc():
65+
req = WorkflowWorkerRpcRequest.from_dict(request.json)
66+
resp = worker_service.handle_workflow_worker_rpc(req)
67+
return resp.to_dict()
68+
69+
70+
# this handler is extremely useful for debugging iWF
71+
# the WebUI will be able to show you the error with stacktrace
72+
@flask_app.errorhandler(Exception)
73+
def internal_error(exception):
74+
return traceback.format_exc(), 500
75+
76+
77+
def main():
78+
flask_app.run(host="0.0.0.0", port=8802)
79+
80+
81+
if __name__ == "__main__":
82+
main()

0 commit comments

Comments
 (0)