Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion iwf-idl
86 changes: 75 additions & 11 deletions src/main/java/io/iworkflow/core/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,7 @@
import io.iworkflow.core.exceptions.WorkflowAlreadyStartedException;
import io.iworkflow.core.exceptions.WorkflowNotExistsException;
import io.iworkflow.core.persistence.PersistenceOptions;
import io.iworkflow.gen.models.ErrorSubStatus;
import io.iworkflow.gen.models.KeyValue;
import io.iworkflow.gen.models.SearchAttribute;
import io.iworkflow.gen.models.SearchAttributeKeyAndType;
import io.iworkflow.gen.models.SearchAttributeValueType;
import io.iworkflow.gen.models.StateCompletionOutput;
import io.iworkflow.gen.models.WorkflowGetDataObjectsResponse;
import io.iworkflow.gen.models.WorkflowGetResponse;
import io.iworkflow.gen.models.WorkflowGetSearchAttributesResponse;
import io.iworkflow.gen.models.WorkflowSearchRequest;
import io.iworkflow.gen.models.WorkflowSearchResponse;
import io.iworkflow.gen.models.*;
import io.iworkflow.gen.models.WorkflowStateOptions;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.implementation.MethodDelegation;
Expand Down Expand Up @@ -505,6 +495,80 @@ public void signalWorkflow(
signalWorkflow(workflowClass, workflowId, "", signalChannelName, signalValue);
}


/**
* Send a single message to internalChannel
*
* @param workflowClass required
* @param workflowId required
* @param internalChannelName required
* @param channelMessage optional, can be null.
* @throws NoRunningWorkflowException if the workflow is not existing or not running
*/
public void publishToInternalChannel(
final Class<? extends ObjectWorkflow> workflowClass,
final String workflowId,
final String internalChannelName,
final Object channelMessage) {
publishToInternalChannel(workflowClass, workflowId, "", internalChannelName, channelMessage);
}

/**
* Send a single message to internalChannel
*
* @param workflowClass required
* @param workflowId required
* @param workflowRunId optional, can be empty
* @param internalChannelName required
* @param channelMessage optional, can be null.
* @throws NoRunningWorkflowException if the workflow is not existing or not running
*/
public void publishToInternalChannel(
final Class<? extends ObjectWorkflow> workflowClass,
final String workflowId,
final String workflowRunId,
final String internalChannelName,
final Object channelMessage) {
publishToInternalChannelBatch(workflowClass, workflowId, workflowRunId, internalChannelName, channelMessage);
}

/**
* Send a batch of messages to internalChannel
*
* @param workflowClass required
* @param workflowId required
* @param workflowRunId optional, can be empty
* @param internalChannelName required
* @param channelMessages optional, can be null. messages in batch
* @throws NoRunningWorkflowException if the workflow is not existing or not running
*/
public void publishToInternalChannelBatch(
final Class<? extends ObjectWorkflow> workflowClass,
final String workflowId,
final String workflowRunId,
final String internalChannelName,
final Object... channelMessages) {
final String wfType = workflowClass.getSimpleName();

checkWorkflowTypeExists(wfType);

final Class<?> channelValueType = registry.getInternalChannelTypeStore(wfType).getType(internalChannelName);

List<InterStateChannelPublishing> rawMessages = new ArrayList<>(channelMessages.length);
for (Object channelValue : channelMessages) {
if (channelValue != null && !channelValueType.isInstance(channelValue)) {
throw new IllegalArgumentException(String.format("message value is not of channel type %s", channelValueType.getName()));
}
rawMessages.add(
new InterStateChannelPublishing()
.channelName(internalChannelName)
.value(clientOptions.getObjectEncoder().encode(channelValue))
);
}

unregisteredClient.publishToInternalChannel(workflowId, workflowRunId, rawMessages);
}

