From 8df58ebe50faf3c9915d798432a882880ea5e7d7 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 2 Dec 2024 16:16:40 +0100 Subject: [PATCH] chore: Snapshots and StateSerializer type * test that verifies snapshots * pass snapshotEvery in SpiSettings * use entityStateType when deserializing state (snapshot) --- .../akka-javasdk-parent/pom.xml | 2 +- .../akkajavasdk/EventSourcedEntityTest.java | 11 ++++++++- .../src/test/resources/application.conf | 1 - .../src/test/resources/logback-test.xml | 3 ++- .../akka/javasdk/impl/JsonMessageCodec.scala | 5 ++-- .../scala/akka/javasdk/impl/SdkRunner.scala | 8 ++++--- .../scala/akka/javasdk/impl/Settings.scala | 2 -- .../EventSourcedEntitiesImpl.scala | 2 +- .../EventSourcedEntityImpl.scala | 24 ++++--------------- .../ReflectiveEventSourcedEntityRouter.scala | 3 ++- project/Dependencies.scala | 2 +- 11 files changed, 28 insertions(+), 35 deletions(-) diff --git a/akka-javasdk-maven/akka-javasdk-parent/pom.xml b/akka-javasdk-maven/akka-javasdk-parent/pom.xml index bd6ca5abd..58c7537d4 100644 --- a/akka-javasdk-maven/akka-javasdk-parent/pom.xml +++ b/akka-javasdk-maven/akka-javasdk-parent/pom.xml @@ -38,7 +38,7 @@ 21 - 1.3.0-2909c94-1-37f8654f-SNAPSHOT + 1.3.0-2909c94-1-e56551b7-SNAPSHOT UTF-8 false diff --git a/akka-javasdk-tests/src/test/java/akkajavasdk/EventSourcedEntityTest.java b/akka-javasdk-tests/src/test/java/akkajavasdk/EventSourcedEntityTest.java index 78abdd8d4..ce7785251 100644 --- a/akka-javasdk-tests/src/test/java/akkajavasdk/EventSourcedEntityTest.java +++ b/akka-javasdk-tests/src/test/java/akkajavasdk/EventSourcedEntityTest.java @@ -5,12 +5,14 @@ package akkajavasdk; import akka.javasdk.http.StrictResponse; +import akka.javasdk.testkit.TestKit; import akka.javasdk.testkit.TestKitSupport; import akkajavasdk.components.eventsourcedentities.counter.Counter; import akkajavasdk.components.eventsourcedentities.counter.CounterEntity; import akka.javasdk.client.EventSourcedEntityClient; import akkajavasdk.components.eventsourcedentities.hierarchy.AbstractTextConsumer; import akkajavasdk.components.eventsourcedentities.hierarchy.TextEsEntity; +import com.typesafe.config.ConfigFactory; import org.awaitility.Awaitility; import org.hamcrest.core.IsEqual; import org.junit.jupiter.api.Assertions; @@ -30,6 +32,13 @@ @ExtendWith(Junit5LogCapturing.class) public class EventSourcedEntityTest extends TestKitSupport { + @Override + protected TestKit.Settings testKitSettings() { + return TestKit.Settings.DEFAULT.withAdditionalConfig(ConfigFactory.parseString(""" + akka.javasdk.event-sourced-entity.snapshot-every = 10 + """)); + } + @Test public void verifyCounterEventSourcedWiring() throws InterruptedException { @@ -144,7 +153,7 @@ public void verifyCounterEventSourcedAfterRestart() { @Test public void verifyCounterEventSourcedAfterRestartFromSnapshot() { - // snapshotting with kalix.event-sourced-entity.snapshot-every = 10 + // snapshotting with akka.javasdk.event-sourced-entity.snapshot-every = 10 var counterId = "restartFromSnapshot"; var client = componentClient.forEventSourcedEntity(counterId); diff --git a/akka-javasdk-tests/src/test/resources/application.conf b/akka-javasdk-tests/src/test/resources/application.conf index 756fae747..e7446a237 100644 --- a/akka-javasdk-tests/src/test/resources/application.conf +++ b/akka-javasdk-tests/src/test/resources/application.conf @@ -1,3 +1,2 @@ -kalix.event-sourced-entity.snapshot-every = 10 # Using a different port to not conflict with parallel tests akka.javasdk.testkit.http-port = 39391 diff --git a/akka-javasdk-tests/src/test/resources/logback-test.xml b/akka-javasdk-tests/src/test/resources/logback-test.xml index 2eaa38a2a..1e7cd469f 100644 --- a/akka-javasdk-tests/src/test/resources/logback-test.xml +++ b/akka-javasdk-tests/src/test/resources/logback-test.xml @@ -13,6 +13,7 @@ + @@ -23,6 +24,6 @@ - + diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/JsonMessageCodec.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/JsonMessageCodec.scala index 91218c4c7..077476ed5 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/JsonMessageCodec.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/JsonMessageCodec.scala @@ -165,9 +165,8 @@ private[javasdk] class JsonMessageCodec extends MessageCodec { value } - def decodeMessage[T](expectedType: Class[T], bytes: akka.util.ByteString): T = { - // FIXME could we avoid the copy? - JsonSupport.parseBytes(bytes.toArrayUnsafe(), expectedType) + def decodeMessage[T](expectedType: Class[T], pb: ScalaPbAny): T = { + JsonSupport.decodeJson(expectedType, pb) } private[akka] def removeVersion(typeName: String) = { diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala index da73e1c90..98a2ab7c5 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala @@ -118,6 +118,9 @@ class SdkRunner private (dependencyProvider: Option[DependencyProvider]) extends @nowarn("msg=deprecated") //TODO remove deprecation once we remove the old constructor override def getSettings: SpiSettings = { val applicationConf = applicationConfig + + val eventSourcedEntitySnapshotEvery = applicationConfig.getInt("akka.javasdk.event-sourced-entity.snapshot-every") + val devModeSettings = if (applicationConf.getBoolean("akka.javasdk.dev-mode.enabled")) Some( @@ -132,7 +135,7 @@ class SdkRunner private (dependencyProvider: Option[DependencyProvider]) extends else None - new SpiSettings(devModeSettings) + new SpiSettings(eventSourcedEntitySnapshotEvery, devModeSettings) } private def extractBrokerConfig(eventingConf: Config): SpiEventingSupportSettings = { @@ -374,8 +377,7 @@ private final class Sdk( wiredInstance(clz.asInstanceOf[Class[EventSourcedEntity[AnyRef, AnyRef]]]) { // remember to update component type API doc and docs if changing the set of injectables case p if p == classOf[EventSourcedEntityContext] => context - }, - sdkSettings.snapshotEvery) + }) } new EventSourcedEntityDescriptor(componentId, instanceFactory) } diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/Settings.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/Settings.scala index 9bb9e02fa..19617081b 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/Settings.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/Settings.scala @@ -18,7 +18,6 @@ private[impl] object Settings { def apply(sdkConfig: Config): Settings = { Settings( - snapshotEvery = sdkConfig.getInt("event-sourced-entity.snapshot-every"), cleanupDeletedEventSourcedEntityAfter = sdkConfig.getDuration("event-sourced-entity.cleanup-deleted-after"), cleanupDeletedKeyValueEntityAfter = sdkConfig.getDuration("key-value-entity.cleanup-deleted-after"), devModeSettings = Option.when(sdkConfig.getBoolean("dev-mode.enabled"))( @@ -35,7 +34,6 @@ private[impl] object Settings { */ @InternalApi private[impl] final case class Settings( - snapshotEvery: Int, cleanupDeletedEventSourcedEntityAfter: Duration, cleanupDeletedKeyValueEntityAfter: Duration, devModeSettings: Option[DevModeSettings]) diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntitiesImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntitiesImpl.scala index 64111dbbd..28def0d8a 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntitiesImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntitiesImpl.scala @@ -91,7 +91,7 @@ private[impl] final class EventSourcedEntitiesImpl( if (service.snapshotEvery < 0) log.warn("Snapshotting disabled for entity [{}], this is not recommended.", service.componentId) // FIXME overlay configuration provided by _system - (name, if (service.snapshotEvery == 0) service.withSnapshotEvery(configuration.snapshotEvery) else service) + (name, if (service.snapshotEvery == 0) service else service) }.toMap private val instrumentations: Map[String, TraceInstrumentation] = services.values.map { s => diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntityImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntityImpl.scala index 79e056081..1a17d6f8b 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntityImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntityImpl.scala @@ -44,7 +44,6 @@ import com.google.protobuf.any.{ Any => ScalaPbAny } import io.grpc.Status import io.opentelemetry.api.trace.Span import io.opentelemetry.api.trace.Tracer -import org.slf4j.LoggerFactory import org.slf4j.MDC /** @@ -52,7 +51,6 @@ import org.slf4j.MDC */ @InternalApi private[impl] object EventSourcedEntityImpl { - private val log = LoggerFactory.getLogger(this.getClass) private class CommandContextImpl( override val entityId: String, @@ -89,14 +87,10 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[ componentClass: Class[_], entityId: String, messageCodec: JsonMessageCodec, - factory: EventSourcedEntityContext => ES, - snapshotEvery: Int) + factory: EventSourcedEntityContext => ES) extends SpiEventSourcedEntity { import EventSourcedEntityImpl._ - if (snapshotEvery < 0) - log.warn("Snapshotting disabled for entity [{}], this is not recommended.", componentId) - // FIXME // private val traceInstrumentation = new TraceInstrumentation(componentId, EventSourcedEntityCategory, tracerFactory) @@ -130,7 +124,7 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[ // FIXME smuggling 0 arity method called from component client through here ScalaPbAny.defaultInstance.withTypeUrl(AnySupport.JsonTypeUrlPrefix).withValue(ByteString.empty()))) val metadata: Metadata = - MetadataImpl.of(Nil) // FIXME MetadataImpl.of(command.metadata.map(_.entries.toVector).getOrElse(Nil)) + MetadataImpl.Empty // FIXME MetadataImpl.of(command.metadata.map(_.entries.toVector).getOrElse(Nil)) val cmdContext = new CommandContextImpl( entityId, @@ -168,13 +162,11 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[ var updatedState = state commandEffect.primaryEffect match { case EmitEvents(events, deleteEntity) => - var shouldSnapshot = false events.foreach { event => updatedState = entityHandleEvent(updatedState, event.asInstanceOf[AnyRef], entityId, currentSequence) if (updatedState == null) throw new IllegalArgumentException("Event handler must not return null as the updated state.") currentSequence += 1 - shouldSnapshot = shouldSnapshot || (snapshotEvery > 0 && currentSequence % snapshotEvery == 0) } val (reply, error) = replyOrError(updatedState) @@ -183,14 +175,6 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[ Future.successful( new SpiEventSourcedEntity.Effect(events = Vector.empty, updatedState = state, reply = None, error, None)) } else { - // snapshotting final state since that is the "atomic" write - // emptyState can be null but null snapshot should not be stored, but that can't even - // happen since event handler is not allowed to return null as newState - // FIXME -// val snapshot = -// if (shouldSnapshot) Option(updatedState) -// else None - val delete = if (deleteEntity) Some(configuration.cleanupDeletedEventSourcedEntityAfter) else None @@ -279,7 +263,7 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[ override def toProto(obj: Deserialized): ScalaPbAny = ScalaPbAny.fromJavaProto(messageCodec.encodeJava(obj)) - override def fromProto(pb: ScalaPbAny): Deserialized = - messageCodec.decodeMessage(pb).asInstanceOf[Deserialized] + def fromProto(pb: ScalaPbAny): Deserialized = + messageCodec.decodeMessage(router.entityStateType, pb) } } diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/ReflectiveEventSourcedEntityRouter.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/ReflectiveEventSourcedEntityRouter.scala index d92629384..c7101bcff 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/ReflectiveEventSourcedEntityRouter.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/ReflectiveEventSourcedEntityRouter.scala @@ -31,6 +31,8 @@ private[impl] class ReflectiveEventSourcedEntityRouter[S, E, ES <: EventSourcedE // similar to workflow, we preemptively register the events type to the message codec Reflect.allKnownEventTypes[S, E, ES](entity).foreach(messageCodec.registerTypeHints) + val entityStateType: Class[S] = Reflect.eventSourcedEntityStateType(entity.getClass).asInstanceOf[Class[S]] + private def commandHandlerLookup(commandName: String) = commandHandlers.getOrElse( commandName, @@ -90,7 +92,6 @@ private[impl] class ReflectiveEventSourcedEntityRouter[S, E, ES <: EventSourcedE } private def _setCurrentState(state: S): Unit = { - val entityStateType: Class[S] = Reflect.eventSourcedEntityStateType(this.entity.getClass).asInstanceOf[Class[S]] // the state: S received can either be of the entity "state" type (if coming from emptyState/memory) // or PB Any type (if coming from the runtime) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 9bb0ca6ee..a3e6618b4 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -8,7 +8,7 @@ object Dependencies { val ProtocolVersionMinor = 1 val RuntimeImage = "gcr.io/kalix-public/kalix-runtime" // Remember to bump kalix-runtime.version in akka-javasdk-maven/akka-javasdk-parent if bumping this - val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.3.0-2909c94-1-37f8654f-SNAPSHOT") + val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.3.0-2909c94-1-e56551b7-SNAPSHOT") } // NOTE: embedded SDK should have the AkkaVersion aligned, when updating RuntimeVersion, make sure to check // if AkkaVersion and AkkaHttpVersion are aligned