diff --git a/akka-javasdk-maven/akka-javasdk-parent/pom.xml b/akka-javasdk-maven/akka-javasdk-parent/pom.xml index 460f15041..bd6ca5abd 100644 --- a/akka-javasdk-maven/akka-javasdk-parent/pom.xml +++ b/akka-javasdk-maven/akka-javasdk-parent/pom.xml @@ -38,7 +38,7 @@ 21 - 1.3.0-2909c94 + 1.3.0-2909c94-1-37f8654f-SNAPSHOT UTF-8 false diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala index ff025888f..da73e1c90 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala @@ -96,6 +96,7 @@ import scala.jdk.CollectionConverters._ import akka.javasdk.impl.eventsourcedentity.EventSourcedEntityImpl import akka.javasdk.impl.timedaction.TimedActionImpl import akka.runtime.sdk.spi.EventSourcedEntityDescriptor +import akka.runtime.sdk.spi.SpiEventSourcedEntity import akka.runtime.sdk.spi.TimedActionDescriptor /** @@ -361,12 +362,13 @@ private final class Sdk( .collect { case clz if classOf[EventSourcedEntity[_, _]].isAssignableFrom(clz) => val componentId = clz.getAnnotation(classOf[ComponentId]).value - val entitySpi = + val instanceFactory: SpiEventSourcedEntity.FactoryContext => SpiEventSourcedEntity = { factoryContext => new EventSourcedEntityImpl[AnyRef, AnyRef, EventSourcedEntity[AnyRef, AnyRef]]( sdkSettings, sdkTracerFactory, componentId, clz, + factoryContext.entityId, messageCodec, context => wiredInstance(clz.asInstanceOf[Class[EventSourcedEntity[AnyRef, AnyRef]]]) { @@ -374,7 +376,8 @@ private final class Sdk( case p if p == classOf[EventSourcedEntityContext] => context }, sdkSettings.snapshotEvery) - new EventSourcedEntityDescriptor(componentId, entitySpi) + } + new EventSourcedEntityDescriptor(componentId, instanceFactory) } private val timedActionDescriptors = diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntityImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntityImpl.scala index fde399f93..79e056081 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntityImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntityImpl.scala @@ -87,6 +87,7 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[ tracerFactory: () => Tracer, componentId: String, componentClass: Class[_], + entityId: String, messageCodec: JsonMessageCodec, factory: EventSourcedEntityContext => ES, snapshotEvery: Int) @@ -102,29 +103,24 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[ private val componentDescriptor = ComponentDescriptor.descriptorFor(componentClass, messageCodec) // FIXME remove EventSourcedEntityRouter altogether, and only keep stateless ReflectiveEventSourcedEntityRouter - private def createRouter(context: EventSourcedEntityContext) - : ReflectiveEventSourcedEntityRouter[AnyRef, AnyRef, EventSourcedEntity[AnyRef, AnyRef]] = + private val router: ReflectiveEventSourcedEntityRouter[AnyRef, AnyRef, EventSourcedEntity[AnyRef, AnyRef]] = { + val context = new EventSourcedEntityContextImpl(entityId) new ReflectiveEventSourcedEntityRouter[S, E, ES]( factory(context), componentDescriptor.commandHandlers, messageCodec) .asInstanceOf[ReflectiveEventSourcedEntityRouter[AnyRef, AnyRef, EventSourcedEntity[AnyRef, AnyRef]]] - - override def emptyState: SpiEventSourcedEntity.State = { - // FIXME rather messy with the contexts here - val context = new EventSourcedEntityContextImpl("FIXME_ID") - val router = createRouter(context) - try { - router.entity.emptyState() - } finally { - router.entity._internalSetCommandContext(Optional.empty()) - } } + private def entity: EventSourcedEntity[AnyRef, AnyRef] = + router.entity + + override def emptyState: SpiEventSourcedEntity.State = + entity.emptyState() + override def handleCommand( state: SpiEventSourcedEntity.State, command: SpiEntity.Command): Future[SpiEventSourcedEntity.Effect] = { - val entityId = command.entityId val span: Option[Span] = None // FIXME traceInstrumentation.buildSpan(service, command) span.foreach(s => MDC.put(Telemetry.TRACE_ID, s.getSpanContext.getTraceId)) @@ -146,11 +142,9 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[ span, tracerFactory) - val context = new EventSourcedEntityContextImpl(entityId) - val router = createRouter(context) - router.entity._internalSetCommandContext(Optional.of(cmdContext)) + entity._internalSetCommandContext(Optional.of(cmdContext)) try { - router.entity._internalSetCurrentState(state) + entity._internalSetCurrentState(state) val commandEffect = router .handleCommand(command.name, state, cmd, cmdContext) .asInstanceOf[EventSourcedEntityEffectImpl[AnyRef, E]] // FIXME improve? @@ -221,7 +215,7 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[ entityId, 0, // FIXME remove commandId command.name, - s"No command handler found for command [$name] on ${router.entity.getClass}") + s"No command handler found for command [$name] on ${entity.getClass}") case BadRequestException(msg) => Future.successful( new SpiEventSourcedEntity.Effect( @@ -240,8 +234,8 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[ s"Unexpected failure: $error", Some(error)) } finally { - router.entity._internalSetCommandContext(Optional.empty()) - router.entity._internalClearCurrentState() + entity._internalSetCommandContext(Optional.empty()) + entity._internalClearCurrentState() cmdContext.deactivate() // Very important! span.foreach { s => @@ -259,7 +253,7 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[ messageCodec .decodeMessage(eventEnv.payload) .asInstanceOf[AnyRef] // FIXME empty? - entityHandleEvent(state, event, eventEnv.entityId, eventEnv.sequenceNumber) + entityHandleEvent(state, event, entityId, eventEnv.sequenceNumber) } def entityHandleEvent( @@ -268,15 +262,14 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[ entityId: String, sequenceNumber: Long): SpiEventSourcedEntity.State = { val eventContext = new EventContextImpl(entityId, sequenceNumber) - val router = createRouter(eventContext) // FIXME reuse router instance? - router.entity._internalSetEventContext(Optional.of(eventContext)) + entity._internalSetEventContext(Optional.of(eventContext)) try { router.handleEvent(state, event) } catch { case EventHandlerNotFound(eventClass) => - throw new IllegalArgumentException(s"Unknown event type [$eventClass] on ${router.entity.getClass}") + throw new IllegalArgumentException(s"Unknown event type [$eventClass] on ${entity.getClass}") } finally { - router.entity._internalSetEventContext(Optional.empty()) + entity._internalSetEventContext(Optional.empty()) } } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 65ba1896b..9bb0ca6ee 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -8,7 +8,7 @@ object Dependencies { val ProtocolVersionMinor = 1 val RuntimeImage = "gcr.io/kalix-public/kalix-runtime" // Remember to bump kalix-runtime.version in akka-javasdk-maven/akka-javasdk-parent if bumping this - val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.3.0-2909c94") + val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.3.0-2909c94-1-37f8654f-SNAPSHOT") } // NOTE: embedded SDK should have the AkkaVersion aligned, when updating RuntimeVersion, make sure to check // if AkkaVersion and AkkaHttpVersion are aligned