Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: workflows sample with a basic deduplication #9

Merged
merged 3 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/src/modules/java/pages/workflows.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ include::example$transfer-workflow/src/main/java/com/example/transfer/applicatio
<6> This time we return an effect that will stop workflow processing, by using special `end` method.
<7> We collect all steps to form a workflow definition.

IMPORTANT: In the following example all `WalletEntity` interactions are not idempotent. It means that if the workflow step retries, it will make the deposit or withdraw again. In a real-world scenario, you should consider making all interactions idempotent with a proper deduplication mechanism.
IMPORTANT: In the following example all `WalletEntity` interactions are not idempotent. It means that if the workflow step retries, it will make the deposit or withdraw again. In a real-world scenario, you should consider making all interactions idempotent with a proper deduplication mechanism. A very basic example of handling retries for workflows can be found in https://github.com/akka/akka-sdk/blob/main/samples/transfer-workflow-compensation/src/main/java/com/example/wallet/domain/Wallet.java[this] sample.

== Retrieving state

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void shouldTimedOutTransferWorkflow() {
}


private String randomId() {
public static String randomId() {
return UUID.randomUUID().toString().substring(0, 8);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.example.wallet.application;

import akka.javasdk.testkit.TestKitSupport;
import com.example.wallet.application.WalletEntity.WalletResult;
import com.example.wallet.domain.WalletCommand;
import org.junit.jupiter.api.Test;

import static com.example.transfer.TransferWorkflowIntegrationTest.randomId;
import static org.assertj.core.api.Assertions.assertThat;

class WalletEntityIntegrationTest extends TestKitSupport {

@Test
public void shouldDeduplicateWithdrawCommand() {
// given
var walletId = randomId();
var withdraw = new WalletCommand.Withdraw(randomId(), 10);
await(componentClient.forEventSourcedEntity(walletId)
.method(WalletEntity::create)
.invokeAsync(100));

// when
withdraw(walletId, withdraw);
withdraw(walletId, withdraw);
withdraw(walletId, withdraw);

// then
Integer balance = await(componentClient.forEventSourcedEntity(walletId)
.method(WalletEntity::get)
.invokeAsync());
assertThat(balance).isEqualTo(100 - 10);
}

@Test
public void shouldDeduplicateDepositCommand() {
// given
var walletId = randomId();
var deposit = new WalletCommand.Deposit(randomId(), 10);
await(componentClient.forEventSourcedEntity(walletId)
.method(WalletEntity::create)
.invokeAsync(100));

// when
deposit(walletId, deposit);
deposit(walletId, deposit);
deposit(walletId, deposit);

// then
Integer balance = await(componentClient.forEventSourcedEntity(walletId)
.method(WalletEntity::get)
.invokeAsync());
assertThat(balance).isEqualTo(100 + 10);
}

private WalletResult deposit(String walletId, WalletCommand.Deposit deposit) {
return await(componentClient.forEventSourcedEntity(walletId)
.method(WalletEntity::deposit)
.invokeAsync(deposit));
}

private WalletResult withdraw(String walletId, WalletCommand.Withdraw withdraw) {
return await(componentClient.forEventSourcedEntity(walletId)
.method(WalletEntity::withdraw)
.invokeAsync(withdraw));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.example.wallet.domain;

import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

class WalletTest {

@Test
public void shouldLimitCommandIdsSize() {
//given
Wallet wallet = new Wallet("w1", 100, new ArrayList<>());

//when
for (int i = 0; i < 10000; i++) {
List<WalletEvent> events = wallet.handle(new WalletCommand.Deposit(UUID.randomUUID().toString(), 10));
wallet = wallet.applyEvent(events.get(0));
}

//then
assertThat(wallet.commandIds()).hasSize(1000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import com.example.wallet.application.WalletEntity.WalletResult;
import com.example.wallet.application.WalletEntity.WalletResult.Failure;
import com.example.wallet.application.WalletEntity.WalletResult.Success;
import com.example.wallet.domain.WalletCommand.Deposit;
import com.example.wallet.domain.WalletCommand.Withdraw;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -26,20 +28,8 @@
import static java.time.Duration.ofHours;
import static java.time.Duration.ofSeconds;

// tag::class[]
@ComponentId("transfer") // <1>
public class TransferWorkflow extends Workflow<TransferState> { // <2>

public record Withdraw(String from, int amount) {
}

// end::class[]

// tag::definition[]
public record Deposit(String to, int amount) {
}

// end::definition[]
public class TransferWorkflow extends Workflow<TransferState> {

private static final Logger logger = LoggerFactory.getLogger(TransferWorkflow.class);

Expand All @@ -58,14 +48,14 @@ public WorkflowDef<TransferState> definition() {
logger.info("Running: " + cmd);
// cancelling the timer in case it was scheduled
return timers().cancel("acceptationTimout-" + currentState().transferId()).thenCompose(__ ->
componentClient.forEventSourcedEntity(cmd.from)
componentClient.forEventSourcedEntity(currentState().transfer().from())
.method(WalletEntity::withdraw)
.invokeAsync(cmd.amount));
.invokeAsync(cmd));
})
.andThen(WalletResult.class, result -> {
switch (result) {
case Success __ -> {
Deposit depositInput = new Deposit(currentState().transfer().to(), currentState().transfer().amount());
Deposit depositInput = new Deposit(currentState().depositId(), currentState().transfer().amount());
return effects()
.updateState(currentState().withStatus(WITHDRAW_SUCCEED))
.transitionTo("deposit", depositInput);
Expand All @@ -87,9 +77,9 @@ public WorkflowDef<TransferState> definition() {
// end::compensation[]
logger.info("Running: " + cmd);
// tag::compensation[]
return componentClient.forEventSourcedEntity(cmd.to)
return componentClient.forEventSourcedEntity(currentState().transfer().to())
.method(WalletEntity::deposit)
.invokeAsync(cmd.amount);
.invokeAsync(cmd);
})
.andThen(WalletResult.class, result -> { // <1>
switch (result) {
Expand All @@ -116,9 +106,13 @@ public WorkflowDef<TransferState> definition() {
logger.info("Running withdraw compensation");
// tag::compensation[]
var transfer = currentState().transfer();
// end::compensation[]
// depositId is reused for the compensation, just to have a stable commandId and simplify the example
// tag::compensation[]
String commandId = currentState().depositId();
return componentClient.forEventSourcedEntity(transfer.from())
.method(WalletEntity::deposit)
.invokeAsync(transfer.amount());
.invokeAsync(new Deposit(commandId, transfer.amount()));
})
.andThen(WalletResult.class, result -> {
switch (result) {
Expand Down Expand Up @@ -185,29 +179,30 @@ public WorkflowDef<TransferState> definition() {
.addStep(failoverHandler);
}



public Effect<String> startTransfer(Transfer transfer) {
if (currentState() != null) {
return effects().error("transfer already started");
} else if (transfer.amount() <= 0) {
return effects().error("transfer amount should be greater than zero");
} else if (transfer.amount() > 1000) {
logger.info("Waiting for acceptation: " + transfer);
TransferState waitingForAcceptationState = new TransferState(commandContext().workflowId(), transfer)
.withStatus(WAITING_FOR_ACCEPTATION);
return effects()
.updateState(waitingForAcceptationState)
.transitionTo("wait-for-acceptation")
.thenReply("transfer started, waiting for acceptation");
} else {
logger.info("Running: " + transfer);
TransferState initialState = new TransferState(commandContext().workflowId(), transfer);
Withdraw withdrawInput = new Withdraw(transfer.from(), transfer.amount());
return effects()
.updateState(initialState)
.transitionTo("withdraw", withdrawInput)
.thenReply("transfer started");
String workflowId = commandContext().workflowId();
if (transfer.amount() > 1000) {
logger.info("Waiting for acceptation: " + transfer);
TransferState waitingForAcceptationState = TransferState.create(workflowId, transfer)
.withStatus(WAITING_FOR_ACCEPTATION);
return effects()
.updateState(waitingForAcceptationState)
.transitionTo("wait-for-acceptation")
.thenReply("transfer started, waiting for acceptation");
} else {
logger.info("Running: " + transfer);
TransferState initialState = TransferState.create(workflowId, transfer);
Withdraw withdrawInput = new Withdraw(initialState.withdrawId(), transfer.amount());
return effects()
.updateState(initialState)
.transitionTo("withdraw", withdrawInput)
.thenReply("transfer started");
}
}
}

Expand All @@ -234,7 +229,7 @@ public Effect<String> accept() {
// end::resuming[]
logger.info("Accepting transfer: " + transfer);
// tag::resuming[]
Withdraw withdrawInput = new Withdraw(transfer.from(), transfer.amount());
Withdraw withdrawInput = new Withdraw(currentState().withdrawId(), transfer.amount());
return effects()
.transitionTo("withdraw", withdrawInput)
.thenReply("transfer accepted");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package com.example.transfer.domain;

import static com.example.transfer.domain.TransferState.TransferStatus.*;
import java.util.UUID;

public record TransferState(String transferId, Transfer transfer, TransferStatus status) {
import static com.example.transfer.domain.TransferState.TransferStatus.STARTED;

public record TransferState(String transferId, Transfer transfer, TransferStatus status, String withdrawId,
String depositId) {

public record Transfer(String from, String to, int amount) {
}
Expand All @@ -11,11 +14,14 @@ public enum TransferStatus {
STARTED, WITHDRAW_FAILED, WITHDRAW_SUCCEED, DEPOSIT_FAILED, COMPLETED, COMPENSATION_COMPLETED, WAITING_FOR_ACCEPTATION, TRANSFER_ACCEPTATION_TIMED_OUT, REQUIRES_MANUAL_INTERVENTION
}

public TransferState(String transferId, Transfer transfer) {
this(transferId, transfer, STARTED);
public static TransferState create(String transferId, Transfer transfer) {
// commandIds must be the same for every attempt, that's why we keep them as a part of the state
String withdrawId = UUID.randomUUID().toString();
String depositId = UUID.randomUUID().toString();
return new TransferState(transferId, transfer, STARTED, withdrawId, depositId);
}

public TransferState withStatus(TransferStatus newStatus) {
return new TransferState(transferId, transfer, newStatus);
return new TransferState(transferId, transfer, newStatus, withdrawId, depositId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@
import com.example.wallet.application.WalletEntity;
import com.example.wallet.application.WalletEntity.WalletResult.Failure;
import com.example.wallet.application.WalletEntity.WalletResult.Success;
import com.example.wallet.domain.WalletCommand.Deposit;
import com.example.wallet.domain.WalletCommand.Withdraw;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.UUID;
import java.util.concurrent.CompletionStage;

// Opened up for access from the public internet to make the sample service easy to try out.
Expand Down Expand Up @@ -48,7 +51,7 @@ public CompletionStage<HttpResponse> create(String id, int initialAmount) {
@Post("/{id}/deposit/{amount}")
public CompletionStage<HttpResponse> deposit(String id, int amount) {
return componentClient.forEventSourcedEntity(id)
.method(WalletEntity::deposit).invokeAsync(amount)
.method(WalletEntity::deposit).invokeAsync(new Deposit(UUID.randomUUID().toString(), amount))
.thenApply(walletResult ->
switch (walletResult) {
case Success __ -> HttpResponses.ok();
Expand All @@ -63,7 +66,7 @@ public CompletionStage<HttpResponse> deposit(String id, int amount) {
@Post("/{id}/withdraw/{amount}")
public CompletionStage<HttpResponse> withdraw(String id, int amount) {
return componentClient.forEventSourcedEntity(id)
.method(WalletEntity::withdraw).invokeAsync(amount)
.method(WalletEntity::withdraw).invokeAsync(new Withdraw(UUID.randomUUID().toString(), amount))
.thenApply(walletResult ->
switch (walletResult) {
case Success __ -> HttpResponses.ok();
Expand Down
Loading
Loading