Skip to content

Commit 9ac90a4

Browse files
Implement InterstateChannelCommand (#56)
1 parent 2629483 commit 9ac90a4

22 files changed

+452
-19
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ Run the command `git submodule update --remote --merge` to update IDL to the lat
4545
## 1.1
4646

4747
- [x] Reset workflow API (Cadence only, TODO for Temporal)
48-
- [ ] Command type(s) for inter-state communications (e.g. internal channel)
48+
- [x] Command type(s) for inter-state communications (e.g. internal channel)
4949
- [x] AnyCommandCompleted Decider trigger type
5050
- [ ] More workflow start options: IdReusePolicy, initial earch attributes, cron schedule, retry, etc
5151
- [ ] StateOption: Start/Decide API timeout and retry

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ signing {
141141
}
142142

143143
group = "io.github.cadence-oss"
144-
version = "0.2.0"
144+
version = "1.0.0-rc1"
145145

146146
nexusPublishing {
147147
repositories {

src/main/java/io/github/cadenceoss/iwf/core/Registry.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.github.cadenceoss.iwf.core.attributes.QueryAttributeDef;
44
import io.github.cadenceoss.iwf.core.attributes.SearchAttributeDef;
55
import io.github.cadenceoss.iwf.core.attributes.SearchAttributeType;
6+
import io.github.cadenceoss.iwf.core.command.InterStateChannelDef;
67
import io.github.cadenceoss.iwf.core.command.SignalChannelDef;
78

89
import java.util.HashMap;
@@ -13,6 +14,8 @@ public class Registry {
1314
// (workflow type, stateId)-> StateDef
1415
private final Map<String, StateDef> workflowStateStore = new HashMap<>();
1516
private final Map<String, Map<String, Class<?>>> signalTypeStore = new HashMap<>();
17+
18+
private final Map<String, Map<String, Class<?>>> interstateChannelTypeStore = new HashMap<>();
1619
private final Map<String, Map<String, Class<?>>> queryAttributeTypeStore = new HashMap<>();
1720

1821
private final Map<String, Map<String, SearchAttributeType>> searchAttributeTypeStore = new HashMap<>();
@@ -23,6 +26,7 @@ public void addWorkflow(final Workflow wf) {
2326
registerWorkflow(wf);
2427
registerWorkflowState(wf);
2528
registerWorkflowSignal(wf);
29+
registerWorkflowInterstateChannel(wf);
2630
registerWorkflowQueryAttributes(wf);
2731
registerWorkflowSearchAttributes(wf);
2832
}
@@ -65,7 +69,7 @@ private void registerWorkflowSignal(final Workflow wf) {
6569
return;
6670
}
6771

68-
for (SignalChannelDef signalChannelDef: wf.getSignalChannels()) {
72+
for (SignalChannelDef signalChannelDef : wf.getSignalChannels()) {
6973
Map<String, Class<?>> signalNameToTypeMap =
7074
signalTypeStore.computeIfAbsent(workflowType, s -> new HashMap<>());
7175
if (signalNameToTypeMap.containsKey(signalChannelDef.getSignalChannelName())) {
@@ -76,14 +80,32 @@ private void registerWorkflowSignal(final Workflow wf) {
7680
}
7781
}
7882

83+
private void registerWorkflowInterstateChannel(final Workflow wf) {
84+
String workflowType = getWorkflowType(wf);
85+
if (wf.getInterStateChannels() == null || wf.getInterStateChannels().isEmpty()) {
86+
interstateChannelTypeStore.put(workflowType, new HashMap<>());
87+
return;
88+
}
89+
90+
for (InterStateChannelDef interstateChannelDef : wf.getInterStateChannels()) {
91+
Map<String, Class<?>> nameToTypeMap =
92+
interstateChannelTypeStore.computeIfAbsent(workflowType, s -> new HashMap<>());
93+
if (nameToTypeMap.containsKey(interstateChannelDef.getChannelName())) {
94+
throw new WorkflowDefinitionException(
95+
String.format("InterStateChannel name %s already exists", interstateChannelDef.getChannelName()));
96+
}
97+
nameToTypeMap.put(interstateChannelDef.getChannelName(), interstateChannelDef.getValueType());
98+
}
99+
}
100+
79101
private void registerWorkflowQueryAttributes(final Workflow wf) {
80102
String workflowType = getWorkflowType(wf);
81103
if (wf.getQueryAttributes() == null || wf.getQueryAttributes().isEmpty()) {
82104
queryAttributeTypeStore.put(workflowType, new HashMap<>());
83105
return;
84106
}
85107

86-
for (QueryAttributeDef queryAttributeDef: wf.getQueryAttributes()) {
108+
for (QueryAttributeDef queryAttributeDef : wf.getQueryAttributes()) {
87109
Map<String, Class<?>> queryAttributeKeyToTypeMap =
88110
queryAttributeTypeStore.computeIfAbsent(workflowType, s -> new HashMap<>());
89111
if (queryAttributeKeyToTypeMap.containsKey(queryAttributeDef.getQueryAttributeKey())) {
@@ -137,6 +159,10 @@ public Map<String, Class<?>> getSignalChannelNameToSignalTypeMap(final String wo
137159
return signalTypeStore.get(workflowType);
138160
}
139161

162+
public Map<String, Class<?>> getInterStateChannelNameToTypeMap(final String workflowType) {
163+
return interstateChannelTypeStore.get(workflowType);
164+
}
165+
140166
public Map<String, Class<?>> getQueryAttributeKeyToTypeMap(final String workflowType) {
141167
return queryAttributeTypeStore.get(workflowType);
142168
}

src/main/java/io/github/cadenceoss/iwf/core/WorkerService.java

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@
44
import io.github.cadenceoss.iwf.core.attributes.SearchAttributeRWImpl;
55
import io.github.cadenceoss.iwf.core.attributes.StateLocalImpl;
66
import io.github.cadenceoss.iwf.core.command.CommandRequest;
7+
import io.github.cadenceoss.iwf.core.command.InterStateChannelCommand;
8+
import io.github.cadenceoss.iwf.core.command.InterStateChannelImpl;
79
import io.github.cadenceoss.iwf.core.mapper.CommandRequestMapper;
810
import io.github.cadenceoss.iwf.core.mapper.CommandResultsMapper;
911
import io.github.cadenceoss.iwf.core.mapper.StateDecisionMapper;
1012
import io.github.cadenceoss.iwf.gen.models.EncodedObject;
13+
import io.github.cadenceoss.iwf.gen.models.InterStateChannelPublishing;
1114
import io.github.cadenceoss.iwf.gen.models.KeyValue;
1215
import io.github.cadenceoss.iwf.gen.models.SearchAttribute;
1316
import io.github.cadenceoss.iwf.gen.models.WorkflowStateDecideRequest;
@@ -45,14 +48,25 @@ public WorkflowStateStartResponse handleWorkflowStateStart(final WorkflowStateSt
4548
final StateLocalImpl stateLocals = new StateLocalImpl(toMap(null), objectEncoder);
4649
final SearchAttributeRWImpl searchAttributeRW = new SearchAttributeRWImpl(
4750
registry.getSearchAttributeKeyToTypeMap(req.getWorkflowType()), req.getSearchAttributes());
51+
final InterStateChannelImpl interStateChannel = new InterStateChannelImpl(
52+
registry.getInterStateChannelNameToTypeMap(req.getWorkflowType()), objectEncoder);
4853

4954
CommandRequest commandRequest = state.getWorkflowState().start(
5055
context,
5156
input,
5257
stateLocals,
5358
searchAttributeRW,
5459
queryAttributesRW,
55-
null);
60+
interStateChannel);
61+
62+
commandRequest.getCommands().forEach(cmd -> {
63+
if (cmd instanceof InterStateChannelCommand) {
64+
final String name = ((InterStateChannelCommand) cmd).getChannelName();
65+
if (interStateChannel.getToPublish().containsKey(name)) {
66+
throw new WorkflowDefinitionException("it's not allowed to publish and wait for the same interstate channel - " + name);
67+
}
68+
}
69+
});
5670

5771
return new WorkflowStateStartResponse()
5872
.commandRequest(CommandRequestMapper.toGenerated(commandRequest))
@@ -61,7 +75,8 @@ public WorkflowStateStartResponse handleWorkflowStateStart(final WorkflowStateSt
6175
.recordEvents(stateLocals.getRecordEvents())
6276
.upsertSearchAttributes(createUpsertSearchAttributes(
6377
searchAttributeRW.getUpsertToServerInt64AttributeMap(),
64-
searchAttributeRW.getUpsertToServerKeywordAttributeMap()));
78+
searchAttributeRW.getUpsertToServerKeywordAttributeMap()))
79+
.publishToInterStateChannel(toInterStateChannelPublishing(interStateChannel.getToPublish()));
6580
}
6681

6782
public WorkflowStateDecideResponse handleWorkflowStateDecide(final WorkflowStateDecideRequest req) {
@@ -81,26 +96,45 @@ public WorkflowStateDecideResponse handleWorkflowStateDecide(final WorkflowState
8196
final StateLocalImpl stateLocals = new StateLocalImpl(toMap(req.getStateLocalAttributes()), objectEncoder);
8297
final SearchAttributeRWImpl searchAttributeRW = new SearchAttributeRWImpl(
8398
registry.getSearchAttributeKeyToTypeMap(req.getWorkflowType()), req.getSearchAttributes());
99+
final InterStateChannelImpl interStateChannel = new InterStateChannelImpl(
100+
registry.getInterStateChannelNameToTypeMap(req.getWorkflowType()), objectEncoder);
84101

85102
StateDecision stateDecision = state.getWorkflowState().decide(
86103
context,
87104
input,
88105
CommandResultsMapper.fromGenerated(
89106
req.getCommandResults(),
90107
registry.getSignalChannelNameToSignalTypeMap(req.getWorkflowType()),
108+
registry.getInterStateChannelNameToTypeMap(req.getWorkflowType()),
91109
objectEncoder),
92110
stateLocals,
93111
searchAttributeRW,
94112
queryAttributesRW,
95-
null);
113+
interStateChannel);
114+
96115
return new WorkflowStateDecideResponse()
97116
.stateDecision(StateDecisionMapper.toGenerated(stateDecision))
98117
.upsertQueryAttributes(queryAttributesRW.getUpsertQueryAttributes())
99118
.upsertStateLocalAttributes(stateLocals.getUpsertStateLocalAttributes())
100119
.recordEvents(stateLocals.getRecordEvents())
101120
.upsertSearchAttributes(createUpsertSearchAttributes(
102121
searchAttributeRW.getUpsertToServerInt64AttributeMap(),
103-
searchAttributeRW.getUpsertToServerKeywordAttributeMap()));
122+
searchAttributeRW.getUpsertToServerKeywordAttributeMap()))
123+
.publishToInterStateChannel(toInterStateChannelPublishing(interStateChannel.getToPublish()));
124+
}
125+
126+
private List<InterStateChannelPublishing> toInterStateChannelPublishing(final Map<String, List<EncodedObject>> toPublish) {
127+
List<InterStateChannelPublishing> results = new ArrayList<>();
128+
toPublish.forEach((cname, list) -> {
129+
list.forEach(val -> {
130+
final InterStateChannelPublishing pub = new InterStateChannelPublishing()
131+
.channelName(cname)
132+
.value(val);
133+
results.add(pub);
134+
135+
});
136+
});
137+
return results;
104138
}
105139

106140
private QueryAttributesRWImpl createQueryAttributesRW(String workflowType, List<KeyValue> keyValues) {

src/main/java/io/github/cadenceoss/iwf/core/Workflow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ default List<QueryAttributeDef> getQueryAttributes() {
6161
* InterStateChannel are for synchronization communications between WorkflowStates.
6262
* E.g. WorkflowStateA will continue after receiving a value from WorkflowStateB
6363
*/
64-
default List<InterStateChannelDef> getInterStateChannelChannels() {
64+
default List<InterStateChannelDef> getInterStateChannels() {
6565
return Collections.emptyList();
6666
}
6767

src/main/java/io/github/cadenceoss/iwf/core/command/CommandResults.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ public abstract class CommandResults {
1616

1717
public abstract List<TimerCommandResult> getAllTimerCommandResults();
1818

19+
public abstract List<InterStateChannelCommandResult> getAllInterStateChannelCommandResult();
20+
1921
// below are helpers
2022
public <T> T getActivityOutputByIndex(int idx) {
2123
throw new RuntimeException("TODO");
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package io.github.cadenceoss.iwf.core.command;
2+
3+
import org.immutables.value.Value;
4+
5+
@Value.Immutable
6+
public abstract class InterStateChannelCommand implements BaseCommand {
7+
8+
public abstract String getChannelName();
9+
10+
public static InterStateChannelCommand create(final String commandId, final String channelName) {
11+
return ImmutableInterStateChannelCommand.builder()
12+
.channelName(channelName)
13+
.commandId(commandId)
14+
.build();
15+
}
16+
17+
public static InterStateChannelCommand create(String signalName) {
18+
return ImmutableInterStateChannelCommand.builder()
19+
.channelName(signalName)
20+
.build();
21+
}
22+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package io.github.cadenceoss.iwf.core.command;
2+
3+
import io.github.cadenceoss.iwf.gen.models.InterStateChannelResult;
4+
import org.immutables.value.Value;
5+
6+
import java.util.Optional;
7+
8+
@Value.Immutable
9+
public abstract class InterStateChannelCommandResult {
10+
11+
public abstract String getCommandId();
12+
13+
public abstract String getChannelName();
14+
15+
public abstract Optional<Object> getValue();
16+
17+
public abstract InterStateChannelResult.RequestStatusEnum getRequestStatusEnum();
18+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package io.github.cadenceoss.iwf.core.command;
2+
3+
import io.github.cadenceoss.iwf.core.ObjectEncoder;
4+
import io.github.cadenceoss.iwf.core.WorkflowDefinitionException;
5+
import io.github.cadenceoss.iwf.gen.models.EncodedObject;
6+
7+
import java.util.ArrayList;
8+
import java.util.HashMap;
9+
import java.util.List;
10+
import java.util.Map;
11+
12+
public class InterStateChannelImpl implements InterStateChannel {
13+
14+
final Map<String, Class<?>> nameToTypeMap;
15+
final Map<String, List<EncodedObject>> toPublish = new HashMap<>();
16+
17+
final ObjectEncoder objectEncoder;
18+
19+
public InterStateChannelImpl(
20+
final Map<String, Class<?>> nameToTypeMap,
21+
final ObjectEncoder objectEncoder) {
22+
this.nameToTypeMap = nameToTypeMap;
23+
this.objectEncoder = objectEncoder;
24+
}
25+
26+
@Override
27+
public void publish(final String channelName, final Object value) {
28+
final Class<?> type = nameToTypeMap.get(channelName);
29+
if (!type.isInstance(value)) {
30+
throw new WorkflowDefinitionException(String.format("InterStateChannel value is not of type %s", type.getName()));
31+
}
32+
final List<EncodedObject> publish = toPublish.computeIfAbsent(channelName, s -> new ArrayList<>());
33+
publish.add(objectEncoder.encode(value));
34+
}
35+
36+
public Map<String, List<EncodedObject>> getToPublish() {
37+
return toPublish;
38+
}
39+
}
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package io.github.cadenceoss.iwf.core.command;
22

3+
import io.github.cadenceoss.iwf.gen.models.TimerResult;
34
import org.immutables.value.Value;
45

56
@Value.Immutable
67
public abstract class TimerCommandResult {
78

8-
public abstract TimerStatus getTimerStatus();
9+
public abstract TimerResult.TimerStatusEnum getTimerStatus();
910

1011
public abstract String getCommandId();
1112
}

0 commit comments

Comments
 (0)