From a3748725d433bc3b1bf4961183277b30c0d253d7 Mon Sep 17 00:00:00 2001 From: Andrzej Ludwikowski Date: Mon, 2 Dec 2024 10:59:53 +0100 Subject: [PATCH] chore: using spi for TimedAction and ESE (#45) * snapshot runtime version * chore: SDK implementation of SpiEventSourcedEntity * first stab, untested * corresponds to EventSourcedEntitiesImpl, ReflectiveEventSourcedEntityRouter, EventSourcedEntityRouter * descriptor * setState * error code * isDeleted * chore: updating SDK for work with embedded TimedAction * Apply suggestions from code review * ignoring deprecation * runtime versions --------- Co-authored-by: Patrik Nordwall --- .../akka-javasdk-parent/pom.xml | 2 +- .../java/akka/javasdk/testkit/TestKit.java | 4 +- ...tKitEventSourcedEntityCommandContext.scala | 1 + .../akkajavasdk/EventSourcedEntityTest.java | 45 +- .../java/akkajavasdk/SdkIntegrationTest.java | 2 + .../src/test/resources/logback-test.xml | 6 +- .../eventsourcedentity/CommandContext.java | 2 + .../EventSourcedEntity.java | 10 + .../akka/javasdk/impl/DiscoveryImpl.scala | 3 +- .../akka/javasdk/impl/EntityExceptions.scala | 4 - .../scala/akka/javasdk/impl/SdkRunner.scala | 66 ++- .../impl/client/ComponentClientImpl.scala | 2 +- .../impl/client/EntityClientImpl.scala | 10 +- .../EventSourcedEntitiesImpl.scala | 1 + .../EventSourcedEntityImpl.scala | 292 +++++++++++++ .../ReflectiveEventSourcedEntityRouter.scala | 18 +- .../impl/timedaction/TimedActionImpl.scala | 128 ++++++ .../javasdk/client/ComponentClientTest.java | 4 +- .../akka/javasdk/testkit/TestProtocol.scala | 3 - .../ReplicatedEntityMessages.scala | 399 ------------------ .../TestReplicatedEntityProtocol.scala | 67 --- project/Dependencies.scala | 2 +- publishLocally.sh | 5 +- 23 files changed, 557 insertions(+), 519 deletions(-) create mode 100644 akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntityImpl.scala create mode 100644 akka-javasdk/src/main/scala/akka/javasdk/impl/timedaction/TimedActionImpl.scala delete mode 100644 akka-javasdk/src/test/scala/akka/javasdk/testkit/replicatedentity/ReplicatedEntityMessages.scala delete mode 100644 akka-javasdk/src/test/scala/akka/javasdk/testkit/replicatedentity/TestReplicatedEntityProtocol.scala diff --git a/akka-javasdk-maven/akka-javasdk-parent/pom.xml b/akka-javasdk-maven/akka-javasdk-parent/pom.xml index cdbd1eb0a..460f15041 100644 --- a/akka-javasdk-maven/akka-javasdk-parent/pom.xml +++ b/akka-javasdk-maven/akka-javasdk-parent/pom.xml @@ -38,7 +38,7 @@ 21 - 1.2.2 + 1.3.0-2909c94 UTF-8 false diff --git a/akka-javasdk-testkit/src/main/java/akka/javasdk/testkit/TestKit.java b/akka-javasdk-testkit/src/main/java/akka/javasdk/testkit/TestKit.java index 17072d564..dc1e4f3df 100644 --- a/akka-javasdk-testkit/src/main/java/akka/javasdk/testkit/TestKit.java +++ b/akka-javasdk-testkit/src/main/java/akka/javasdk/testkit/TestKit.java @@ -32,7 +32,7 @@ import akka.stream.SystemMaterializer; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import kalix.runtime.KalixRuntimeMain; +import kalix.runtime.AkkaRuntimeMain; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -474,7 +474,7 @@ public SpiSettings getSettings() { applicationConfig = runner.applicationConfig(); Config runtimeConfig = ConfigFactory.empty(); - runtimeActorSystem = KalixRuntimeMain.start(Some.apply(runtimeConfig), Some.apply(runner)); + runtimeActorSystem = AkkaRuntimeMain.start(Some.apply(runtimeConfig), runner); // wait for SDK to get on start callback (or fail starting), we need it to set up the component client var startupContext = runner.started().toCompletableFuture().get(20, TimeUnit.SECONDS); var componentClients = startupContext.componentClients(); diff --git a/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitEventSourcedEntityCommandContext.scala b/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitEventSourcedEntityCommandContext.scala index b39b8b208..641209711 100644 --- a/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitEventSourcedEntityCommandContext.scala +++ b/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitEventSourcedEntityCommandContext.scala @@ -15,6 +15,7 @@ final class TestKitEventSourcedEntityCommandContext( override val commandId: Long = 0L, override val commandName: String = "stubCommandName", override val sequenceNumber: Long = 0L, + override val isDeleted: Boolean = false, override val metadata: Metadata = Metadata.EMPTY) extends CommandContext with InternalContext { diff --git a/akka-javasdk-tests/src/test/java/akkajavasdk/EventSourcedEntityTest.java b/akka-javasdk-tests/src/test/java/akkajavasdk/EventSourcedEntityTest.java index eecf2670a..78abdd8d4 100644 --- a/akka-javasdk-tests/src/test/java/akkajavasdk/EventSourcedEntityTest.java +++ b/akka-javasdk-tests/src/test/java/akkajavasdk/EventSourcedEntityTest.java @@ -24,13 +24,16 @@ import static java.time.temporal.ChronoUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.fail; @ExtendWith(Junit5LogCapturing.class) public class EventSourcedEntityTest extends TestKitSupport { @Test - public void verifyCounterEventSourcedWiring() { + public void verifyCounterEventSourcedWiring() throws InterruptedException { + + Thread.sleep(10000); var counterId = "hello"; var client = componentClient.forEventSourcedEntity(counterId); @@ -47,22 +50,30 @@ public void verifyCounterEventSourcedWiring() { @Test public void verifyCounterErrorEffect() { + var counterId = "hello-error"; + var client = componentClient.forEventSourcedEntity(counterId); + assertThrows(IllegalArgumentException.class, () -> + increaseCounterWithError(client, -1) + ); + } + @Test + public void httpVerifyCounterErrorEffect() { CompletableFuture> call = httpClient.POST("/akka/v1.0/entity/counter-entity/c001/increaseWithError") - .withRequestBody(-10) - .responseBodyAs(String.class) - .invokeAsync() - .toCompletableFuture(); + .withRequestBody(-10) + .responseBodyAs(String.class) + .invokeAsync() + .toCompletableFuture(); Awaitility.await() - .ignoreExceptions() - .atMost(5, TimeUnit.SECONDS) - .untilAsserted(() -> { - - assertThat(call).isCompletedExceptionally(); - assertThat(call.exceptionNow()).isInstanceOf(IllegalArgumentException.class); - assertThat(call.exceptionNow().getMessage()).contains("Value must be greater than 0"); - }); + .ignoreExceptions() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + + assertThat(call).isCompletedExceptionally(); + assertThat(call.exceptionNow()).isInstanceOf(IllegalArgumentException.class); + assertThat(call.exceptionNow().getMessage()).contains("Value must be greater than 0"); + }); } @Test @@ -185,6 +196,12 @@ private Integer increaseCounter(EventSourcedEntityClient client, int value) { .invokeAsync(value)); } + private Counter increaseCounterWithError(EventSourcedEntityClient client, int value) { + return await(client + .method(CounterEntity::increaseWithError) + .invokeAsync(value)); + } + private Integer multiplyCounter(EventSourcedEntityClient client, int value) { return await(client @@ -205,4 +222,4 @@ private Integer getCounter(EventSourcedEntityClient client) { return await(client.method(CounterEntity::get).invokeAsync()); } -} \ No newline at end of file +} diff --git a/akka-javasdk-tests/src/test/java/akkajavasdk/SdkIntegrationTest.java b/akka-javasdk-tests/src/test/java/akkajavasdk/SdkIntegrationTest.java index d71ebbc66..7f09dc118 100644 --- a/akka-javasdk-tests/src/test/java/akkajavasdk/SdkIntegrationTest.java +++ b/akka-javasdk-tests/src/test/java/akkajavasdk/SdkIntegrationTest.java @@ -38,6 +38,7 @@ import org.hamcrest.core.IsNull; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -521,6 +522,7 @@ public void verifyMultiTableViewForUserCounters() { } @Test + @Disabled //TODO revert once we deal with metadata translation public void verifyActionWithMetadata() { String metadataValue = "action-value"; diff --git a/akka-javasdk-tests/src/test/resources/logback-test.xml b/akka-javasdk-tests/src/test/resources/logback-test.xml index 52609e398..2eaa38a2a 100644 --- a/akka-javasdk-tests/src/test/resources/logback-test.xml +++ b/akka-javasdk-tests/src/test/resources/logback-test.xml @@ -12,15 +12,17 @@ - - + + + + diff --git a/akka-javasdk/src/main/java/akka/javasdk/eventsourcedentity/CommandContext.java b/akka-javasdk/src/main/java/akka/javasdk/eventsourcedentity/CommandContext.java index 325596d29..af9fb6cf5 100644 --- a/akka-javasdk/src/main/java/akka/javasdk/eventsourcedentity/CommandContext.java +++ b/akka-javasdk/src/main/java/akka/javasdk/eventsourcedentity/CommandContext.java @@ -37,6 +37,8 @@ public interface CommandContext extends MetadataContext { */ String entityId(); + boolean isDeleted(); + /** Access to tracing for custom app specific tracing. */ Tracing tracing(); } diff --git a/akka-javasdk/src/main/java/akka/javasdk/eventsourcedentity/EventSourcedEntity.java b/akka-javasdk/src/main/java/akka/javasdk/eventsourcedentity/EventSourcedEntity.java index caeafbb5b..3ba864c65 100644 --- a/akka-javasdk/src/main/java/akka/javasdk/eventsourcedentity/EventSourcedEntity.java +++ b/akka-javasdk/src/main/java/akka/javasdk/eventsourcedentity/EventSourcedEntity.java @@ -134,6 +134,16 @@ public void _internalSetCurrentState(S state) { currentState = Optional.ofNullable(state); } + /** + * INTERNAL API + * @hidden + */ + @InternalApi + public void _internalClearCurrentState() { + handlingCommands = false; + currentState = Optional.empty(); + } + /** * This is the main event handler method. Whenever an event is persisted, this handler will be called. * It should return the new state of the entity. diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/DiscoveryImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/DiscoveryImpl.scala index 1200daf87..3069e7cd2 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/DiscoveryImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/DiscoveryImpl.scala @@ -119,8 +119,7 @@ class DiscoveryImpl( Component( service.componentType, name, - Component.ComponentSettings.Entity( - EntitySettings(service.componentId, None, forwardHeaders, EntitySettings.SpecificSettings.Empty))) + Component.ComponentSettings.Entity(EntitySettings(service.componentId, forwardHeaders))) } }.toSeq diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/EntityExceptions.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/EntityExceptions.scala index b4692e959..f84801854 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/EntityExceptions.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/EntityExceptions.scala @@ -9,7 +9,6 @@ import akka.javasdk.eventsourcedentity.CommandContext import akka.javasdk.keyvalueentity import kalix.protocol.entity.Command import kalix.protocol.event_sourced_entity.EventSourcedInit -import kalix.protocol.replicated_entity.ReplicatedEntityInit import kalix.protocol.value_entity.ValueEntityInit /** @@ -71,9 +70,6 @@ private[javasdk] object EntityExceptions { def apply(init: EventSourcedInit, message: String): EntityException = ProtocolException(init.entityId, message) - def apply(init: ReplicatedEntityInit, message: String): EntityException = - ProtocolException(init.entityId, message) - } def failureMessageForLog(cause: Throwable): String = cause match { diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala index 172ef9880..ff025888f 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala @@ -7,12 +7,15 @@ package akka.javasdk.impl import java.lang.reflect.Constructor import java.lang.reflect.InvocationTargetException import java.util.concurrent.CompletionStage + +import scala.annotation.nowarn import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.Promise import scala.jdk.FutureConverters._ import scala.reflect.ClassTag import scala.util.control.NonFatal + import akka.Done import akka.actor.typed.ActorSystem import akka.annotation.InternalApi @@ -83,15 +86,18 @@ import io.opentelemetry.context.{ Context => OtelContext } import kalix.protocol.action.Actions import kalix.protocol.discovery.Discovery import kalix.protocol.event_sourced_entity.EventSourcedEntities -import kalix.protocol.replicated_entity.ReplicatedEntities import kalix.protocol.value_entity.ValueEntities import kalix.protocol.view.Views import kalix.protocol.workflow_entity.WorkflowEntities import org.slf4j.LoggerFactory - import scala.jdk.OptionConverters.RichOptional import scala.jdk.CollectionConverters._ +import akka.javasdk.impl.eventsourcedentity.EventSourcedEntityImpl +import akka.javasdk.impl.timedaction.TimedActionImpl +import akka.runtime.sdk.spi.EventSourcedEntityDescriptor +import akka.runtime.sdk.spi.TimedActionDescriptor + /** * INTERNAL API */ @@ -108,6 +114,7 @@ class SdkRunner private (dependencyProvider: Option[DependencyProvider]) extends def applicationConfig: Config = ApplicationConfig.loadApplicationConf + @nowarn("msg=deprecated") //TODO remove deprecation once we remove the old constructor override def getSettings: SpiSettings = { val applicationConf = applicationConfig val devModeSettings = @@ -342,12 +349,53 @@ private final class Sdk( } // collect all Endpoints and compose them to build a larger router - private val httpEndpoints = componentClasses + private val httpEndpointDescriptors = componentClasses .filter(Reflect.isRestEndpoint) .map { httpEndpointClass => HttpEndpointDescriptorFactory(httpEndpointClass, httpEndpointFactory(httpEndpointClass)) } + private val eventSourcedEntityDescriptors = + componentClasses + .filter(hasComponentId) + .collect { + case clz if classOf[EventSourcedEntity[_, _]].isAssignableFrom(clz) => + val componentId = clz.getAnnotation(classOf[ComponentId]).value + val entitySpi = + new EventSourcedEntityImpl[AnyRef, AnyRef, EventSourcedEntity[AnyRef, AnyRef]]( + sdkSettings, + sdkTracerFactory, + componentId, + clz, + messageCodec, + context => + wiredInstance(clz.asInstanceOf[Class[EventSourcedEntity[AnyRef, AnyRef]]]) { + // remember to update component type API doc and docs if changing the set of injectables + case p if p == classOf[EventSourcedEntityContext] => context + }, + sdkSettings.snapshotEvery) + new EventSourcedEntityDescriptor(componentId, entitySpi) + } + + private val timedActionDescriptors = + componentClasses + .filter(hasComponentId) + .collect { + case clz if classOf[TimedAction].isAssignableFrom(clz) => + val componentId = clz.getAnnotation(classOf[ComponentId]).value + val timedActionClass = clz.asInstanceOf[Class[TimedAction]] + val timedActionSpi = + new TimedActionImpl[TimedAction]( + () => wiredInstance(timedActionClass)(sideEffectingComponentInjects(None)), + timedActionClass, + system.classicSystem, + runtimeComponentClients.timerClient, + sdkExecutionContext, + sdkTracerFactory, + messageCodec) + new TimedActionDescriptor(componentId, timedActionSpi) + } + // these are available for injecting in all kinds of component that are primarily // for side effects // Note: config is also always available through the combination with user DI way down below @@ -375,7 +423,8 @@ private final class Sdk( } val actionAndConsumerServices = services.filter { case (_, service) => - service.getClass == classOf[TimedActionService[_]] || service.getClass == classOf[ConsumerService[_]] + /*FIXME service.getClass == classOf[TimedActionService[_]] ||*/ + service.getClass == classOf[ConsumerService[_]] } if (actionAndConsumerServices.nonEmpty) { @@ -484,11 +533,16 @@ private final class Sdk( override def discovery: Discovery = discoveryEndpoint override def actions: Option[Actions] = actionsEndpoint override def eventSourcedEntities: Option[EventSourcedEntities] = eventSourcedEntitiesEndpoint + override def eventSourcedEntityDescriptors: Seq[EventSourcedEntityDescriptor] = + Sdk.this.eventSourcedEntityDescriptors override def valueEntities: Option[ValueEntities] = valueEntitiesEndpoint override def views: Option[Views] = viewsEndpoint override def workflowEntities: Option[WorkflowEntities] = workflowEntitiesEndpoint - override def replicatedEntities: Option[ReplicatedEntities] = None - override def httpEndpointDescriptors: Seq[HttpEndpointDescriptor] = httpEndpoints + override def httpEndpointDescriptors: Seq[HttpEndpointDescriptor] = + Sdk.this.httpEndpointDescriptors + + override def timedActionsDescriptors: Seq[TimedActionDescriptor] = + Sdk.this.timedActionDescriptors } } diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/client/ComponentClientImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/client/ComponentClientImpl.scala index ca40278a2..f75b1f23b 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/client/ComponentClientImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/client/ComponentClientImpl.scala @@ -35,7 +35,7 @@ private[javasdk] final case class ComponentClientImpl( } override def forTimedAction(): TimedActionClient = - TimedActionClientImpl(runtimeComponentClients.actionClient, callMetadata) + TimedActionClientImpl(runtimeComponentClients.timedActionClient, callMetadata) override def forKeyValueEntity(valueEntityId: String): KeyValueEntityClient = if (valueEntityId eq null) throw new NullPointerException("Key Value entity id is null") diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/client/EntityClientImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/client/EntityClientImpl.scala index 7368d59a7..19a3496c4 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/client/EntityClientImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/client/EntityClientImpl.scala @@ -25,14 +25,14 @@ import akka.javasdk.impl.reflection.Reflect import akka.javasdk.keyvalueentity.KeyValueEntity import akka.javasdk.timedaction.TimedAction import akka.javasdk.workflow.Workflow -import akka.runtime.sdk.spi.ActionRequest +import akka.runtime.sdk.spi.TimedActionRequest import akka.runtime.sdk.spi.ActionType import akka.runtime.sdk.spi.ComponentType import akka.runtime.sdk.spi.EntityRequest import akka.runtime.sdk.spi.EventSourcedEntityType import akka.runtime.sdk.spi.KeyValueEntityType import akka.runtime.sdk.spi.WorkflowType -import akka.runtime.sdk.spi.{ ActionClient => RuntimeActionClient } +import akka.runtime.sdk.spi.{ TimedActionClient => RuntimeTimedActionClient } import akka.runtime.sdk.spi.{ EntityClient => RuntimeEntityClient } import akka.util.ByteString @@ -179,7 +179,7 @@ private[javasdk] final case class WorkflowClientImpl( */ @InternalApi private[javasdk] final case class TimedActionClientImpl( - actionClient: RuntimeActionClient, + timedActionClient: RuntimeTimedActionClient, callMetadata: Option[Metadata])(implicit val executionContext: ExecutionContext) extends TimedActionClient { override def method[T, R](methodRef: function.Function[T, TimedAction.Effect]): ComponentDeferredMethodRef[R] = @@ -219,9 +219,9 @@ private[javasdk] final case class TimedActionClientImpl( methodName, None, { metadata => - actionClient + timedActionClient .call( - new ActionRequest( + new TimedActionRequest( componentId, methodName, ContentTypes.`application/json`, diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntitiesImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntitiesImpl.scala index 3412e5371..64111dbbd 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntitiesImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntitiesImpl.scala @@ -290,6 +290,7 @@ private[impl] final class EventSourcedEntitiesImpl( with CommandContext with ActivatableContext { override def tracing(): Tracing = new SpanTracingImpl(span, tracerFactory) + override def isDeleted: Boolean = false // FIXME not supported by old spi } private class EventSourcedEntityContextImpl(override final val entityId: String) diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntityImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntityImpl.scala new file mode 100644 index 000000000..fde399f93 --- /dev/null +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntityImpl.scala @@ -0,0 +1,292 @@ +/* + * Copyright (C) 2021-2024 Lightbend Inc. + */ + +package akka.javasdk.impl.eventsourcedentity + +import java.util.Optional + +import scala.concurrent.Future +import scala.util.control.NonFatal + +import akka.annotation.InternalApi +import akka.javasdk.Metadata +import akka.javasdk.Tracing +import akka.javasdk.eventsourcedentity.CommandContext +import akka.javasdk.eventsourcedentity.EventContext +import akka.javasdk.eventsourcedentity.EventSourcedEntity +import akka.javasdk.eventsourcedentity.EventSourcedEntityContext +import akka.javasdk.impl.AbstractContext +import akka.javasdk.impl.ActivatableContext +import akka.javasdk.impl.AnySupport +import akka.javasdk.impl.ComponentDescriptor +import akka.javasdk.impl.EntityExceptions +import akka.javasdk.impl.EntityExceptions.EntityException +import akka.javasdk.impl.ErrorHandling.BadRequestException +import akka.javasdk.impl.JsonMessageCodec +import akka.javasdk.impl.MetadataImpl +import akka.javasdk.impl.Settings +import akka.javasdk.impl.effect.ErrorReplyImpl +import akka.javasdk.impl.effect.MessageReplyImpl +import akka.javasdk.impl.effect.NoSecondaryEffectImpl +import akka.javasdk.impl.eventsourcedentity.EventSourcedEntityEffectImpl.EmitEvents +import akka.javasdk.impl.eventsourcedentity.EventSourcedEntityEffectImpl.NoPrimaryEffect +import akka.javasdk.impl.eventsourcedentity.EventSourcedEntityRouter.CommandHandlerNotFound +import akka.javasdk.impl.eventsourcedentity.EventSourcedEntityRouter.EventHandlerNotFound +import akka.javasdk.impl.telemetry.SpanTracingImpl +import akka.javasdk.impl.telemetry.Telemetry +import akka.runtime.sdk.spi.SpiEntity +import akka.runtime.sdk.spi.SpiEventSourcedEntity +import akka.runtime.sdk.spi.SpiSerialization +import akka.runtime.sdk.spi.SpiSerialization.Deserialized +import com.google.protobuf.ByteString +import com.google.protobuf.any.{ Any => ScalaPbAny } +import io.grpc.Status +import io.opentelemetry.api.trace.Span +import io.opentelemetry.api.trace.Tracer +import org.slf4j.LoggerFactory +import org.slf4j.MDC + +/** + * INTERNAL API + */ +@InternalApi +private[impl] object EventSourcedEntityImpl { + private val log = LoggerFactory.getLogger(this.getClass) + + private class CommandContextImpl( + override val entityId: String, + override val sequenceNumber: Long, + override val commandName: String, + override val commandId: Long, // FIXME remove + override val isDeleted: Boolean, + override val metadata: Metadata, + span: Option[Span], + tracerFactory: () => Tracer) + extends AbstractContext + with CommandContext + with ActivatableContext { + override def tracing(): Tracing = new SpanTracingImpl(span, tracerFactory) + } + + private class EventSourcedEntityContextImpl(override final val entityId: String) + extends AbstractContext + with EventSourcedEntityContext + + private final class EventContextImpl(entityId: String, override val sequenceNumber: Long) + extends EventSourcedEntityContextImpl(entityId) + with EventContext +} + +/** + * INTERNAL API + */ +@InternalApi +private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[S, E]]( + configuration: Settings, + tracerFactory: () => Tracer, + componentId: String, + componentClass: Class[_], + messageCodec: JsonMessageCodec, + factory: EventSourcedEntityContext => ES, + snapshotEvery: Int) + extends SpiEventSourcedEntity { + import EventSourcedEntityImpl._ + + if (snapshotEvery < 0) + log.warn("Snapshotting disabled for entity [{}], this is not recommended.", componentId) + + // FIXME +// private val traceInstrumentation = new TraceInstrumentation(componentId, EventSourcedEntityCategory, tracerFactory) + + private val componentDescriptor = ComponentDescriptor.descriptorFor(componentClass, messageCodec) + + // FIXME remove EventSourcedEntityRouter altogether, and only keep stateless ReflectiveEventSourcedEntityRouter + private def createRouter(context: EventSourcedEntityContext) + : ReflectiveEventSourcedEntityRouter[AnyRef, AnyRef, EventSourcedEntity[AnyRef, AnyRef]] = + new ReflectiveEventSourcedEntityRouter[S, E, ES]( + factory(context), + componentDescriptor.commandHandlers, + messageCodec) + .asInstanceOf[ReflectiveEventSourcedEntityRouter[AnyRef, AnyRef, EventSourcedEntity[AnyRef, AnyRef]]] + + override def emptyState: SpiEventSourcedEntity.State = { + // FIXME rather messy with the contexts here + val context = new EventSourcedEntityContextImpl("FIXME_ID") + val router = createRouter(context) + try { + router.entity.emptyState() + } finally { + router.entity._internalSetCommandContext(Optional.empty()) + } + } + + override def handleCommand( + state: SpiEventSourcedEntity.State, + command: SpiEntity.Command): Future[SpiEventSourcedEntity.Effect] = { + val entityId = command.entityId + + val span: Option[Span] = None // FIXME traceInstrumentation.buildSpan(service, command) + span.foreach(s => MDC.put(Telemetry.TRACE_ID, s.getSpanContext.getTraceId)) + val cmd = + messageCodec.decodeMessage( + command.payload.getOrElse( + // FIXME smuggling 0 arity method called from component client through here + ScalaPbAny.defaultInstance.withTypeUrl(AnySupport.JsonTypeUrlPrefix).withValue(ByteString.empty()))) + val metadata: Metadata = + MetadataImpl.of(Nil) // FIXME MetadataImpl.of(command.metadata.map(_.entries.toVector).getOrElse(Nil)) + val cmdContext = + new CommandContextImpl( + entityId, + command.sequenceNumber, + command.name, + 0, + command.isDeleted, + metadata, + span, + tracerFactory) + + val context = new EventSourcedEntityContextImpl(entityId) + val router = createRouter(context) + router.entity._internalSetCommandContext(Optional.of(cmdContext)) + try { + router.entity._internalSetCurrentState(state) + val commandEffect = router + .handleCommand(command.name, state, cmd, cmdContext) + .asInstanceOf[EventSourcedEntityEffectImpl[AnyRef, E]] // FIXME improve? + + def replyOrError(updatedState: SpiEventSourcedEntity.State): (Option[ScalaPbAny], Option[SpiEntity.Error]) = { + commandEffect.secondaryEffect(updatedState) match { + case ErrorReplyImpl(description, status) => + val errorCode = status.map(_.value).getOrElse(Status.Code.UNKNOWN.value) + (None, Some(new SpiEntity.Error(description, errorCode))) + case MessageReplyImpl(message, _) => + // FIXME metadata? + // FIXME is this encoding correct? + val replyPayload = ScalaPbAny.fromJavaProto(messageCodec.encodeJava(message)) + (Some(replyPayload), None) + case NoSecondaryEffectImpl => + (None, None) + } + } + + var currentSequence = command.sequenceNumber + var updatedState = state + commandEffect.primaryEffect match { + case EmitEvents(events, deleteEntity) => + var shouldSnapshot = false + events.foreach { event => + updatedState = entityHandleEvent(updatedState, event.asInstanceOf[AnyRef], entityId, currentSequence) + if (updatedState == null) + throw new IllegalArgumentException("Event handler must not return null as the updated state.") + currentSequence += 1 + shouldSnapshot = shouldSnapshot || (snapshotEvery > 0 && currentSequence % snapshotEvery == 0) + } + + val (reply, error) = replyOrError(updatedState) + + if (error.isDefined) { + Future.successful( + new SpiEventSourcedEntity.Effect(events = Vector.empty, updatedState = state, reply = None, error, None)) + } else { + // snapshotting final state since that is the "atomic" write + // emptyState can be null but null snapshot should not be stored, but that can't even + // happen since event handler is not allowed to return null as newState + // FIXME +// val snapshot = +// if (shouldSnapshot) Option(updatedState) +// else None + + val delete = + if (deleteEntity) Some(configuration.cleanupDeletedEventSourcedEntityAfter) + else None + + val serializedEvents = + events.map(event => ScalaPbAny.fromJavaProto(messageCodec.encodeJava(event))).toVector + + Future.successful( + new SpiEventSourcedEntity.Effect(events = serializedEvents, updatedState = state, reply, error, delete)) + } + + case NoPrimaryEffect => + val (reply, error) = replyOrError(updatedState) + + Future.successful( + new SpiEventSourcedEntity.Effect(events = Vector.empty, updatedState = state, reply, error, None)) + } + + } catch { + case CommandHandlerNotFound(name) => + throw new EntityExceptions.EntityException( + entityId, + 0, // FIXME remove commandId + command.name, + s"No command handler found for command [$name] on ${router.entity.getClass}") + case BadRequestException(msg) => + Future.successful( + new SpiEventSourcedEntity.Effect( + events = Vector.empty, + updatedState = state, + reply = None, + error = Some(new SpiEntity.Error(msg, Status.Code.INVALID_ARGUMENT.value)), + delete = None)) + case e: EntityException => + throw e + case NonFatal(error) => + throw EntityException( + entityId = entityId, + commandId = 0, + commandName = command.name, + s"Unexpected failure: $error", + Some(error)) + } finally { + router.entity._internalSetCommandContext(Optional.empty()) + router.entity._internalClearCurrentState() + cmdContext.deactivate() // Very important! + + span.foreach { s => + MDC.remove(Telemetry.TRACE_ID) + s.end() + } + } + + } + + override def handleEvent( + state: SpiEventSourcedEntity.State, + eventEnv: SpiEventSourcedEntity.EventEnvelope): SpiEventSourcedEntity.State = { + val event = + messageCodec + .decodeMessage(eventEnv.payload) + .asInstanceOf[AnyRef] // FIXME empty? + entityHandleEvent(state, event, eventEnv.entityId, eventEnv.sequenceNumber) + } + + def entityHandleEvent( + state: SpiEventSourcedEntity.State, + event: AnyRef, + entityId: String, + sequenceNumber: Long): SpiEventSourcedEntity.State = { + val eventContext = new EventContextImpl(entityId, sequenceNumber) + val router = createRouter(eventContext) // FIXME reuse router instance? + router.entity._internalSetEventContext(Optional.of(eventContext)) + try { + router.handleEvent(state, event) + } catch { + case EventHandlerNotFound(eventClass) => + throw new IllegalArgumentException(s"Unknown event type [$eventClass] on ${router.entity.getClass}") + } finally { + router.entity._internalSetEventContext(Optional.empty()) + } + } + + override val stateSerializer: SpiSerialization.Serializer = + new SpiSerialization.Serializer { + + override def toProto(obj: Deserialized): ScalaPbAny = + ScalaPbAny.fromJavaProto(messageCodec.encodeJava(obj)) + + override def fromProto(pb: ScalaPbAny): Deserialized = + messageCodec.decodeMessage(pb).asInstanceOf[Deserialized] + } +} diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/ReflectiveEventSourcedEntityRouter.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/ReflectiveEventSourcedEntityRouter.scala index 788588442..d92629384 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/ReflectiveEventSourcedEntityRouter.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/ReflectiveEventSourcedEntityRouter.scala @@ -5,7 +5,6 @@ package akka.javasdk.impl.eventsourcedentity import akka.annotation.InternalApi -import akka.javasdk.JsonSupport import akka.javasdk.eventsourcedentity.CommandContext import akka.javasdk.eventsourcedentity.EventSourcedEntity import akka.javasdk.impl.AnySupport @@ -22,7 +21,7 @@ import com.google.protobuf.any.{ Any => ScalaPbAny } */ @InternalApi private[impl] class ReflectiveEventSourcedEntityRouter[S, E, ES <: EventSourcedEntity[S, E]]( - override protected val entity: ES, + override val entity: ES, commandHandlers: Map[String, CommandHandler], messageCodec: JsonMessageCodec) extends EventSourcedEntityRouter[S, E, ES](entity) { @@ -39,7 +38,7 @@ private[impl] class ReflectiveEventSourcedEntityRouter[S, E, ES <: EventSourcedE override def handleEvent(state: S, event: E): S = { - _extractAndSetCurrentState(state) + _setCurrentState(state) event match { case anyPb: ScalaPbAny => // replaying event coming from runtime @@ -60,7 +59,7 @@ private[impl] class ReflectiveEventSourcedEntityRouter[S, E, ES <: EventSourcedE command: Any, commandContext: CommandContext): EventSourcedEntity.Effect[_] = { - _extractAndSetCurrentState(state) + _setCurrentState(state) val commandHandler = commandHandlerLookup(commandName) @@ -90,7 +89,7 @@ private[impl] class ReflectiveEventSourcedEntityRouter[S, E, ES <: EventSourcedE } } - private def _extractAndSetCurrentState(state: S): Unit = { + private def _setCurrentState(state: S): Unit = { val entityStateType: Class[S] = Reflect.eventSourcedEntityStateType(this.entity.getClass).asInstanceOf[Class[S]] // the state: S received can either be of the entity "state" type (if coming from emptyState/memory) @@ -101,9 +100,12 @@ private[impl] class ReflectiveEventSourcedEntityRouter[S, E, ES <: EventSourcedE // be able to call currentState() later entity._internalSetCurrentState(s) case s => - val deserializedState = - JsonSupport.decodeJson(entityStateType, ScalaPbAny.toJavaProto(s.asInstanceOf[ScalaPbAny])) - entity._internalSetCurrentState(deserializedState) + // FIXME this case should not be needed, maybe remove the type check + throw new IllegalArgumentException( + s"Unexpected state type [${s.getClass.getName}], expected [${entityStateType.getName}]") +// val deserializedState = +// JsonSupport.decodeJson(entityStateType, ScalaPbAny.toJavaProto(s.asInstanceOf[ScalaPbAny])) +// entity._internalSetCurrentState(deserializedState) } } } diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/timedaction/TimedActionImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/timedaction/TimedActionImpl.scala new file mode 100644 index 000000000..49b61af3a --- /dev/null +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/timedaction/TimedActionImpl.scala @@ -0,0 +1,128 @@ +/* + * Copyright (C) 2021-2024 Lightbend Inc. + */ + +package akka.javasdk.impl.timedaction + +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.util.control.NonFatal + +import akka.actor.ActorSystem +import akka.annotation.InternalApi +import akka.javasdk.Metadata +import akka.javasdk.impl.ComponentDescriptor +import akka.javasdk.impl.ErrorHandling +import akka.javasdk.impl.JsonMessageCodec +import akka.javasdk.impl.MessageCodec +import akka.javasdk.impl.MetadataImpl +import akka.javasdk.impl.action.CommandContextImpl +import akka.javasdk.impl.telemetry.Telemetry +import akka.javasdk.impl.timedaction.TimedActionEffectImpl.AsyncEffect +import akka.javasdk.impl.timedaction.TimedActionEffectImpl.ErrorEffect +import akka.javasdk.impl.timedaction.TimedActionEffectImpl.ReplyEffect +import akka.javasdk.timedaction.CommandContext +import akka.javasdk.timedaction.CommandEnvelope +import akka.javasdk.timedaction.TimedAction +import akka.runtime.sdk.spi.SpiTimedAction +import akka.runtime.sdk.spi.SpiTimedAction.Command +import akka.runtime.sdk.spi.SpiTimedAction.Effect +import akka.runtime.sdk.spi.TimerClient +import io.opentelemetry.api.trace.Span +import io.opentelemetry.api.trace.Tracer +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import org.slf4j.MDC + +/** EndMarker */ +@InternalApi +private[impl] final class TimedActionImpl[TA <: TimedAction]( + val factory: () => TA, + timedActionClass: Class[TA], + _system: ActorSystem, + timerClient: TimerClient, + sdkExecutionContext: ExecutionContext, + tracerFactory: () => Tracer, + messageCodec: JsonMessageCodec) + extends SpiTimedAction { + + private val log: Logger = LoggerFactory.getLogger(timedActionClass) + + private implicit val executionContext: ExecutionContext = sdkExecutionContext + implicit val system: ActorSystem = _system + + private val componentDescriptor = ComponentDescriptor.descriptorFor(timedActionClass, messageCodec) + + // FIXME remove router altogether + private def createRouter(): ReflectiveTimedActionRouter[TA] = + new ReflectiveTimedActionRouter[TA](factory(), componentDescriptor.commandHandlers) + + override def handleCommand(command: Command): Future[Effect] = { + val span: Option[Span] = None //FIXME add intrumentation + + span.foreach(s => MDC.put(Telemetry.TRACE_ID, s.getSpanContext.getTraceId)) + val fut = + try { + val messageContext = + createMessageContext(command, messageCodec, span) + val decodedPayload = messageCodec.decodeMessage( + command.payload.getOrElse(throw new IllegalArgumentException("No command payload"))) + val metadata: Metadata = + MetadataImpl.of(Nil) // FIXME MetadataImpl.of(command.metadata.map(_.entries.toVector).getOrElse(Nil)) + val effect = createRouter() + .handleUnary(command.name, CommandEnvelope.of(decodedPayload, metadata), messageContext) + toSpiEffect(command, effect) + } catch { + case NonFatal(ex) => + // command handler threw an "unexpected" error + span.foreach(_.end()) + Future.successful(handleUnexpectedException(command, ex)) + } finally { + MDC.remove(Telemetry.TRACE_ID) + } + fut.andThen { case _ => + span.foreach(_.end()) + } + } + + private def createMessageContext(command: Command, messageCodec: MessageCodec, span: Option[Span]): CommandContext = { + val metadata: MetadataImpl = + MetadataImpl.of(Nil) // FIXME MetadataImpl.of(command.metadata.map(_.entries.toVector).getOrElse(Nil)) + val updatedMetadata = span.map(metadata.withTracing).getOrElse(metadata) + new CommandContextImpl(updatedMetadata, messageCodec, system, timerClient, tracerFactory, span) + } + + private def toSpiEffect(command: Command, effect: TimedAction.Effect): Future[Effect] = { + effect match { + case ReplyEffect(_) => //FIXME remove meta, not used in the reply + Future.successful(new Effect(None)) + case AsyncEffect(futureEffect) => + futureEffect + .flatMap { effect => toSpiEffect(command, effect) } + .recover { case NonFatal(ex) => + handleUnexpectedException(command, ex) + } + case ErrorEffect(description) => + Future.successful(new Effect(Some(new SpiTimedAction.Error(description)))) + case unknown => + throw new IllegalArgumentException(s"Unknown TimedAction.Effect type ${unknown.getClass}") + } + } + + private def handleUnexpectedException(command: Command, ex: Throwable): Effect = { + ex match { + case _ => + ErrorHandling.withCorrelationId { correlationId => + log.error( + s"Failure during handling command [${command.name}] from TimedAction component [${command.componentId}].", + ex) + protocolFailure(correlationId) + } + } + } + + private def protocolFailure(correlationId: String): Effect = { + new Effect(Some(new SpiTimedAction.Error(s"Unexpected error [$correlationId]"))) + } + +} diff --git a/akka-javasdk/src/test/java/akka/javasdk/client/ComponentClientTest.java b/akka-javasdk/src/test/java/akka/javasdk/client/ComponentClientTest.java index 4dcb05178..cca2f2887 100644 --- a/akka-javasdk/src/test/java/akka/javasdk/client/ComponentClientTest.java +++ b/akka-javasdk/src/test/java/akka/javasdk/client/ComponentClientTest.java @@ -16,7 +16,7 @@ import akka.javasdk.impl.client.ComponentClientImpl; import akka.javasdk.impl.client.DeferredCallImpl; import akka.javasdk.impl.telemetry.Telemetry; -import akka.runtime.sdk.spi.ActionClient; +import akka.runtime.sdk.spi.TimedActionClient; import akka.runtime.sdk.spi.ActionType$; import akka.runtime.sdk.spi.ComponentClients; import akka.runtime.sdk.spi.EntityClient; @@ -70,7 +70,7 @@ public ViewClient viewClient() { } @Override - public ActionClient actionClient() { + public TimedActionClient timedActionClient() { return null; } }; diff --git a/akka-javasdk/src/test/scala/akka/javasdk/testkit/TestProtocol.scala b/akka-javasdk/src/test/scala/akka/javasdk/testkit/TestProtocol.scala index 851804c84..1bb2013fb 100644 --- a/akka-javasdk/src/test/scala/akka/javasdk/testkit/TestProtocol.scala +++ b/akka-javasdk/src/test/scala/akka/javasdk/testkit/TestProtocol.scala @@ -8,7 +8,6 @@ import akka.actor.ActorSystem import akka.grpc.GrpcClientSettings import akka.javasdk.testkit.eventsourcedentity.TestEventSourcedProtocol import akka.javasdk.testkit.keyvalueentity.TestKeyValueEntityProtocol -import akka.javasdk.testkit.replicatedentity.TestReplicatedEntityProtocol import akka.javasdk.testkit.workflow.TestWorkflowProtocol import akka.testkit.TestKit import com.typesafe.config.{ Config, ConfigFactory } @@ -22,7 +21,6 @@ final class TestProtocol(host: String, port: Int) { val eventSourced = new TestEventSourcedProtocol(context) val valueEntity = new TestKeyValueEntityProtocol(context) - val replicatedEntity = new TestReplicatedEntityProtocol(context) val workflow = new TestWorkflowProtocol(context) def settings: GrpcClientSettings = context.clientSettings @@ -30,7 +28,6 @@ final class TestProtocol(host: String, port: Int) { def terminate(): Unit = { eventSourced.terminate() valueEntity.terminate() - replicatedEntity.terminate() workflow.terminate() } } diff --git a/akka-javasdk/src/test/scala/akka/javasdk/testkit/replicatedentity/ReplicatedEntityMessages.scala b/akka-javasdk/src/test/scala/akka/javasdk/testkit/replicatedentity/ReplicatedEntityMessages.scala deleted file mode 100644 index b3c55f21c..000000000 --- a/akka-javasdk/src/test/scala/akka/javasdk/testkit/replicatedentity/ReplicatedEntityMessages.scala +++ /dev/null @@ -1,399 +0,0 @@ -/* - * Copyright (C) 2021-2024 Lightbend Inc. - */ - -package akka.javasdk.testkit.replicatedentity - -import akka.javasdk.testkit.entity.EntityMessages -import kalix.protocol.replicated_entity._ -import kalix.protocol.component.{ ClientAction, Failure, SideEffect } -import kalix.protocol.entity.Command -import com.google.protobuf.any.{ Any => ScalaPbAny } -import com.google.protobuf.{ Message => JavaPbMessage } -import io.grpc.Status -import scalapb.{ GeneratedMessage => ScalaPbMessage } - -object ReplicatedEntityMessages extends EntityMessages { - import ReplicatedEntityStreamIn.{ Message => InMessage } - import ReplicatedEntityStreamOut.{ Message => OutMessage } - - final case class Effects( - stateAction: Option[ReplicatedEntityStateAction] = None, - sideEffects: Seq[SideEffect] = Seq.empty) { - def withSideEffect(service: String, command: String, message: JavaPbMessage): Effects = - withSideEffect(service, command, messagePayload(message), synchronous = false) - - def withSideEffect(service: String, command: String, message: JavaPbMessage, synchronous: Boolean): Effects = - withSideEffect(service, command, messagePayload(message), synchronous) - - def withSideEffect(service: String, command: String, message: ScalaPbMessage): Effects = - withSideEffect(service, command, messagePayload(message), synchronous = false) - - def withSideEffect(service: String, command: String, message: ScalaPbMessage, synchronous: Boolean): Effects = - withSideEffect(service, command, messagePayload(message), synchronous) - - def withSideEffect(service: String, command: String, payload: Option[ScalaPbAny], synchronous: Boolean): Effects = - copy(sideEffects = sideEffects :+ SideEffect(service, command, payload, synchronous)) - - def ++(other: Effects): Effects = - Effects(stateAction.orElse(other.stateAction), sideEffects ++ other.sideEffects) - } - - object Effects { - val empty: Effects = Effects() - } - - val EmptyInMessage: InMessage = InMessage.Empty - - def init(serviceName: String, entityId: String): InMessage = - init(serviceName, entityId, None) - - def init(serviceName: String, entityId: String, delta: ReplicatedEntityDelta.Delta): InMessage = - InMessage.Init(ReplicatedEntityInit(serviceName, entityId, Option(ReplicatedEntityDelta(delta)))) - - def init(serviceName: String, entityId: String, delta: Option[ReplicatedEntityDelta]): InMessage = - InMessage.Init(ReplicatedEntityInit(serviceName, entityId, delta)) - - def delta(delta: ReplicatedEntityDelta.Delta): InMessage = - InMessage.Delta(ReplicatedEntityDelta(delta)) - - val delete: InMessage = - InMessage.Delete(ReplicatedEntityDelete()) - - def command(id: Long, entityId: String, name: String): InMessage = - command(id, entityId, name, EmptyJavaMessage) - - def command(id: Long, entityId: String, name: String, payload: JavaPbMessage): InMessage = - command(id, entityId, name, messagePayload(payload)) - - def command(id: Long, entityId: String, name: String, payload: ScalaPbMessage): InMessage = - command(id, entityId, name, messagePayload(payload)) - - def command(id: Long, entityId: String, name: String, payload: Option[ScalaPbAny]): InMessage = - InMessage.Command(Command(entityId, id, name, payload)) - - def reply(id: Long, payload: JavaPbMessage): OutMessage = - reply(id, payload, Effects.empty) - - def reply(id: Long, payload: JavaPbMessage, effects: Effects): OutMessage = - reply(id, messagePayload(payload), effects) - - def reply(id: Long, payload: ScalaPbMessage): OutMessage = - reply(id, payload, Effects.empty) - - def reply(id: Long, payload: ScalaPbMessage, effects: Effects): OutMessage = - reply(id, messagePayload(payload), effects) - - def reply(id: Long, payload: Option[ScalaPbAny], effects: Effects): OutMessage = - replicatedEntityReply(id, clientActionReply(payload), effects) - - def forward(id: Long, service: String, command: String, payload: JavaPbMessage): OutMessage = - forward(id, service, command, payload, Effects.empty) - - def forward(id: Long, service: String, command: String, payload: JavaPbMessage, effects: Effects): OutMessage = - forward(id, service, command, messagePayload(payload), effects) - - def forward(id: Long, service: String, command: String, payload: ScalaPbMessage): OutMessage = - forward(id, service, command, payload, Effects.empty) - - def forward(id: Long, service: String, command: String, payload: ScalaPbMessage, effects: Effects): OutMessage = - forward(id, service, command, messagePayload(payload), effects) - - def forward(id: Long, service: String, command: String, payload: Option[ScalaPbAny], effects: Effects): OutMessage = - replicatedEntityReply(id, clientActionForward(service, command, payload), effects) - - def failure(description: String): OutMessage = - failure(id = 0, description) - - def failure(id: Long, description: String): OutMessage = - failure(id, description, Status.Code.UNKNOWN, Effects.empty) - - def failure(id: Long, description: String, statusCode: Status.Code): OutMessage = - failure(id, description, statusCode, Effects.empty) - - def failure(id: Long, description: String, statusCode: Status.Code, effects: Effects): OutMessage = - replicatedEntityReply(id, clientActionFailure(id, description, statusCode.value()), effects) - - def replicatedEntityReply(id: Long, clientAction: Option[ClientAction], effects: Effects): OutMessage = - OutMessage.Reply(ReplicatedEntityReply(id, clientAction, effects.sideEffects, effects.stateAction)) - - def entityFailure(description: String): OutMessage = - entityFailure(id = 0, description) - - def entityFailure(id: Long, description: String): OutMessage = - OutMessage.Failure(Failure(id, description)) - - def replicatedEntityUpdate(delta: ReplicatedEntityDelta.Delta): Option[ReplicatedEntityStateAction] = - Some(ReplicatedEntityStateAction(ReplicatedEntityStateAction.Action.Update(ReplicatedEntityDelta(delta)))) - - def deltaCounter(value: Long): ReplicatedEntityDelta.Delta.Counter = - ReplicatedEntityDelta.Delta.Counter(ReplicatedCounterDelta(value)) - - final case class DeltaReplicatedSet( - cleared: Boolean = false, - removed: Seq[ScalaPbAny] = Seq.empty, - added: Seq[ScalaPbAny] = Seq.empty) { - - def add(element: JavaPbMessage, elements: JavaPbMessage*): DeltaReplicatedSet = - add(protobufAny(element), elements.map(protobufAny): _*) - - def add(element: ScalaPbMessage, elements: ScalaPbMessage*): DeltaReplicatedSet = - add(protobufAny(element), elements.map(protobufAny): _*) - - def add(element: ScalaPbAny, elements: ScalaPbAny*): DeltaReplicatedSet = - add(element +: elements) - - def add(elements: Seq[ScalaPbAny]): DeltaReplicatedSet = - copy(added = added ++ elements) - - def remove(element: JavaPbMessage, elements: JavaPbMessage*): DeltaReplicatedSet = - remove(protobufAny(element), elements.map(protobufAny): _*) - - def remove(element: ScalaPbMessage, elements: ScalaPbMessage*): DeltaReplicatedSet = - remove(protobufAny(element), elements.map(protobufAny): _*) - - def remove(element: ScalaPbAny, elements: ScalaPbAny*): DeltaReplicatedSet = - remove(element +: elements) - - def remove(elements: Seq[ScalaPbAny]): DeltaReplicatedSet = - copy(removed = removed ++ elements) - - def clear(cleared: Boolean = true): DeltaReplicatedSet = - copy(cleared = cleared) - - def replicatedEntityDelta(): ReplicatedEntityDelta.Delta.ReplicatedSet = - ReplicatedEntityDelta.Delta.ReplicatedSet(ReplicatedSetDelta(cleared, removed, added)) - } - - object DeltaReplicatedSet { - val empty: DeltaReplicatedSet = DeltaReplicatedSet() - } - - def deltaRegister(value: JavaPbMessage): ReplicatedEntityDelta.Delta.Register = - deltaRegister(value, ReplicatedEntityClock.REPLICATED_ENTITY_CLOCK_DEFAULT_UNSPECIFIED) - - def deltaRegister(value: JavaPbMessage, clock: ReplicatedEntityClock): ReplicatedEntityDelta.Delta.Register = - deltaRegister(value, clock, customClock = 0L) - - def deltaRegister( - value: JavaPbMessage, - clock: ReplicatedEntityClock, - customClock: Long): ReplicatedEntityDelta.Delta.Register = - deltaRegister(messagePayload(value), clock, customClock) - - def deltaRegister(value: ScalaPbMessage): ReplicatedEntityDelta.Delta.Register = - deltaRegister(value, ReplicatedEntityClock.REPLICATED_ENTITY_CLOCK_DEFAULT_UNSPECIFIED) - - def deltaRegister(value: ScalaPbMessage, clock: ReplicatedEntityClock): ReplicatedEntityDelta.Delta.Register = - deltaRegister(value, clock, customClock = 0L) - - def deltaRegister( - value: ScalaPbMessage, - clock: ReplicatedEntityClock, - customClock: Long): ReplicatedEntityDelta.Delta.Register = - deltaRegister(messagePayload(value), clock, customClock) - - def deltaRegister(value: Option[ScalaPbAny]): ReplicatedEntityDelta.Delta.Register = - deltaRegister(value, ReplicatedEntityClock.REPLICATED_ENTITY_CLOCK_DEFAULT_UNSPECIFIED) - - def deltaRegister(value: Option[ScalaPbAny], clock: ReplicatedEntityClock): ReplicatedEntityDelta.Delta.Register = - deltaRegister(value, clock, customClock = 0L) - - def deltaRegister( - value: Option[ScalaPbAny], - clock: ReplicatedEntityClock, - customClock: Long): ReplicatedEntityDelta.Delta.Register = - ReplicatedEntityDelta.Delta.Register(ReplicatedRegisterDelta(value, clock, customClock)) - - final case class DeltaMap( - cleared: Boolean = false, - removed: Seq[ScalaPbAny] = Seq.empty, - updated: Seq[(ScalaPbAny, ReplicatedEntityDelta)] = Seq.empty, - added: Seq[(ScalaPbAny, ReplicatedEntityDelta)] = Seq.empty) { - - def add(key: JavaPbMessage, delta: ReplicatedEntityDelta): DeltaMap = - add(protobufAny(key), delta) - - def add(key: ScalaPbMessage, delta: ReplicatedEntityDelta): DeltaMap = - add(protobufAny(key), delta) - - def add(key: ScalaPbAny, delta: ReplicatedEntityDelta): DeltaMap = - add(Seq(key -> delta)) - - def add(entries: Seq[(ScalaPbAny, ReplicatedEntityDelta)]): DeltaMap = - copy(added = added ++ entries) - - def update(key: JavaPbMessage, delta: ReplicatedEntityDelta): DeltaMap = - update(protobufAny(key), delta) - - def update(key: ScalaPbMessage, delta: ReplicatedEntityDelta): DeltaMap = - update(protobufAny(key), delta) - - def update(key: ScalaPbAny, delta: ReplicatedEntityDelta): DeltaMap = - update(Seq(key -> delta)) - - def update(entries: Seq[(ScalaPbAny, ReplicatedEntityDelta)]): DeltaMap = - copy(updated = updated ++ entries) - - def remove(key: JavaPbMessage, keys: JavaPbMessage*): DeltaMap = - remove(protobufAny(key), keys.map(protobufAny): _*) - - def remove(key: ScalaPbMessage, keys: ScalaPbMessage*): DeltaMap = - remove(protobufAny(key), keys.map(protobufAny): _*) - - def remove(key: ScalaPbAny, keys: ScalaPbAny*): DeltaMap = - remove(key +: keys) - - def remove(keys: Seq[ScalaPbAny]): DeltaMap = - copy(removed = removed ++ keys) - - def clear(cleared: Boolean = true): DeltaMap = - copy(cleared = cleared) - - def replicatedEntityDelta(): ReplicatedEntityDelta.Delta.ReplicatedMap = { - val updatedEntries = updated.map { case (key, delta) => ReplicatedMapEntryDelta(Option(key), Option(delta)) } - val addedEntries = added.map { case (key, delta) => ReplicatedMapEntryDelta(Option(key), Option(delta)) } - ReplicatedEntityDelta.Delta.ReplicatedMap(ReplicatedMapDelta(cleared, removed, updatedEntries, addedEntries)) - } - } - - object DeltaMap { - val empty: DeltaMap = DeltaMap() - } - - final case class DeltaCounterMap( - cleared: Boolean = false, - removed: Seq[ScalaPbAny] = Seq.empty, - updated: Seq[(ScalaPbAny, ReplicatedCounterDelta)] = Seq.empty) { - - def update(key: JavaPbMessage, delta: ReplicatedCounterDelta): DeltaCounterMap = - update(protobufAny(key), delta) - - def update(key: ScalaPbMessage, delta: ReplicatedCounterDelta): DeltaCounterMap = - update(protobufAny(key), delta) - - def update(key: ScalaPbAny, delta: ReplicatedCounterDelta): DeltaCounterMap = - update(Seq(key -> delta)) - - def update(entries: Seq[(ScalaPbAny, ReplicatedCounterDelta)]): DeltaCounterMap = - copy(updated = updated ++ entries) - - def remove(key: JavaPbMessage, keys: JavaPbMessage*): DeltaCounterMap = - remove(protobufAny(key), keys.map(protobufAny): _*) - - def remove(key: ScalaPbMessage, keys: ScalaPbMessage*): DeltaCounterMap = - remove(protobufAny(key), keys.map(protobufAny): _*) - - def remove(key: ScalaPbAny, keys: ScalaPbAny*): DeltaCounterMap = - remove(key +: keys) - - def remove(keys: Seq[ScalaPbAny]): DeltaCounterMap = - copy(removed = removed ++ keys) - - def clear(cleared: Boolean = true): DeltaCounterMap = - copy(cleared = cleared) - - def replicatedEntityDelta(): ReplicatedEntityDelta.Delta.ReplicatedCounterMap = { - val updatedEntries = updated.map { case (key, delta) => - ReplicatedCounterMapEntryDelta(Option(key), Option(delta)) - } - ReplicatedEntityDelta.Delta.ReplicatedCounterMap(ReplicatedCounterMapDelta(cleared, removed, updatedEntries)) - } - } - - object DeltaCounterMap { - val empty: DeltaCounterMap = DeltaCounterMap() - } - - final case class DeltaRegisterMap( - cleared: Boolean = false, - removed: Seq[ScalaPbAny] = Seq.empty, - updated: Seq[(ScalaPbAny, ReplicatedRegisterDelta)] = Seq.empty) { - - def update(key: JavaPbMessage, delta: ReplicatedRegisterDelta): DeltaRegisterMap = - update(protobufAny(key), delta) - - def update(key: ScalaPbMessage, delta: ReplicatedRegisterDelta): DeltaRegisterMap = - update(protobufAny(key), delta) - - def update(key: ScalaPbAny, delta: ReplicatedRegisterDelta): DeltaRegisterMap = - update(Seq(key -> delta)) - - def update(entries: Seq[(ScalaPbAny, ReplicatedRegisterDelta)]): DeltaRegisterMap = - copy(updated = updated ++ entries) - - def remove(key: JavaPbMessage, keys: JavaPbMessage*): DeltaRegisterMap = - remove(protobufAny(key), keys.map(protobufAny): _*) - - def remove(key: ScalaPbMessage, keys: ScalaPbMessage*): DeltaRegisterMap = - remove(protobufAny(key), keys.map(protobufAny): _*) - - def remove(key: ScalaPbAny, keys: ScalaPbAny*): DeltaRegisterMap = - remove(key +: keys) - - def remove(keys: Seq[ScalaPbAny]): DeltaRegisterMap = - copy(removed = removed ++ keys) - - def clear(cleared: Boolean = true): DeltaRegisterMap = - copy(cleared = cleared) - - def replicatedEntityDelta(): ReplicatedEntityDelta.Delta.ReplicatedRegisterMap = { - val updatedEntries = updated.map { case (key, delta) => - ReplicatedRegisterMapEntryDelta(Option(key), Option(delta)) - } - ReplicatedEntityDelta.Delta.ReplicatedRegisterMap(ReplicatedRegisterMapDelta(cleared, removed, updatedEntries)) - } - } - - object DeltaRegisterMap { - val empty: DeltaRegisterMap = DeltaRegisterMap() - } - - final case class DeltaMultiMap( - cleared: Boolean = false, - removed: Seq[ScalaPbAny] = Seq.empty, - updated: Seq[(ScalaPbAny, ReplicatedSetDelta)] = Seq.empty) { - - def update(key: JavaPbMessage, delta: ReplicatedSetDelta): DeltaMultiMap = - update(protobufAny(key), delta) - - def update(key: ScalaPbMessage, delta: ReplicatedSetDelta): DeltaMultiMap = - update(protobufAny(key), delta) - - def update(key: ScalaPbAny, delta: ReplicatedSetDelta): DeltaMultiMap = - update(Seq(key -> delta)) - - def update(entries: Seq[(ScalaPbAny, ReplicatedSetDelta)]): DeltaMultiMap = - copy(updated = updated ++ entries) - - def remove(key: JavaPbMessage, keys: JavaPbMessage*): DeltaMultiMap = - remove(protobufAny(key), keys.map(protobufAny): _*) - - def remove(key: ScalaPbMessage, keys: ScalaPbMessage*): DeltaMultiMap = - remove(protobufAny(key), keys.map(protobufAny): _*) - - def remove(key: ScalaPbAny, keys: ScalaPbAny*): DeltaMultiMap = - remove(key +: keys) - - def remove(keys: Seq[ScalaPbAny]): DeltaMultiMap = - copy(removed = removed ++ keys) - - def clear(cleared: Boolean = true): DeltaMultiMap = - copy(cleared = cleared) - - def replicatedEntityDelta(): ReplicatedEntityDelta.Delta.ReplicatedMultiMap = { - val updatedEntries = updated.map { case (key, delta) => ReplicatedMultiMapEntryDelta(Option(key), Option(delta)) } - ReplicatedEntityDelta.Delta.ReplicatedMultiMap(ReplicatedMultiMapDelta(cleared, removed, updatedEntries)) - } - } - - object DeltaMultiMap { - val empty: DeltaMultiMap = DeltaMultiMap() - } - - def deltaVote(selfVote: Boolean, votesFor: Int = 0, totalVoters: Int = 0): ReplicatedEntityDelta.Delta.Vote = - ReplicatedEntityDelta.Delta.Vote(VoteDelta(selfVote, votesFor, totalVoters)) - - val replicatedEntityDelete: Option[ReplicatedEntityStateAction] = - Some(ReplicatedEntityStateAction(ReplicatedEntityStateAction.Action.Delete(ReplicatedEntityDelete()))) -} diff --git a/akka-javasdk/src/test/scala/akka/javasdk/testkit/replicatedentity/TestReplicatedEntityProtocol.scala b/akka-javasdk/src/test/scala/akka/javasdk/testkit/replicatedentity/TestReplicatedEntityProtocol.scala deleted file mode 100644 index 850b6d24a..000000000 --- a/akka-javasdk/src/test/scala/akka/javasdk/testkit/replicatedentity/TestReplicatedEntityProtocol.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (C) 2021-2024 Lightbend Inc. - */ - -package akka.javasdk.testkit.replicatedentity - -import akka.stream.scaladsl.Source -import akka.stream.testkit.TestPublisher -import akka.stream.testkit.scaladsl.TestSink -import kalix.protocol.replicated_entity._ -import akka.javasdk.testkit.TestProtocol.TestProtocolContext - -final class TestReplicatedEntityProtocol(context: TestProtocolContext) { - private val client = ReplicatedEntitiesClient(context.clientSettings)(context.system) - - def connect(): TestReplicatedEntityProtocol.Connection = - new TestReplicatedEntityProtocol.Connection(client, context) - - def terminate(): Unit = client.close() -} - -object TestReplicatedEntityProtocol { - final class Connection(client: ReplicatedEntitiesClient, context: TestProtocolContext) { - - import context.system - - private val in = TestPublisher.probe[ReplicatedEntityStreamIn]() - private val out = client.handle(Source.fromPublisher(in)).runWith(TestSink[ReplicatedEntityStreamOut]()) - - out.ensureSubscription() - - def send(message: ReplicatedEntityStreamIn.Message): Connection = { - in.sendNext(ReplicatedEntityStreamIn(message)) - this - } - - def expect(message: ReplicatedEntityStreamOut.Message): Connection = { - out.request(1).expectNext(ReplicatedEntityStreamOut(message)) - this - } - - def expectNext(): ReplicatedEntityStreamOut.Message = { - out.request(1).expectNext().message - } - - def expectClosed(): Unit = { - out.expectComplete() - in.expectCancellation() - } - - def expectEntityFailure(descStartingWith: String): Connection = { - expectNext() match { - case m: ReplicatedEntityStreamOut.Message => - if (m.failure.exists(_.description.startsWith(descStartingWith))) this - else - throw new RuntimeException(s"Expected failure starting with [$descStartingWith] but got $m") - } - } - - def passivate(): Unit = close() - - def close(): Unit = { - in.sendComplete() - out.expectComplete() - } - } -} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 9e8ccfb18..65ba1896b 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -8,7 +8,7 @@ object Dependencies { val ProtocolVersionMinor = 1 val RuntimeImage = "gcr.io/kalix-public/kalix-runtime" // Remember to bump kalix-runtime.version in akka-javasdk-maven/akka-javasdk-parent if bumping this - val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.2.2") + val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.3.0-2909c94") } // NOTE: embedded SDK should have the AkkaVersion aligned, when updating RuntimeVersion, make sure to check // if AkkaVersion and AkkaHttpVersion are aligned diff --git a/publishLocally.sh b/publishLocally.sh index aa973c3cd..d3123c4a6 100755 --- a/publishLocally.sh +++ b/publishLocally.sh @@ -1,6 +1,7 @@ # This script will publish the current snapshot of all artifacts. # Including the maven plugin and archetypes. +set -e export SDK_VERSION=$(sbt "print akka-javasdk/version" | tail -1) echo @@ -15,8 +16,8 @@ sbt 'publishM2; +publishLocal' mvn clean install -Dskip.docker=true # cleanup - rm pom.xml.versionsBackup - rm */pom.xml.versionsBackup + rm -f pom.xml.versionsBackup + rm -f */pom.xml.versionsBackup # revert, but only we didn't request to keep the modified files if [ "$1" != "--keep" ]; then