/**
* @param workflowId required
* @param workflowRunId optional, can be empty
Expand Down
46 changes: 18 additions & 28 deletions src/main/java/io/iworkflow/core/UnregisteredClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,7 @@
import io.iworkflow.core.validator.CronScheduleValidator;
import io.iworkflow.gen.api.ApiClient;
import io.iworkflow.gen.api.DefaultApi;
import io.iworkflow.gen.models.EncodedObject;
import io.iworkflow.gen.models.KeyValue;
import io.iworkflow.gen.models.PersistenceLoadingPolicy;
import io.iworkflow.gen.models.SearchAttribute;
import io.iworkflow.gen.models.SearchAttributeKeyAndType;
import io.iworkflow.gen.models.StateCompletionOutput;
import io.iworkflow.gen.models.WorkflowGetDataObjectsRequest;
import io.iworkflow.gen.models.WorkflowGetDataObjectsResponse;
import io.iworkflow.gen.models.WorkflowGetRequest;
import io.iworkflow.gen.models.WorkflowGetResponse;
import io.iworkflow.gen.models.WorkflowGetSearchAttributesRequest;
import io.iworkflow.gen.models.WorkflowGetSearchAttributesResponse;
import io.iworkflow.gen.models.WorkflowResetRequest;
import io.iworkflow.gen.models.WorkflowResetResponse;
import io.iworkflow.gen.models.WorkflowRpcRequest;
import io.iworkflow.gen.models.WorkflowRpcResponse;
import io.iworkflow.gen.models.WorkflowSearchRequest;
import io.iworkflow.gen.models.WorkflowSearchResponse;
import io.iworkflow.gen.models.WorkflowSetDataObjectsRequest;
import io.iworkflow.gen.models.WorkflowSetSearchAttributesRequest;
import io.iworkflow.gen.models.WorkflowSignalRequest;
import io.iworkflow.gen.models.WorkflowSkipTimerRequest;
import io.iworkflow.gen.models.WorkflowStartOptions;
import io.iworkflow.gen.models.WorkflowStartRequest;
import io.iworkflow.gen.models.WorkflowStartResponse;
import io.iworkflow.gen.models.WorkflowStatus;
import io.iworkflow.gen.models.WorkflowStopRequest;
import io.iworkflow.gen.models.WorkflowWaitForStateCompletionRequest;
import io.iworkflow.gen.models.*;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -449,6 +422,23 @@ public void signalWorkflow(
}
}

public void publishToInternalChannel(
final String workflowId,
final String workflowRunId,
final List<InterStateChannelPublishing> messages){

try {
defaultApi.apiV1WorkflowPublishToInternalChannelPost(
new PublishToInternalChannelRequest()
.messages(messages)
.workflowId(workflowId)
.workflowRunId(workflowRunId)
);
} catch (FeignException.FeignClientException exp) {
throw IwfHttpException.fromFeignException(clientOptions.getObjectEncoder(), exp);
}
}

/**
* @param workflowId workflowId
* @param workflowRunId workflowRunId
Expand Down
13 changes: 13 additions & 0 deletions src/test/java/io/iworkflow/integ/InternalChannelTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.iworkflow.core.Client;
import io.iworkflow.core.ClientOptions;
import io.iworkflow.integ.internalchannel.BasicInternalChannelWorkflow;
import io.iworkflow.integ.internalchannel.WaitingInternalChannelWorkflow;
import io.iworkflow.spring.TestSingletonWorkerService;
import io.iworkflow.spring.controller.WorkflowRegistry;
import org.junit.jupiter.api.Assertions;
Expand All @@ -28,4 +29,16 @@ public void testBasicInternalWorkflow() throws InterruptedException {
final Integer output = client.getSimpleWorkflowResultWithWait(Integer.class, wfId);
Assertions.assertEquals(3, output);
}

@Test
public void testWaitingInternalWorkflow() throws InterruptedException {
final Client client = new Client(WorkflowRegistry.registry, ClientOptions.localDefault);
final String wfId = "waiting-internal-test-id" + System.currentTimeMillis() / 1000;
final Integer input = 1;
final String runId = client.startWorkflow(
WaitingInternalChannelWorkflow.class, wfId, 10, input);
client.publishToInternalChannelBatch(WaitingInternalChannelWorkflow.class, wfId, "", WaitingInternalChannelWorkflow.INTER_STATE_CHANNEL_NAME, 2, 3);
final Integer output = client.getSimpleWorkflowResultWithWait(Integer.class, wfId);
Assertions.assertEquals(6, output);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.iworkflow.integ.internalchannel;

import io.iworkflow.core.ObjectWorkflow;
import io.iworkflow.core.StateDef;
import io.iworkflow.core.communication.CommunicationMethodDef;
import io.iworkflow.core.communication.InternalChannelDef;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;

@Component
public class WaitingInternalChannelWorkflow implements ObjectWorkflow {
public static final String INTER_STATE_CHANNEL_NAME = "test-inter-state-channel-1";

@Override
public List<CommunicationMethodDef> getCommunicationSchema() {
return Arrays.asList(
InternalChannelDef.create(Integer.class, INTER_STATE_CHANNEL_NAME)
);
}

@Override
public List<StateDef> getWorkflowStates() {
return Arrays.asList(
StateDef.startingState(new WaitingInternalChannelWorkflowState())
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.iworkflow.integ.internalchannel;

import io.iworkflow.core.Context;
import io.iworkflow.core.StateDecision;
import io.iworkflow.core.WorkflowState;
import io.iworkflow.core.command.CommandRequest;
import io.iworkflow.core.command.CommandResults;
import io.iworkflow.core.communication.Communication;
import io.iworkflow.core.communication.InternalChannelCommand;
import io.iworkflow.core.communication.InternalChannelCommandResult;
import io.iworkflow.core.persistence.Persistence;

public class WaitingInternalChannelWorkflowState implements WorkflowState<Integer> {

@Override
public Class<Integer> getInputType() {
return Integer.class;
}

@Override
public CommandRequest waitUntil(
Context context,
Integer input,
Persistence persistence,
final Communication communication) {
return CommandRequest.forAllCommandCompleted(
InternalChannelCommand.create(WaitingInternalChannelWorkflow.INTER_STATE_CHANNEL_NAME),
InternalChannelCommand.create(WaitingInternalChannelWorkflow.INTER_STATE_CHANNEL_NAME)
);
}

@Override
public StateDecision execute(
Context context,
Integer input,
CommandResults commandResults,
Persistence persistence,
final Communication communication) {
final InternalChannelCommandResult result1 = commandResults.getAllInternalChannelCommandResult().get(0);
final InternalChannelCommandResult result2 = commandResults.getAllInternalChannelCommandResult().get(1);

Integer output = input + (Integer) result1.getValue().get() + (Integer) result2.getValue().get();

return StateDecision.gracefulCompleteWorkflow(output);
}
}
Loading