diff --git a/Makefile b/Makefile index 817cb58dd..b33e17530 100644 --- a/Makefile +++ b/Makefile @@ -44,7 +44,7 @@ attributes: prepare > "${managed_partials}/attributes.adoc" docs/bin/version.sh | xargs -0 printf ":akka-javasdk-version: %s" \ > "${managed_partials}/attributes.adoc" - echo ":akka-cli-version: 3.0.9" >> "${managed_partials}/attributes.adoc" + echo ":akka-cli-version: 3.0.10" >> "${managed_partials}/attributes.adoc" echo ":akka-cli-min-version: 3.0.4" >> "${managed_partials}/attributes.adoc" # see https://adoptium.net/marketplace/ echo ":java-version: 21" \ diff --git a/akka-javasdk-testkit/src/main/java/akka/javasdk/testkit/KeyValueEntityTestKit.java b/akka-javasdk-testkit/src/main/java/akka/javasdk/testkit/KeyValueEntityTestKit.java index 1059a841c..f4565dfbf 100644 --- a/akka-javasdk-testkit/src/main/java/akka/javasdk/testkit/KeyValueEntityTestKit.java +++ b/akka-javasdk-testkit/src/main/java/akka/javasdk/testkit/KeyValueEntityTestKit.java @@ -27,6 +27,7 @@ public class KeyValueEntityTestKit> { private S state; + private boolean deleted; private final S emptyState; private final E entity; private final String entityId; @@ -36,6 +37,7 @@ private KeyValueEntityTestKit(String entityId, E entity) { this.entity = entity; this.state = entity.emptyState(); this.emptyState = state; + this.deleted = false; } /** @@ -79,6 +81,11 @@ public S getState() { return state; } + /** @return true if the entity is deleted */ + public boolean isDeleted() { + return deleted; + } + @SuppressWarnings("unchecked") private KeyValueEntityResult interpretEffects(KeyValueEntity.Effect effect) { KeyValueEntityResultImpl result = new KeyValueEntityResultImpl<>(effect); @@ -86,6 +93,7 @@ private KeyValueEntityResult interpretEffects(KeyValueEntity.Effe this.state = (S) result.getUpdatedState(); } else if (result.stateWasDeleted()) { this.state = emptyState; + this.deleted = true; } return result; } @@ -117,7 +125,7 @@ public KeyValueEntityResult call(Function> fu TestKitKeyValueEntityCommandContext commandContext = new TestKitKeyValueEntityCommandContext(entityId, metadata); entity._internalSetCommandContext(Optional.of(commandContext)); - entity._internalSetCurrentState(this.state); + entity._internalSetCurrentState(this.state, this.deleted); return interpretEffects(func.apply(entity)); } } diff --git a/akka-javasdk-testkit/src/main/java/akka/javasdk/testkit/impl/EventSourcedEntityEffectsRunner.java b/akka-javasdk-testkit/src/main/java/akka/javasdk/testkit/impl/EventSourcedEntityEffectsRunner.java index 5ba9ab5eb..3da2da91d 100644 --- a/akka-javasdk-testkit/src/main/java/akka/javasdk/testkit/impl/EventSourcedEntityEffectsRunner.java +++ b/akka-javasdk-testkit/src/main/java/akka/javasdk/testkit/impl/EventSourcedEntityEffectsRunner.java @@ -17,6 +17,7 @@ public abstract class EventSourcedEntityEffectsRunner { private EventSourcedEntity entity; private S _state; + private boolean deleted = false; private List events = new ArrayList<>(); public EventSourcedEntityEffectsRunner(EventSourcedEntity entity) { @@ -33,7 +34,7 @@ public EventSourcedEntityEffectsRunner(EventSourcedEntity entity, List this.entity = entity; this._state = entity.emptyState(); - entity._internalSetCurrentState(this._state); + entity._internalSetCurrentState(this._state, false); // NB: updates _state playEventsForEntity(initialEvents); } @@ -48,7 +49,12 @@ public S getState() { return _state; } - /** @return All events emitted by command handlers of this entity up to now */ + /** @return true if the entity is deleted */ + public boolean isDeleted() { + return deleted; + } + + /** @return All events persisted by command handlers of this entity up to now */ public List getAllEvents() { return events; } @@ -66,7 +72,7 @@ protected EventSourcedResult interpretEffects( EventSourcedEntity.Effect effectExecuted; try { entity._internalSetCommandContext(Optional.of(commandContext)); - entity._internalSetCurrentState(this._state); + entity._internalSetCurrentState(this._state, this.deleted); effectExecuted = effect.get(); this.events.addAll(EventSourcedResultImpl.eventsOf(effectExecuted)); } finally { @@ -74,6 +80,7 @@ protected EventSourcedResult interpretEffects( } playEventsForEntity(EventSourcedResultImpl.eventsOf(effectExecuted)); + deleted = EventSourcedResultImpl.checkIfDeleted(effectExecuted); EventSourcedResult result; try { @@ -91,7 +98,7 @@ private void playEventsForEntity(List events) { entity._internalSetEventContext(Optional.of(new TestKitEventSourcedEntityEventContext())); for (E event : events) { this._state = handleEvent(this._state, event); - entity._internalSetCurrentState(this._state); + entity._internalSetCurrentState(this._state, this.deleted); } } finally { entity._internalSetEventContext(Optional.empty()); diff --git a/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/EventSourcedResultImpl.scala b/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/EventSourcedResultImpl.scala index d56372874..12802ed4e 100644 --- a/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/EventSourcedResultImpl.scala +++ b/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/EventSourcedResultImpl.scala @@ -23,6 +23,17 @@ import scala.jdk.CollectionConverters._ * INTERNAL API */ private[akka] object EventSourcedResultImpl { + + def checkIfDeleted[E](effect: EventSourcedEntity.Effect[_]): Boolean = { + effect match { + case ei: EventSourcedEntityEffectImpl[_, E @unchecked] => + ei.primaryEffect match { + case ee: EmitEvents[E @unchecked] => ee.deleteEntity + case _ => false + } + } + } + def eventsOf[E](effect: EventSourcedEntity.Effect[_]): JList[E] = { effect match { case ei: EventSourcedEntityEffectImpl[_, E @unchecked] => diff --git a/akka-javasdk-testkit/src/test/java/akka/javasdk/testkit/eventsourced/CounterEventSourcedEntity.java b/akka-javasdk-testkit/src/test/java/akka/javasdk/testkit/eventsourced/CounterEventSourcedEntity.java index c949ed667..423cb69d3 100644 --- a/akka-javasdk-testkit/src/test/java/akka/javasdk/testkit/eventsourced/CounterEventSourcedEntity.java +++ b/akka-javasdk-testkit/src/test/java/akka/javasdk/testkit/eventsourced/CounterEventSourcedEntity.java @@ -28,6 +28,10 @@ else if (wouldOverflow(value + value) || (value + value) < 0) { } } + public Effect delete() { + return effects().persist(new Increased(commandContext().entityId(), 0)).deleteEntity().thenReply(__ -> "Ok"); + } + @Override public Integer applyEvent(Increased increased) { if (currentState() == null) return increased.value(); diff --git a/akka-javasdk-testkit/src/test/java/akka/javasdk/testkit/eventsourced/CounterEventSourcedEntityTest.java b/akka-javasdk-testkit/src/test/java/akka/javasdk/testkit/eventsourced/CounterEventSourcedEntityTest.java index 1aa9d492d..9aa83b575 100644 --- a/akka-javasdk-testkit/src/test/java/akka/javasdk/testkit/eventsourced/CounterEventSourcedEntityTest.java +++ b/akka-javasdk-testkit/src/test/java/akka/javasdk/testkit/eventsourced/CounterEventSourcedEntityTest.java @@ -10,6 +10,7 @@ import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; @@ -26,6 +27,7 @@ public void testIncrease() { assertEquals("Ok", result.getReply()); assertEquals(10, testKit.getState()); assertEquals(1, testKit.getAllEvents().size()); + assertFalse(testKit.isDeleted()); } @Test @@ -52,6 +54,16 @@ public void testDoubleIncrease() { assertEquals(2, testKit.getAllEvents().size()); } + @Test + public void testDelete() { + EventSourcedTestKit testKit = + EventSourcedTestKit.of(ctx -> new CounterEventSourcedEntity()); + EventSourcedResult result = testKit.call(entity -> entity.delete()); + assertTrue(result.isReply()); + assertEquals("Ok", result.getReply()); + assertTrue(testKit.isDeleted()); + } + @Test public void testIncreaseWithNegativeValue() { EventSourcedTestKit testKit = diff --git a/akka-javasdk-testkit/src/test/java/akka/javasdk/testkit/keyvalueentity/CounterKeyValueEntityTest.java b/akka-javasdk-testkit/src/test/java/akka/javasdk/testkit/keyvalueentity/CounterKeyValueEntityTest.java index c3b6f2b65..03d1699fd 100644 --- a/akka-javasdk-testkit/src/test/java/akka/javasdk/testkit/keyvalueentity/CounterKeyValueEntityTest.java +++ b/akka-javasdk-testkit/src/test/java/akka/javasdk/testkit/keyvalueentity/CounterKeyValueEntityTest.java @@ -10,6 +10,7 @@ import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; public class CounterKeyValueEntityTest { @@ -40,6 +41,7 @@ public void testIncreaseWithNegativeValue() { KeyValueEntityTestKit.of(ctx -> new CounterValueEntity()); KeyValueEntityResult result = testKit.call(entity -> entity.increaseBy(-10)); assertTrue(result.isError()); + assertFalse(testKit.isDeleted()); assertEquals(result.getError(), "Can't increase with a negative value"); } @@ -52,5 +54,7 @@ public void testDeleteValueEntity() { assertTrue(result.isReply()); assertEquals(result.getReply(), "Deleted"); assertEquals(testKit.getState(), 0); + assertTrue(testKit.isDeleted()); + assertTrue(result.stateWasDeleted()); } } diff --git a/akka-javasdk-tests/src/test/java/akkajavasdk/EventSourcedEntityTest.java b/akka-javasdk-tests/src/test/java/akkajavasdk/EventSourcedEntityTest.java index 4ed7888a5..0927d3afa 100644 --- a/akka-javasdk-tests/src/test/java/akkajavasdk/EventSourcedEntityTest.java +++ b/akka-javasdk-tests/src/test/java/akkajavasdk/EventSourcedEntityTest.java @@ -41,10 +41,7 @@ protected TestKit.Settings testKitSettings() { } @Test - public void verifyCounterEventSourcedWiring() throws InterruptedException { - - Thread.sleep(10000); - + public void verifyCounterEventSourcedWiring() { var counterId = "hello"; var client = componentClient.forEventSourcedEntity(counterId); @@ -58,6 +55,20 @@ public void verifyCounterEventSourcedWiring() throws InterruptedException { Assertions.assertEquals(200, counterGet); } + @Test + public void verifyCounterEventSourcedDeletion() { + var counterId = "deleted-hello"; + var client = componentClient.forEventSourcedEntity(counterId); + + var isDeleted = await(client.method(CounterEntity::getDeleted).invokeAsync()); + assertThat(isDeleted).isFalse(); + + await(client.method(CounterEntity::delete).invokeAsync()); + + var isDeleted2 = await(client.method(CounterEntity::getDeleted).invokeAsync()); + assertThat(isDeleted2).isFalse(); + } + @Test public void verifyCounterErrorEffect() { var counterId = "hello-error"; @@ -230,5 +241,4 @@ private void restartCounterEntity(EventSourcedEntityClient client) { private Integer getCounter(EventSourcedEntityClient client) { return await(client.method(CounterEntity::get).invokeAsync()); } - } diff --git a/akka-javasdk-tests/src/test/java/akkajavasdk/SdkIntegrationTest.java b/akka-javasdk-tests/src/test/java/akkajavasdk/SdkIntegrationTest.java index 5758353d7..0c391bc8b 100644 --- a/akka-javasdk-tests/src/test/java/akkajavasdk/SdkIntegrationTest.java +++ b/akka-javasdk-tests/src/test/java/akkajavasdk/SdkIntegrationTest.java @@ -245,6 +245,13 @@ public void verifyUserSubscriptionAction() { deleteUser(user); + var isDeleted = await(componentClient + .forKeyValueEntity(user.id()) + .method(UserEntity::getDelete) + .invokeAsync()); + + assertThat(isDeleted).isEqualTo(true); + Awaitility.await() .ignoreExceptions() .atMost(15, TimeUnit.of(SECONDS)) diff --git a/akka-javasdk-tests/src/test/java/akkajavasdk/components/eventsourcedentities/counter/CounterEntity.java b/akka-javasdk-tests/src/test/java/akkajavasdk/components/eventsourcedentities/counter/CounterEntity.java index 5e5abb0c2..3966b017c 100644 --- a/akka-javasdk-tests/src/test/java/akkajavasdk/components/eventsourcedentities/counter/CounterEntity.java +++ b/akka-javasdk-tests/src/test/java/akkajavasdk/components/eventsourcedentities/counter/CounterEntity.java @@ -4,6 +4,7 @@ package akkajavasdk.components.eventsourcedentities.counter; +import akka.Done; import akka.javasdk.annotations.Acl; import akka.javasdk.annotations.ComponentId; import akka.javasdk.eventsourcedentity.EventSourcedEntity; @@ -13,6 +14,7 @@ import java.util.List; +import static akka.Done.done; import static java.util.function.Function.identity; @ComponentId("counter-entity") @@ -96,6 +98,11 @@ public ReadOnlyEffect get() { return effects().reply(currentState().value()); } + public ReadOnlyEffect getDeleted() { + // don't modify, we want to make sure we call currentState().value here + return effects().reply(isDeleted()); + } + public Effect times(Integer value) { logger.info( "Multiplying counter with commandId={} commandName={} seqNr={} current={} by value={}", @@ -119,6 +126,10 @@ public Effect restart() { // force entity restart, useful for testing throw new RuntimeException("Forceful restarting entity!"); } + public Effect delete() { + return effects().persist(new CounterEvent.ValueSet(0)).deleteEntity().thenReply(__ -> done()); + } + @Override public Counter applyEvent(CounterEvent event) { return currentState().apply(event); diff --git a/akka-javasdk-tests/src/test/java/akkajavasdk/components/keyvalueentities/user/UserEntity.java b/akka-javasdk-tests/src/test/java/akkajavasdk/components/keyvalueentities/user/UserEntity.java index dae1ace75..5d6480408 100644 --- a/akka-javasdk-tests/src/test/java/akkajavasdk/components/keyvalueentities/user/UserEntity.java +++ b/akka-javasdk-tests/src/test/java/akkajavasdk/components/keyvalueentities/user/UserEntity.java @@ -59,6 +59,10 @@ public Effect deleteUser(Delete cmd) { return effects().deleteEntity().thenReply(Ok.instance); } + public Effect getDelete() { + return effects().reply(isDeleted()); + } + public Effect restart(Restart cmd) { // force entity restart, useful for testing logger.info( "Restarting counter with commandId={} commandName={} current={}", diff --git a/akka-javasdk/src/main/java/akka/javasdk/eventsourcedentity/EventSourcedEntity.java b/akka-javasdk/src/main/java/akka/javasdk/eventsourcedentity/EventSourcedEntity.java index 3d434c58c..bc51e3d01 100644 --- a/akka-javasdk/src/main/java/akka/javasdk/eventsourcedentity/EventSourcedEntity.java +++ b/akka-javasdk/src/main/java/akka/javasdk/eventsourcedentity/EventSourcedEntity.java @@ -69,6 +69,7 @@ public abstract class EventSourcedEntity { private Optional commandContext = Optional.empty(); private Optional eventContext = Optional.empty(); private Optional currentState = Optional.empty(); + private boolean deleted = false; private boolean handlingCommands = false; /** @@ -131,10 +132,11 @@ public void _internalSetEventContext(Optional context) { * responsible for finally calling _internalClearCurrentState */ @InternalApi - public boolean _internalSetCurrentState(S state) { + public boolean _internalSetCurrentState(S state, boolean deleted) { var wasHandlingCommands = handlingCommands; handlingCommands = true; currentState = Optional.ofNullable(state); + this.deleted = deleted; return !wasHandlingCommands; } @@ -207,6 +209,13 @@ protected final S currentState() { throw new IllegalStateException("Current state is only available when handling a command."); } + /** + * Returns true if the entity has been deleted. + */ + protected boolean isDeleted() { + return deleted; + } + protected final Effect.Builder effects() { return new EventSourcedEntityEffectImpl(); } diff --git a/akka-javasdk/src/main/java/akka/javasdk/http/RequestBuilder.java b/akka-javasdk/src/main/java/akka/javasdk/http/RequestBuilder.java index 4066a6126..4f967c350 100644 --- a/akka-javasdk/src/main/java/akka/javasdk/http/RequestBuilder.java +++ b/akka-javasdk/src/main/java/akka/javasdk/http/RequestBuilder.java @@ -36,6 +36,8 @@ public interface RequestBuilder { RequestBuilder withTimeout(Duration timeout); + RequestBuilder addQueryParameter(String key, String value); + /** * Transform the request before sending it. This method allows for extra request configuration. */ diff --git a/akka-javasdk/src/main/java/akka/javasdk/keyvalueentity/KeyValueEntity.java b/akka-javasdk/src/main/java/akka/javasdk/keyvalueentity/KeyValueEntity.java index e678102eb..80e504197 100644 --- a/akka-javasdk/src/main/java/akka/javasdk/keyvalueentity/KeyValueEntity.java +++ b/akka-javasdk/src/main/java/akka/javasdk/keyvalueentity/KeyValueEntity.java @@ -43,6 +43,8 @@ public abstract class KeyValueEntity { private Optional currentState = Optional.empty(); + private boolean deleted = false; + private boolean handlingCommands = false; /** @@ -85,9 +87,10 @@ public void _internalSetCommandContext(Optional context) { * @hidden */ @InternalApi - public void _internalSetCurrentState(S state) { + public void _internalSetCurrentState(S state, boolean deleted) { handlingCommands = true; currentState = Optional.ofNullable(state); + this.deleted = deleted; } /** @@ -119,6 +122,13 @@ protected final S currentState() { throw new IllegalStateException("Current state is only available when handling a command."); } + /** + * Returns true if the entity has been deleted. + */ + protected boolean isDeleted() { + return deleted; + } + protected final Effect.Builder effects() { return new KeyValueEntityEffectImpl(); } diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntityImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntityImpl.scala index b64082da3..d80600cfa 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntityImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntityImpl.scala @@ -126,7 +126,7 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[ try { entity._internalSetCommandContext(Optional.of(cmdContext)) - entity._internalSetCurrentState(state) + entity._internalSetCurrentState(state, command.isDeleted) val commandEffect = router .handleCommand(command.name, cmdPayload) .asInstanceOf[EventSourcedEntityEffectImpl[AnyRef, E]] // FIXME improve? @@ -223,7 +223,7 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[ sequenceNumber: Long): SpiEventSourcedEntity.State = { val eventContext = new EventContextImpl(entityId, sequenceNumber) entity._internalSetEventContext(Optional.of(eventContext)) - val clearState = entity._internalSetCurrentState(state) + val clearState = entity._internalSetCurrentState(state, false) try { router.handleEvent(event) } finally { diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/http/HttpClientImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/http/HttpClientImpl.scala index d1320d57d..11fba5b8c 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/http/HttpClientImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/http/HttpClientImpl.scala @@ -192,4 +192,10 @@ private[akka] final case class RequestBuilderImpl[R]( timeout, request, (res: HttpResponse, bytes: ByteString) => new StrictResponse[T](res, parse.apply(bytes.toArray))) + + override def addQueryParameter(key: String, value: String): RequestBuilder[R] = { + val query = request.getUri.query().withParam(key, value) + val uriWithQuery = request.getUri.query(query) + withRequest(request.withUri(uriWithQuery)) + } } diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/keyvalueentity/KeyValueEntityImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/keyvalueentity/KeyValueEntityImpl.scala index 179a746e2..a43b3f806 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/keyvalueentity/KeyValueEntityImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/keyvalueentity/KeyValueEntityImpl.scala @@ -49,7 +49,6 @@ private[impl] object KeyValueEntityImpl { override val entityId: String, val sequenceNumber: Long, override val commandName: String, - val isDeleted: Boolean, override val metadata: Metadata, span: Option[Span], tracerFactory: () => Tracer) @@ -109,18 +108,11 @@ private[impl] final class KeyValueEntityImpl[S, KV <: KeyValueEntity[S]]( val cmdPayload = command.payload.getOrElse(BytesPayload.empty) val metadata: Metadata = MetadataImpl.of(command.metadata) val cmdContext = - new CommandContextImpl( - entityId, - command.sequenceNumber, - command.name, - command.isDeleted, - metadata, - span, - tracerFactory) + new CommandContextImpl(entityId, command.sequenceNumber, command.name, metadata, span, tracerFactory) try { entity._internalSetCommandContext(Optional.of(cmdContext)) - entity._internalSetCurrentState(state) + entity._internalSetCurrentState(state, command.isDeleted) val commandEffect = router .handleCommand(command.name, cmdPayload) .asInstanceOf[KeyValueEntityEffectImpl[AnyRef]] // FIXME improve? diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/WorkflowImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/WorkflowImpl.scala index a246e1729..f3d30c00f 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/WorkflowImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/WorkflowImpl.scala @@ -89,12 +89,6 @@ class WorkflowImpl[S, W <: Workflow[S]]( new SpiWorkflow.RecoverStrategy(sdkRecoverStrategy.maxRetries, failoverTo = stepTransition) } - val failoverTo = { - definition.getFailoverStepName.toScala.map { stepName => - new SpiWorkflow.StepTransition(stepName, definition.getFailoverStepInput.toScala.map(serializer.toBytes)) - } - } - val stepConfigs = definition.getStepConfigs.asScala.map { config => val stepTimeout = config.timeout.toScala.map(_.toScala) @@ -102,15 +96,21 @@ class WorkflowImpl[S, W <: Workflow[S]]( (config.stepName, new SpiWorkflow.StepConfig(config.stepName, stepTimeout, failoverRecoverStrategy)) }.toMap - val failoverRecoverStrategy = definition.getStepRecoverStrategy.toScala.map(toRecovery) + val defaultStepRecoverStrategy = definition.getStepRecoverStrategy.toScala.map(toRecovery) + + val failoverRecoverStrategy = definition.getFailoverStepName.toScala.map(stepName => + //when failoverStepName exists, maxRetries must exist + new SpiWorkflow.RecoverStrategy( + definition.getFailoverMaxRetries.toScala.get.maxRetries, + new SpiWorkflow.StepTransition(stepName, definition.getFailoverStepInput.toScala.map(serializer.toBytes)))) + val stepTimeout = definition.getStepTimeout.toScala.map(_.toScala) new SpiWorkflow.WorkflowConfig( workflowTimeout = definition.getWorkflowTimeout.toScala.map(_.toScala), - failoverTo = failoverTo, failoverRecoverStrategy = failoverRecoverStrategy, defaultStepTimeout = stepTimeout, - defaultStepRecoverStrategy = failoverRecoverStrategy, + defaultStepRecoverStrategy = defaultStepRecoverStrategy, stepConfigs = stepConfigs) } diff --git a/akka-javasdk/src/test/scala/akka/javasdk/impl/AnySupportSpec.scala b/akka-javasdk/src/test/scala/akka/javasdk/impl/AnySupportSpec.scala index c5927420a..e11b8d76a 100644 --- a/akka-javasdk/src/test/scala/akka/javasdk/impl/AnySupportSpec.scala +++ b/akka-javasdk/src/test/scala/akka/javasdk/impl/AnySupportSpec.scala @@ -4,8 +4,6 @@ package akka.javasdk.impl -import kalix.protocol.discovery.{ DiscoveryProto, UserFunctionError } -import kalix.protocol.event_sourced_entity.EventSourcedEntityProto import com.google.protobuf.any.{ Any => ScalaPbAny } import com.google.protobuf.{ Any => JavaPbAny } import com.google.protobuf.ByteString @@ -15,29 +13,13 @@ import org.scalatest.wordspec.AnyWordSpec class AnySupportSpec extends AnyWordSpec with Matchers with OptionValues { - private val anySupport = new AnySupport( - Array(EventSourcedEntityProto.javaDescriptor, DiscoveryProto.javaDescriptor), - getClass.getClassLoader, - "com.example") + private val anySupport = new AnySupport(Array.empty, getClass.getClassLoader, "com.example") - private val anySupportScala = new AnySupport( - Array(EventSourcedEntityProto.javaDescriptor, DiscoveryProto.javaDescriptor), - getClass.getClassLoader, - "com.example", - AnySupport.PREFER_SCALA) + private val anySupportScala = + new AnySupport(Array.empty, getClass.getClassLoader, "com.example", AnySupport.PREFER_SCALA) "Any support for Java" should { - "support se/deserializing scala protobufs" in { - val error = UserFunctionError("error") - val any = anySupport.encodeScala(UserFunctionError("error")) - any.typeUrl should ===("com.example/kalix.protocol.UserFunctionError") - - val decoded = anySupport.decodePossiblyPrimitive(any) - decoded.getClass should ===(error.getClass) - decoded should ===(error) - } - def testPrimitive[T](name: String, value: T, defaultValue: T) = { val any = anySupport.encodeScala(value) any.typeUrl should ===(AnySupport.KalixPrimitive + name) diff --git a/akka-javasdk/src/test/scala/akka/javasdk/impl/http/HttpClientImplSpec.scala b/akka-javasdk/src/test/scala/akka/javasdk/impl/http/HttpClientImplSpec.scala new file mode 100644 index 000000000..baf3b0735 --- /dev/null +++ b/akka-javasdk/src/test/scala/akka/javasdk/impl/http/HttpClientImplSpec.scala @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2021-2024 Lightbend Inc. + */ + +package akka.javasdk.impl.http + +import akka.actor.typed.ActorSystem +import akka.actor.typed.scaladsl.Behaviors +import akka.http.javadsl.model.HttpHeader +import akka.javasdk.http.RequestBuilder +import akka.util.ByteString +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +class HttpClientImplSpec extends AnyWordSpec with Matchers { + val system: ActorSystem[Nothing] = ActorSystem[Nothing](Behaviors.empty[Nothing], "httpClient") + + "RequestBuilderImpl" should { + "add query parameter" in new HttpClientImplSuite { + val builder: RequestBuilderImpl[ByteString] = get { builder => + builder + .addQueryParameter("key", "some value") + .addQueryParameter("another", "name") + } + builder.request.getUri.toString shouldBe "http://test.com/test?key=some+value&another=name" + } + } + +} + +trait HttpClientImplSuite { + implicit private val system: ActorSystem[Nothing] = + ActorSystem[Nothing](Behaviors.empty[Nothing], "RequestBuilderImplSpec") + + val baseUrl = "http://test.com" + val path = "/test" + val headers: Seq[HttpHeader] = Seq.empty + + private def client = new HttpClientImpl(system, baseUrl, headers) + + private def get(url: String)( + builder: RequestBuilder[ByteString] => RequestBuilder[ByteString]): RequestBuilderImpl[ByteString] = { + builder(client.GET(url)) + .asInstanceOf[RequestBuilderImpl[ByteString]] + } + def get(builder: RequestBuilder[ByteString] => RequestBuilder[ByteString]): RequestBuilderImpl[ByteString] = + get(path)(builder) +} diff --git a/docs/src/modules/concepts/images/steps-1.svg b/docs/src/modules/concepts/images/steps-1.svg new file mode 100644 index 000000000..f31eff7fc --- /dev/null +++ b/docs/src/modules/concepts/images/steps-1.svg @@ -0,0 +1,12 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/docs/src/modules/concepts/images/steps-2.svg b/docs/src/modules/concepts/images/steps-2.svg new file mode 100644 index 000000000..6681d8d2f --- /dev/null +++ b/docs/src/modules/concepts/images/steps-2.svg @@ -0,0 +1,12 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/docs/src/modules/concepts/images/steps-3.svg b/docs/src/modules/concepts/images/steps-3.svg new file mode 100644 index 000000000..d6438f67c --- /dev/null +++ b/docs/src/modules/concepts/images/steps-3.svg @@ -0,0 +1,12 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/docs/src/modules/concepts/images/steps-4.svg b/docs/src/modules/concepts/images/steps-4.svg new file mode 100644 index 000000000..d82d4ecb1 --- /dev/null +++ b/docs/src/modules/concepts/images/steps-4.svg @@ -0,0 +1,12 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/docs/src/modules/concepts/images/steps-5.svg b/docs/src/modules/concepts/images/steps-5.svg new file mode 100644 index 000000000..97150066f --- /dev/null +++ b/docs/src/modules/concepts/images/steps-5.svg @@ -0,0 +1,12 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/docs/src/modules/concepts/images/steps-6.svg b/docs/src/modules/concepts/images/steps-6.svg new file mode 100644 index 000000000..eb9d6f93f --- /dev/null +++ b/docs/src/modules/concepts/images/steps-6.svg @@ -0,0 +1,12 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/docs/src/modules/concepts/pages/multi-region.adoc b/docs/src/modules/concepts/pages/multi-region.adoc index 2b2091478..418bb1e1f 100644 --- a/docs/src/modules/concepts/pages/multi-region.adoc +++ b/docs/src/modules/concepts/pages/multi-region.adoc @@ -16,40 +16,44 @@ Multi-region operations are ideal for: * Low latency global reads * Low latency global writes -== Examples +== Replicated reads -The two main replication strategies that Akka offers are replicated reads, and replicated writes. - -=== Replicated reads - -With replicated reads, an entity has its "home" in one primary region, while being replicated to multiple other regions. +Akka's replicated reads offers full data replication across regions and even cloud providers, without any changes to the service implementation: an entity has its "home" in one _primary region_, while being replicated to multiple other regions. image:geo-a.svg[Geo data replication, width=600] -In the example above, the entity representing Alice has its primary region in Los Angeles (USA). When a user in the primary region performs a read request, the request is handled locally, and the response sent straight back. +In the image above, the entity representing Alice has its primary region in Los Angeles. When a user in the primary region performs a read request image:steps-1.svg[width=20], the request is handled locally, and the response sent straight back image:steps-2.svg[width=20]. + +When the user in the primary region performs a write request image:steps-1.svg[width=20], that request is also handled locally, and a response sent directly back image:steps-2.svg[width=20]. After that write request completes, that write is replicated to other regions image:steps-3.svg[width=20], such as in London (UK). -When the user in the primary region performs a write request, that request is also handled locally, and a response sent directly back. After that write request completes, that write is replicated to other regions, such as in London (UK). A user in London, when they perform a read, that read operation will happen locally, and a response sent immediately back. +A user in London, when they perform a read image:steps-4.svg[width=20], that read operation will happen locally, and a response sent immediately back image:steps-5.svg[width=20]. A user can also perform write operations on entities in non-primary regions. image:geo-b.svg[Geo data replication, width=600] -In this scenario, a user in London (UK) is performing a write operation on the Alice entity. Since London is not the primary region for the Alice entity, Akka will automatically forward that request to the primary region, in this case, Los Angeles (USA). That request will be handled in the USA, and a response sent directly back to the user. - -Akka has a configurable primary selection mode. The two main modes are static, and dynamic. In the static primary selection mode, the primary region for an entity is selected statically as part of the deployment, so all entities have the same primary region. This is useful for scenarios where you want one primary region, with the ability to failover to another region in the case of a regional outage. - -In the dynamic primary selection mode, each entity can have a different region that is considered its primary region. This is selected based on whichever region the entity was first written in. This is useful for scenarios where you want to have the primary region for you data close to the users who use the data. A user, Alice, in the USA, will have her data in the USA, while a user Bob, in the UK, will have his data, in the UK. +In this scenario, a user in London (UK) is performing a write operation on the Alice entity image:steps-1.svg[width=20]. Since London is not the primary region for the Alice entity, Akka will automatically forward that request to the primary region image:steps-2.svg[width=20], in this case, Los Angeles (USA). That request will be handled in the USA, and a response sent directly back to the user image:steps-3.svg[width=20]. image:geo-c.svg[Geo data replication, width=600] -When Bob makes a request in the UK on his data, that request is handled locally, and replicated to the US, while Alice's requests in the USA with her data are handled locally in the USA, and replicated to the UK. +When Bob makes a request in the UK on his data image:steps-1.svg[width=20], that request is handled locally image:steps-2.svg[width=20], and replicated to the US image:steps-3.svg[width=20], while Alice's requests in the USA with her data are handled locally in the USA, and replicated to the UK. The data however is still available in all regions. If Bob travels to the USA, he can still access his data. image:geo-d.svg[Geo data replication, width=600] -When Bob travels to the USA, read requests that Bob makes on his data are handled locally, while write requests are forwarded to the UK. Meanwhile, write requests made by Alice on her data is all handled locally, with writes being replicated to the UK. +When Bob travels to the USA, read requests that Bob makes on his data are handled locally image:steps-1.svg[width=20], while write requests are forwarded to the UK image:steps-2.svg[width=20]. Meanwhile, write requests made by Alice on her data is all handled locally image:steps-4.svg[width=20] image:steps-5.svg[width=20], with writes being replicated to the UK image:steps-6.svg[width=20]. + +=== Primary selection + +How Akka assigns the primary region to an entity is configurable. The two main modes are **static**, and **dynamic**. + +In the **static primary selection** mode (which is the default), the primary region for an entity is selected statically as part of the deployment, so all entities have the same primary region. This is useful for scenarios where you want one primary region, with the ability to fail over to another region in the case of a regional outage. + +In the **dynamic primary selection** mode, each entity can have a different region that is considered its primary region. This is selected based on whichever region the entity was first written in. This is useful for scenarios where you want to have the primary region for you data close to the users who use the data. A user, Alice, in the USA, will have her data in the USA, while a user Bob, in the UK, will have his data, in the UK. + +The Operating section explains more details about xref:operations:regions/index.adoc#selecting-primary[configuring the primary selection mode]. -=== Replicated writes +== Replicated writes The replicated write replication strategy allows every region to be capable of handling writes for all entities. This is done through the use of CRDTs, which can be modified concurrently in different regions, and their changes safely merged without conflict. diff --git a/docs/src/modules/java/pages/event-sourced-entities.adoc b/docs/src/modules/java/pages/event-sourced-entities.adoc index 06fd93dd6..af03a39d0 100644 --- a/docs/src/modules/java/pages/event-sourced-entities.adoc +++ b/docs/src/modules/java/pages/event-sourced-entities.adoc @@ -141,21 +141,7 @@ include::example$shopping-cart-quickstart/src/main/java/shoppingcart/application IMPORTANT: We are returning the internal state directly back to the requester. In the endpoint, it's usually best to convert this internal domain model into a public model so the internal representation is free to evolve without breaking clients code. -== Snapshots - -Snapshots are an important optimization for Event Sourced Entities that persist many events. Rather than reading the entire journal upon loading or restart, Akka can initiate them from a snapshot. - -Snapshots are stored and handled automatically by Akka without any specific code required. Snapshots are stored after a configured number of events: - -[source,conf,indent=0] -.{sample-base-url}/shopping-cart-quickstart/src/main/resources/application.conf[application.conf] ----- -include::example$shopping-cart-quickstart/src/main/resources/application.conf[tag=snapshot-every] ----- - -When the Event Sourced Entity is loaded again, the snapshot will be loaded before any other events are received. - -== Deleting an Entity +=== Deleting an Entity Normally, Event Sourced Entities are not deleted because the history of the events typically provide business value. For certain use cases or for regulatory reasons the entity can be deleted. @@ -170,12 +156,26 @@ include::example$shopping-cart-quickstart/src/main/java/shoppingcart/application When you give the instruction to delete the entity it will still exist for some time, including its events and snapshots. The actual removal of events and snapshots will be deleted later to give downstream consumers time to process all prior events, including the final event that was persisted together with the `deleteEntity` effect. By default, the existence of the entity is completely cleaned up after a week. -It is not allowed to persist more events after the entity has been "marked" as deleted. You can still handle read requests to the entity until it has been completely removed. +It is not allowed to persist more events after the entity has been "marked" as deleted. You can still handle read requests to the entity until it has been completely removed. To check whether the entity has been deleted, you can use the `isDeleted` method inherited from the `EventSourcedEntity` class. It is best to not reuse the same entity id after deletion, but if that happens after the entity has been completely removed it will be instantiated as a completely new entity without any knowledge of previous state. Note that xref:views.adoc#ve_delete[deleting View state] must be handled explicitly. +== Snapshots + +Snapshots are an important optimization for Event Sourced Entities that persist many events. Rather than reading the entire journal upon loading or restart, Akka can initiate them from a snapshot. + +Snapshots are stored and handled automatically by Akka without any specific code required. Snapshots are stored after a configured number of events: + +[source,conf,indent=0] +.{sample-base-url}/shopping-cart-quickstart/src/main/resources/application.conf[application.conf] +---- +include::example$shopping-cart-quickstart/src/main/resources/application.conf[tag=snapshot-every] +---- + +When the Event Sourced Entity is loaded again, the snapshot will be loaded before any other events are received. + [#_replication] include::partial$mutli-region-replication.adoc[] diff --git a/docs/src/modules/java/pages/key-value-entities.adoc b/docs/src/modules/java/pages/key-value-entities.adoc index 3d943e6c9..129b8603d 100644 --- a/docs/src/modules/java/pages/key-value-entities.adoc +++ b/docs/src/modules/java/pages/key-value-entities.adoc @@ -112,7 +112,7 @@ include::example$key-value-counter/src/main/java/com/example/application/Counter When you give the instruction to delete the entity it will still exist with an empty state for some time. The actual removal happens later to give downstream consumers time to process the change. By default, the existence of the entity is completely cleaned up after a week. -It is not allowed to make further changes after the entity has been "marked" as deleted. You can still handle read requests to the entity until it has been completely removed, but the current state will be empty. +It is not allowed to make further changes after the entity has been "marked" as deleted. You can still handle read requests to the entity until it has been completely removed, but the current state will be empty. To check whether the entity has been deleted, you can use the `isDeleted` method inherited from the `KeyValueEntity` class. NOTE: If you don't want to permanently delete an entity, you can instead use the `updateState` effect with an empty state. This will work the same as resetting the entity to its initial state. diff --git a/docs/src/modules/operations/pages/services/deploy-service.adoc b/docs/src/modules/operations/pages/services/deploy-service.adoc index 6f3f880f5..9cba4046d 100644 --- a/docs/src/modules/operations/pages/services/deploy-service.adoc +++ b/docs/src/modules/operations/pages/services/deploy-service.adoc @@ -30,7 +30,7 @@ The docker build output in maven will print something similar to the following: ---- DOCKER> Tagging image shopping-cart:1.0-SNAPSHOT-20241028102843 successful! ---- - +[#_deploying_a_service] == Deploying a service Services can be deployed via the Akka CLI. @@ -171,16 +171,17 @@ For further details, see xref:operations:projects/external-container-registries. [#apply] == Using service descriptors -Akka services can also be described and managed with *YAML service descriptors*. +Akka services can also be described and managed with *YAML service descriptors*. See xref:reference:descriptors/service-descriptor.adoc[]. -First, <<_build_container_image>> and then push the image: +You can deploy your service using a service descriptor. +For this you need at least the image, which you can get by <<_build_container_image, building the container image>> and then <<_pushing_to_acr, pushing it to the container registry>>: [source, command line] ---- akka container-registry push container-name:tag-name ---- -Example of service descriptor: +Once pushed, you need to use the suggested image from the command's output in your service descriptor, for example: [source, yaml] ---- @@ -192,7 +193,7 @@ spec: value: some value ---- -NOTE: You must add the primary region image tag. See `akka container-registry push` command's output. +NOTE: You must add the primary region image tag from `akka container-registry push` output. To apply this descriptor, run: @@ -216,6 +217,15 @@ After editing the service descriptor (e.g., `my-service.yaml`), redeploy it with akka service apply -f my-service.yaml ---- +=== Editing the service descriptor in place + +Once you have <<_deploying_a_service, deployed your service>>, you can also modify it by editing its service descriptor: + +[source, command line] +---- +akka service edit my-service +---- + == Removing a deployed service To delete a service and free its resources, run the following command, replacing `my-service` with the name of the service you want to remove: @@ -245,3 +255,4 @@ You now know how to deploy, verify, update, and remove Akka services using the A == Related documentation - xref:reference:cli/akka-cli/akka_services.adoc[`akka service` CLI commands] +- xref:reference:descriptors/service-descriptor.adoc[Akka Service Descriptor]