Skip to content

Commit

Permalink
feat: isDeleted method for entity command handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
aludwiko committed Jan 17, 2025
1 parent 6ece293 commit 1857c55
Show file tree
Hide file tree
Showing 16 changed files with 130 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
public class KeyValueEntityTestKit<S, E extends KeyValueEntity<S>> {

private S state;
private boolean deleted;
private final S emptyState;
private final E entity;
private final String entityId;
Expand All @@ -36,6 +37,7 @@ private KeyValueEntityTestKit(String entityId, E entity) {
this.entity = entity;
this.state = entity.emptyState();
this.emptyState = state;
this.deleted = false;
}

/**
Expand Down Expand Up @@ -79,13 +81,19 @@ public S getState() {
return state;
}

/** @return true if the entity is deleted */
public boolean isDeleted() {
return deleted;
}

@SuppressWarnings("unchecked")
private <Reply> KeyValueEntityResult<Reply> interpretEffects(KeyValueEntity.Effect<Reply> effect) {
KeyValueEntityResultImpl<Reply> result = new KeyValueEntityResultImpl<>(effect);
if (result.stateWasUpdated()) {
this.state = (S) result.getUpdatedState();
} else if (result.stateWasDeleted()) {
this.state = emptyState;
this.deleted = true;
}
return result;
}
Expand Down Expand Up @@ -117,7 +125,7 @@ public <R> KeyValueEntityResult<R> call(Function<E, KeyValueEntity.Effect<R>> fu
TestKitKeyValueEntityCommandContext commandContext =
new TestKitKeyValueEntityCommandContext(entityId, metadata);
entity._internalSetCommandContext(Optional.of(commandContext));
entity._internalSetCurrentState(this.state);
entity._internalSetCurrentState(this.state, this.deleted);
return interpretEffects(func.apply(entity));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public abstract class EventSourcedEntityEffectsRunner<S, E> {

private EventSourcedEntity<S, E> entity;
private S _state;
private boolean deleted = false;
private List<E> events = new ArrayList<>();

public EventSourcedEntityEffectsRunner(EventSourcedEntity<S, E> entity) {
Expand All @@ -33,7 +34,7 @@ public EventSourcedEntityEffectsRunner(EventSourcedEntity<S, E> entity, List<E>
this.entity = entity;
this._state = entity.emptyState();

entity._internalSetCurrentState(this._state);
entity._internalSetCurrentState(this._state, false);
// NB: updates _state
playEventsForEntity(initialEvents);
}
Expand All @@ -48,7 +49,12 @@ public S getState() {
return _state;
}

/** @return All events emitted by command handlers of this entity up to now */
/** @return true if the entity is deleted */
public boolean isDeleted() {
return deleted;
}

/** @return All events persisted by command handlers of this entity up to now */
public List<E> getAllEvents() {
return events;
}
Expand All @@ -66,14 +72,15 @@ protected <R> EventSourcedResult<R> interpretEffects(
EventSourcedEntity.Effect<R> effectExecuted;
try {
entity._internalSetCommandContext(Optional.of(commandContext));
entity._internalSetCurrentState(this._state);
entity._internalSetCurrentState(this._state, this.deleted);
effectExecuted = effect.get();
this.events.addAll(EventSourcedResultImpl.eventsOf(effectExecuted));
} finally {
entity._internalSetCommandContext(Optional.empty());
}

playEventsForEntity(EventSourcedResultImpl.eventsOf(effectExecuted));
deleted = EventSourcedResultImpl.checkIfDeleted(effectExecuted);

EventSourcedResult<R> result;
try {
Expand All @@ -91,7 +98,7 @@ private void playEventsForEntity(List<E> events) {
entity._internalSetEventContext(Optional.of(new TestKitEventSourcedEntityEventContext()));
for (E event : events) {
this._state = handleEvent(this._state, event);
entity._internalSetCurrentState(this._state);
entity._internalSetCurrentState(this._state, this.deleted);
}
} finally {
entity._internalSetEventContext(Optional.empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ import scala.jdk.CollectionConverters._
* INTERNAL API
*/
private[akka] object EventSourcedResultImpl {

def checkIfDeleted[E](effect: EventSourcedEntity.Effect[_]): Boolean = {
effect match {
case ei: EventSourcedEntityEffectImpl[_, E @unchecked] =>
ei.primaryEffect match {
case ee: EmitEvents[E @unchecked] => ee.deleteEntity
case _ => false
}
}
}

def eventsOf[E](effect: EventSourcedEntity.Effect[_]): JList[E] = {
effect match {
case ei: EventSourcedEntityEffectImpl[_, E @unchecked] =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ else if (wouldOverflow(value + value) || (value + value) < 0) {
}
}

public Effect<String> delete() {
return effects().persist(new Increased(commandContext().entityId(), 0)).deleteEntity().thenReply(__ -> "Ok");
}

@Override
public Integer applyEvent(Increased increased) {
if (currentState() == null) return increased.value();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.ArrayList;
Expand All @@ -26,6 +27,7 @@ public void testIncrease() {
assertEquals("Ok", result.getReply());
assertEquals(10, testKit.getState());
assertEquals(1, testKit.getAllEvents().size());
assertFalse(testKit.isDeleted());
}

@Test
Expand All @@ -52,6 +54,16 @@ public void testDoubleIncrease() {
assertEquals(2, testKit.getAllEvents().size());
}

@Test
public void testDelete() {
EventSourcedTestKit<Integer, Increased, CounterEventSourcedEntity> testKit =
EventSourcedTestKit.of(ctx -> new CounterEventSourcedEntity());
EventSourcedResult<String> result = testKit.call(entity -> entity.delete());
assertTrue(result.isReply());
assertEquals("Ok", result.getReply());
assertTrue(testKit.isDeleted());
}

@Test
public void testIncreaseWithNegativeValue() {
EventSourcedTestKit<Integer, Increased, CounterEventSourcedEntity> testKit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class CounterKeyValueEntityTest {
Expand Down Expand Up @@ -40,6 +41,7 @@ public void testIncreaseWithNegativeValue() {
KeyValueEntityTestKit.of(ctx -> new CounterValueEntity());
KeyValueEntityResult<String> result = testKit.call(entity -> entity.increaseBy(-10));
assertTrue(result.isError());
assertFalse(testKit.isDeleted());
assertEquals(result.getError(), "Can't increase with a negative value");
}

Expand All @@ -52,5 +54,7 @@ public void testDeleteValueEntity() {
assertTrue(result.isReply());
assertEquals(result.getReply(), "Deleted");
assertEquals(testKit.getState(), 0);
assertTrue(testKit.isDeleted());
assertTrue(result.stateWasDeleted());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ protected TestKit.Settings testKitSettings() {
}

@Test
public void verifyCounterEventSourcedWiring() throws InterruptedException {

Thread.sleep(10000);

public void verifyCounterEventSourcedWiring() {
var counterId = "hello";
var client = componentClient.forEventSourcedEntity(counterId);

Expand All @@ -58,6 +55,20 @@ public void verifyCounterEventSourcedWiring() throws InterruptedException {
Assertions.assertEquals(200, counterGet);
}

@Test
public void verifyCounterEventSourcedDeletion() {
var counterId = "deleted-hello";
var client = componentClient.forEventSourcedEntity(counterId);

var isDeleted = await(client.method(CounterEntity::getDeleted).invokeAsync());
assertThat(isDeleted).isFalse();

await(client.method(CounterEntity::delete).invokeAsync());

var isDeleted2 = await(client.method(CounterEntity::getDeleted).invokeAsync());
assertThat(isDeleted2).isFalse();
}

@Test
public void verifyCounterErrorEffect() {
var counterId = "hello-error";
Expand Down Expand Up @@ -230,5 +241,4 @@ private void restartCounterEntity(EventSourcedEntityClient client) {
private Integer getCounter(EventSourcedEntityClient client) {
return await(client.method(CounterEntity::get).invokeAsync());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,13 @@ public void verifyUserSubscriptionAction() {

deleteUser(user);

var isDeleted = await(componentClient
.forKeyValueEntity(user.id())
.method(UserEntity::getDelete)
.invokeAsync());

assertThat(isDeleted).isEqualTo(true);

Awaitility.await()
.ignoreExceptions()
.atMost(15, TimeUnit.of(SECONDS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package akkajavasdk.components.eventsourcedentities.counter;

import akka.Done;
import akka.javasdk.annotations.Acl;
import akka.javasdk.annotations.ComponentId;
import akka.javasdk.eventsourcedentity.EventSourcedEntity;
Expand All @@ -13,6 +14,7 @@

import java.util.List;

import static akka.Done.done;
import static java.util.function.Function.identity;

@ComponentId("counter-entity")
Expand Down Expand Up @@ -96,6 +98,11 @@ public ReadOnlyEffect<Integer> get() {
return effects().reply(currentState().value());
}

public ReadOnlyEffect<Boolean> getDeleted() {
// don't modify, we want to make sure we call currentState().value here
return effects().reply(isDeleted());
}

public Effect<Integer> times(Integer value) {
logger.info(
"Multiplying counter with commandId={} commandName={} seqNr={} current={} by value={}",
Expand All @@ -119,6 +126,10 @@ public Effect<Integer> restart() { // force entity restart, useful for testing
throw new RuntimeException("Forceful restarting entity!");
}

public Effect<Done> delete() {
return effects().persist(new CounterEvent.ValueSet(0)).deleteEntity().thenReply(__ -> done());
}

@Override
public Counter applyEvent(CounterEvent event) {
return currentState().apply(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ public Effect<Ok> deleteUser(Delete cmd) {
return effects().deleteEntity().thenReply(Ok.instance);
}

public Effect<Boolean> getDelete() {
return effects().reply(isDeleted());
}

public Effect<Integer> restart(Restart cmd) { // force entity restart, useful for testing
logger.info(
"Restarting counter with commandId={} commandName={} current={}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public abstract class EventSourcedEntity<S, E> {
private Optional<CommandContext> commandContext = Optional.empty();
private Optional<EventContext> eventContext = Optional.empty();
private Optional<S> currentState = Optional.empty();
private boolean deleted = false;
private boolean handlingCommands = false;

/**
Expand Down Expand Up @@ -131,10 +132,11 @@ public void _internalSetEventContext(Optional<EventContext> context) {
* responsible for finally calling _internalClearCurrentState
*/
@InternalApi
public boolean _internalSetCurrentState(S state) {
public boolean _internalSetCurrentState(S state, boolean deleted) {
var wasHandlingCommands = handlingCommands;
handlingCommands = true;
currentState = Optional.ofNullable(state);
this.deleted = deleted;
return !wasHandlingCommands;
}

Expand Down Expand Up @@ -207,6 +209,13 @@ protected final S currentState() {
throw new IllegalStateException("Current state is only available when handling a command.");
}

/**
* Returns true if the entity has been deleted.
*/
protected boolean isDeleted() {
return deleted;
}

protected final Effect.Builder<S, E> effects() {
return new EventSourcedEntityEffectImpl<S, E>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public abstract class KeyValueEntity<S> {

private Optional<S> currentState = Optional.empty();

private boolean deleted = false;

private boolean handlingCommands = false;

/**
Expand Down Expand Up @@ -85,9 +87,10 @@ public void _internalSetCommandContext(Optional<CommandContext> context) {
* @hidden
*/
@InternalApi
public void _internalSetCurrentState(S state) {
public void _internalSetCurrentState(S state, boolean deleted) {
handlingCommands = true;
currentState = Optional.ofNullable(state);
this.deleted = deleted;
}

/**
Expand Down Expand Up @@ -119,6 +122,13 @@ protected final S currentState() {
throw new IllegalStateException("Current state is only available when handling a command.");
}

/**
* Returns true if the entity has been deleted.
*/
protected boolean isDeleted() {
return deleted;
}

protected final Effect.Builder<S> effects() {
return new KeyValueEntityEffectImpl<S>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[

try {
entity._internalSetCommandContext(Optional.of(cmdContext))
entity._internalSetCurrentState(state)
entity._internalSetCurrentState(state, command.isDeleted)
val commandEffect = router
.handleCommand(command.name, cmdPayload)
.asInstanceOf[EventSourcedEntityEffectImpl[AnyRef, E]] // FIXME improve?
Expand Down Expand Up @@ -223,7 +223,7 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
sequenceNumber: Long): SpiEventSourcedEntity.State = {
val eventContext = new EventContextImpl(entityId, sequenceNumber)
entity._internalSetEventContext(Optional.of(eventContext))
val clearState = entity._internalSetCurrentState(state)
val clearState = entity._internalSetCurrentState(state, false)
try {
router.handleEvent(event)
} finally {
Expand Down
Loading

0 comments on commit 1857c55

Please sign in to comment.