Skip to content

[WIP] feat: Add missing sendEvent #1330

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void testWorkflows() throws Exception {
String instanceId = workflowClient.scheduleNewWorkflow(TestWorkflow.class, payload);

workflowClient.waitForInstanceStart(instanceId, Duration.ofSeconds(10), false);
workflowClient.raiseEvent(instanceId, "MoveForward", payload);
workflowClient.raiseEvent(instanceId, "MoveForward", null);

Duration timeout = Duration.ofSeconds(10);
WorkflowInstanceStatus workflowStatus = workflowClient.waitForInstanceCompletion(instanceId, timeout, true);
Expand All @@ -118,8 +118,41 @@ public void testWorkflows() throws Exception {
assertEquals(instanceId, workflowOutput.getWorkflowId());
}

@Test
public void testSendEventWorflowToWorkflow() throws Exception {
TestWorkflowPayload payload = new TestWorkflowPayload(new ArrayList<>());
String instanceId = workflowClient.scheduleNewWorkflow(TestWorkflow.class, payload);
workflowClient.waitForInstanceStart(instanceId, Duration.ofSeconds(10), false);

TestSenderWorkflowPayload senderPayload = new TestSenderWorkflowPayload(instanceId, new ArrayList<>());
String senderInstanceId = workflowClient.scheduleNewWorkflow(TestSenderWorkflow.class, senderPayload);
workflowClient.waitForInstanceStart(senderInstanceId, Duration.ofSeconds(10), false);

Duration timeout = Duration.ofSeconds(10);

WorkflowInstanceStatus senderWorkflowStatus = workflowClient.waitForInstanceCompletion(senderInstanceId, timeout, true);
assertNotNull(senderWorkflowStatus);

TestSenderWorkflowPayload senderWorkflowPayload = deserialize(senderWorkflowStatus.getSerializedOutput(), TestSenderWorkflowPayload.class);
assertEquals(1, senderWorkflowPayload.getPayloads().size());
assertEquals("MoveForward event sent", senderWorkflowPayload.getPayloads().get(0));


WorkflowInstanceStatus workflowStatus = workflowClient.waitForInstanceCompletion(instanceId, timeout, true);
TestWorkflowPayload workflowOutput = deserialize(workflowStatus.getSerializedOutput());
assertEquals(2, workflowOutput.getPayloads().size());
assertEquals("First Activity", workflowOutput.getPayloads().get(0));
assertEquals("Second Activity", workflowOutput.getPayloads().get(1));
assertEquals(instanceId, workflowOutput.getWorkflowId());
}


private TestWorkflowPayload deserialize(String value) throws JsonProcessingException {
return OBJECT_MAPPER.readValue(value, TestWorkflowPayload.class);
return deserialize(value, TestWorkflowPayload.class);
}

private <T> T deserialize(String value, Class<T> clazz) throws JsonProcessingException {
return OBJECT_MAPPER.readValue(value, clazz);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public WorkflowRuntimeBuilder workflowRuntimeBuilder(
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(new Properties(overrides));

builder.registerWorkflow(TestWorkflow.class);
builder.registerWorkflow(TestSenderWorkflow.class);
builder.registerActivity(FirstActivity.class);
builder.registerActivity(SecondActivity.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2024 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.it.testcontainers;

import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowStub;
import org.slf4j.Logger;

public class TestSenderWorkflow implements Workflow {

@Override
public WorkflowStub create() {
return ctx -> {
Logger logger = ctx.getLogger();
String instanceId = ctx.getInstanceId();
logger.info("Starting Sender Workflow: " + ctx.getName());
logger.info("Instance ID: " + instanceId);
logger.info("Current Orchestration Time: " + ctx.getCurrentInstant());

TestSenderWorkflowPayload workflowPayload = ctx.getInput(TestSenderWorkflowPayload.class);
workflowPayload.setWorkflowId(instanceId);

ctx.sendEvent(workflowPayload.getSendToworkflowId(), "MoveForward");

workflowPayload.getPayloads().add("MoveForward event sent");

ctx.complete(workflowPayload);
};
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2024 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.it.testcontainers;

import java.util.List;

public class TestSenderWorkflowPayload extends TestWorkflowPayload {
private String sendToworkflowId;

public TestSenderWorkflowPayload() {
super();
}

public TestSenderWorkflowPayload(List<String> payloads, String workflowId) {
super(payloads, workflowId);
}

public TestSenderWorkflowPayload(String sendToworkflowId, List<String> payloads) {
super(payloads);
this.sendToworkflowId = sendToworkflowId;
}

public String getSendToworkflowId() {
return sendToworkflowId;
}

public void setSendToworkflowId(String sendToworkflowId) {
this.sendToworkflowId = sendToworkflowId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public WorkflowStub create() {
TestWorkflowPayload payloadAfterFirst =
ctx.callActivity(FirstActivity.class.getName(), workflowPayload, TestWorkflowPayload.class).await();

ctx.waitForExternalEvent("MoveForward", Duration.ofSeconds(3), TestWorkflowPayload.class).await();
ctx.waitForExternalEvent("MoveForward").await();

TestWorkflowPayload payloadAfterSecond =
ctx.callActivity(SecondActivity.class.getName(), payloadAfterFirst, TestWorkflowPayload.class).await();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public interface WorkflowContext {
* @return a new {@link Task} that completes when the external event is received
*/
<V> Task<Void> waitForExternalEvent(String name) throws TaskCanceledException;

/**
* Waits for an event to be raised named {@code name} and returns a {@link Task} that completes when the event is
* received.
Expand All @@ -138,6 +138,25 @@ default <V> Task<V> waitForExternalEvent(String name, Class<V> dataType) {
}
}

/**
* Sends an external event to another orchestration instance.
*
* @param instanceID the instance ID of the workflow to send the event to
* @param eventName the name of the event to send
*/
default void sendEvent(String instanceID, String eventName) {
this.sendEvent(instanceID, eventName, null);
}

/**
* Sends an external event to another orchestration instance.
*
* @param instanceID the instance ID of the workflow to send the event to
* @param eventName the name of the event to send
* @param data the data to send to the workflow
*/
void sendEvent(String instanceID, String eventName, Object data);

/**
* Asynchronously invokes an activity by name and with the specified input value and returns a new {@link Task}
* that completes when the activity completes. If the activity completes successfully, the returned {@code Task}'s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,4 +244,9 @@ private static TaskOptions toTaskOptions(WorkflowTaskOptions options) {

return new TaskOptions(retryPolicy);
}

@Override
public void sendEvent(String instanceID, String eventName, Object data) {
this.innerContext.sendEvent(instanceID, eventName, data);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
Expand All @@ -53,6 +52,7 @@ public class DefaultWorkflowContextTest {
public void setUp() {
mockInnerContext = mock(TaskOrchestrationContext.class);
context = new DefaultWorkflowContext(mockInnerContext);

testWorkflowContext = new WorkflowContext() {
@Override
public Logger getLogger() {
Expand Down Expand Up @@ -134,6 +134,10 @@ public <V> Task<V> callChildWorkflow(String name, @Nullable Object input, @Nulla
@Override
public void continueAsNew(Object input, boolean preserveUnprocessedEvents) {
}

@Override
public void sendEvent(String instanceID, String eventName, Object data) {
}
};
}

Expand Down Expand Up @@ -327,4 +331,22 @@ public void newUuidTestNoImplementationExceptionTest() {
String expectedMessage = "No implementation found.";
assertEquals(expectedMessage, runtimeException.getMessage());
}

@Test
public void sendEmptyEventTest() {
String expectedInstanceId = "TestInstanceId";
String expectedEventName = "TestEventName";
context.sendEvent(expectedInstanceId, expectedEventName);
verify(mockInnerContext, times(1)).sendEvent(expectedInstanceId, expectedEventName, null);
}

@Test
public void sendEventTest() {
String expectedInstanceId = "TestInstanceId";
String expectedEventName = "TestEventName";
Object expectedData = "TestData";

context.sendEvent(expectedInstanceId, expectedEventName, expectedData);
verify(mockInnerContext, times(1)).sendEvent(expectedInstanceId, expectedEventName, expectedData);
}
}