diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java index ca8678e2d..9fd6124d3 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java @@ -103,7 +103,7 @@ public void testWorkflows() throws Exception { String instanceId = workflowClient.scheduleNewWorkflow(TestWorkflow.class, payload); workflowClient.waitForInstanceStart(instanceId, Duration.ofSeconds(10), false); - workflowClient.raiseEvent(instanceId, "MoveForward", payload); + workflowClient.raiseEvent(instanceId, "MoveForward", null); Duration timeout = Duration.ofSeconds(10); WorkflowInstanceStatus workflowStatus = workflowClient.waitForInstanceCompletion(instanceId, timeout, true); @@ -118,8 +118,41 @@ public void testWorkflows() throws Exception { assertEquals(instanceId, workflowOutput.getWorkflowId()); } + @Test + public void testSendEventWorflowToWorkflow() throws Exception { + TestWorkflowPayload payload = new TestWorkflowPayload(new ArrayList<>()); + String instanceId = workflowClient.scheduleNewWorkflow(TestWorkflow.class, payload); + workflowClient.waitForInstanceStart(instanceId, Duration.ofSeconds(10), false); + + TestSenderWorkflowPayload senderPayload = new TestSenderWorkflowPayload(instanceId, new ArrayList<>()); + String senderInstanceId = workflowClient.scheduleNewWorkflow(TestSenderWorkflow.class, senderPayload); + workflowClient.waitForInstanceStart(senderInstanceId, Duration.ofSeconds(10), false); + + Duration timeout = Duration.ofSeconds(10); + + WorkflowInstanceStatus senderWorkflowStatus = workflowClient.waitForInstanceCompletion(senderInstanceId, timeout, true); + assertNotNull(senderWorkflowStatus); + + TestSenderWorkflowPayload senderWorkflowPayload = deserialize(senderWorkflowStatus.getSerializedOutput(), TestSenderWorkflowPayload.class); + assertEquals(1, senderWorkflowPayload.getPayloads().size()); + assertEquals("MoveForward event sent", senderWorkflowPayload.getPayloads().get(0)); + + + WorkflowInstanceStatus workflowStatus = workflowClient.waitForInstanceCompletion(instanceId, timeout, true); + TestWorkflowPayload workflowOutput = deserialize(workflowStatus.getSerializedOutput()); + assertEquals(2, workflowOutput.getPayloads().size()); + assertEquals("First Activity", workflowOutput.getPayloads().get(0)); + assertEquals("Second Activity", workflowOutput.getPayloads().get(1)); + assertEquals(instanceId, workflowOutput.getWorkflowId()); + } + + private TestWorkflowPayload deserialize(String value) throws JsonProcessingException { - return OBJECT_MAPPER.readValue(value, TestWorkflowPayload.class); + return deserialize(value, TestWorkflowPayload.class); + } + + private T deserialize(String value, Class clazz) throws JsonProcessingException { + return OBJECT_MAPPER.readValue(value, clazz); } } diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestDaprWorkflowsConfiguration.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestDaprWorkflowsConfiguration.java index 0a2487b70..cfe120a24 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestDaprWorkflowsConfiguration.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestDaprWorkflowsConfiguration.java @@ -56,6 +56,7 @@ public WorkflowRuntimeBuilder workflowRuntimeBuilder( WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(new Properties(overrides)); builder.registerWorkflow(TestWorkflow.class); + builder.registerWorkflow(TestSenderWorkflow.class); builder.registerActivity(FirstActivity.class); builder.registerActivity(SecondActivity.class); diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestSenderWorkflow.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestSenderWorkflow.java new file mode 100644 index 000000000..da6dc3c58 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestSenderWorkflow.java @@ -0,0 +1,42 @@ +/* + * Copyright 2024 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.it.testcontainers; + +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; +import org.slf4j.Logger; + +public class TestSenderWorkflow implements Workflow { + + @Override + public WorkflowStub create() { + return ctx -> { + Logger logger = ctx.getLogger(); + String instanceId = ctx.getInstanceId(); + logger.info("Starting Sender Workflow: " + ctx.getName()); + logger.info("Instance ID: " + instanceId); + logger.info("Current Orchestration Time: " + ctx.getCurrentInstant()); + + TestSenderWorkflowPayload workflowPayload = ctx.getInput(TestSenderWorkflowPayload.class); + workflowPayload.setWorkflowId(instanceId); + + ctx.sendEvent(workflowPayload.getSendToworkflowId(), "MoveForward"); + + workflowPayload.getPayloads().add("MoveForward event sent"); + + ctx.complete(workflowPayload); + }; + } + +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestSenderWorkflowPayload.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestSenderWorkflowPayload.java new file mode 100644 index 000000000..ad11d2da8 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestSenderWorkflowPayload.java @@ -0,0 +1,41 @@ +/* + * Copyright 2024 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.it.testcontainers; + +import java.util.List; + +public class TestSenderWorkflowPayload extends TestWorkflowPayload { + private String sendToworkflowId; + + public TestSenderWorkflowPayload() { + super(); + } + + public TestSenderWorkflowPayload(List payloads, String workflowId) { + super(payloads, workflowId); + } + + public TestSenderWorkflowPayload(String sendToworkflowId, List payloads) { + super(payloads); + this.sendToworkflowId = sendToworkflowId; + } + + public String getSendToworkflowId() { + return sendToworkflowId; + } + + public void setSendToworkflowId(String sendToworkflowId) { + this.sendToworkflowId = sendToworkflowId; + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestWorkflow.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestWorkflow.java index e3b03543e..351a697b7 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestWorkflow.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestWorkflow.java @@ -36,7 +36,7 @@ public WorkflowStub create() { TestWorkflowPayload payloadAfterFirst = ctx.callActivity(FirstActivity.class.getName(), workflowPayload, TestWorkflowPayload.class).await(); - ctx.waitForExternalEvent("MoveForward", Duration.ofSeconds(3), TestWorkflowPayload.class).await(); + ctx.waitForExternalEvent("MoveForward").await(); TestWorkflowPayload payloadAfterSecond = ctx.callActivity(SecondActivity.class.getName(), payloadAfterFirst, TestWorkflowPayload.class).await(); diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java index 9ed34fdc1..798dda4f7 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java @@ -117,7 +117,7 @@ public interface WorkflowContext { * @return a new {@link Task} that completes when the external event is received */ Task waitForExternalEvent(String name) throws TaskCanceledException; - + /** * Waits for an event to be raised named {@code name} and returns a {@link Task} that completes when the event is * received. @@ -138,6 +138,25 @@ default Task waitForExternalEvent(String name, Class dataType) { } } + /** + * Sends an external event to another orchestration instance. + * + * @param instanceID the instance ID of the workflow to send the event to + * @param eventName the name of the event to send + */ + default void sendEvent(String instanceID, String eventName) { + this.sendEvent(instanceID, eventName, null); + } + + /** + * Sends an external event to another orchestration instance. + * + * @param instanceID the instance ID of the workflow to send the event to + * @param eventName the name of the event to send + * @param data the data to send to the workflow + */ + void sendEvent(String instanceID, String eventName, Object data); + /** * Asynchronously invokes an activity by name and with the specified input value and returns a new {@link Task} * that completes when the activity completes. If the activity completes successfully, the returned {@code Task}'s diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java index 20572995c..b044238fd 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java @@ -244,4 +244,9 @@ private static TaskOptions toTaskOptions(WorkflowTaskOptions options) { return new TaskOptions(retryPolicy); } + + @Override + public void sendEvent(String instanceID, String eventName, Object data) { + this.innerContext.sendEvent(instanceID, eventName, data); + } } diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java index 61d153484..47e5fef3d 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java @@ -35,7 +35,6 @@ import java.util.List; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -53,6 +52,7 @@ public class DefaultWorkflowContextTest { public void setUp() { mockInnerContext = mock(TaskOrchestrationContext.class); context = new DefaultWorkflowContext(mockInnerContext); + testWorkflowContext = new WorkflowContext() { @Override public Logger getLogger() { @@ -134,6 +134,10 @@ public Task callChildWorkflow(String name, @Nullable Object input, @Nulla @Override public void continueAsNew(Object input, boolean preserveUnprocessedEvents) { } + + @Override + public void sendEvent(String instanceID, String eventName, Object data) { + } }; } @@ -327,4 +331,22 @@ public void newUuidTestNoImplementationExceptionTest() { String expectedMessage = "No implementation found."; assertEquals(expectedMessage, runtimeException.getMessage()); } + + @Test + public void sendEmptyEventTest() { + String expectedInstanceId = "TestInstanceId"; + String expectedEventName = "TestEventName"; + context.sendEvent(expectedInstanceId, expectedEventName); + verify(mockInnerContext, times(1)).sendEvent(expectedInstanceId, expectedEventName, null); + } + + @Test + public void sendEventTest() { + String expectedInstanceId = "TestInstanceId"; + String expectedEventName = "TestEventName"; + Object expectedData = "TestData"; + + context.sendEvent(expectedInstanceId, expectedEventName, expectedData); + verify(mockInnerContext, times(1)).sendEvent(expectedInstanceId, expectedEventName, expectedData); + } }