Skip to content

Commit

Permalink
chore: Use spi instance factory for ese
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Dec 2, 2024
1 parent a374872 commit 2c30b64
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 29 deletions.
2 changes: 1 addition & 1 deletion akka-javasdk-maven/akka-javasdk-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

<!-- These are dependent on runtime environment and cannot be customized by users -->
<maven.compiler.release>21</maven.compiler.release>
<kalix-runtime.version>1.3.0-2909c94</kalix-runtime.version>
<kalix-runtime.version>1.3.0-2909c94-1-37f8654f-SNAPSHOT</kalix-runtime.version>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<skip.docker>false</skip.docker>
Expand Down
7 changes: 5 additions & 2 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ import scala.jdk.CollectionConverters._
import akka.javasdk.impl.eventsourcedentity.EventSourcedEntityImpl
import akka.javasdk.impl.timedaction.TimedActionImpl
import akka.runtime.sdk.spi.EventSourcedEntityDescriptor
import akka.runtime.sdk.spi.SpiEventSourcedEntity
import akka.runtime.sdk.spi.TimedActionDescriptor

/**
Expand Down Expand Up @@ -361,20 +362,22 @@ private final class Sdk(
.collect {
case clz if classOf[EventSourcedEntity[_, _]].isAssignableFrom(clz) =>
val componentId = clz.getAnnotation(classOf[ComponentId]).value
val entitySpi =
val instanceFactory: SpiEventSourcedEntity.FactoryContext => SpiEventSourcedEntity = { factoryContext =>
new EventSourcedEntityImpl[AnyRef, AnyRef, EventSourcedEntity[AnyRef, AnyRef]](
sdkSettings,
sdkTracerFactory,
componentId,
clz,
factoryContext.entityId,
messageCodec,
context =>
wiredInstance(clz.asInstanceOf[Class[EventSourcedEntity[AnyRef, AnyRef]]]) {
// remember to update component type API doc and docs if changing the set of injectables
case p if p == classOf[EventSourcedEntityContext] => context
},
sdkSettings.snapshotEvery)
new EventSourcedEntityDescriptor(componentId, entitySpi)
}
new EventSourcedEntityDescriptor(componentId, instanceFactory)
}

private val timedActionDescriptors =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
tracerFactory: () => Tracer,
componentId: String,
componentClass: Class[_],
entityId: String,
messageCodec: JsonMessageCodec,
factory: EventSourcedEntityContext => ES,
snapshotEvery: Int)
Expand All @@ -102,29 +103,24 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
private val componentDescriptor = ComponentDescriptor.descriptorFor(componentClass, messageCodec)

// FIXME remove EventSourcedEntityRouter altogether, and only keep stateless ReflectiveEventSourcedEntityRouter
private def createRouter(context: EventSourcedEntityContext)
: ReflectiveEventSourcedEntityRouter[AnyRef, AnyRef, EventSourcedEntity[AnyRef, AnyRef]] =
private val router: ReflectiveEventSourcedEntityRouter[AnyRef, AnyRef, EventSourcedEntity[AnyRef, AnyRef]] = {
val context = new EventSourcedEntityContextImpl(entityId)
new ReflectiveEventSourcedEntityRouter[S, E, ES](
factory(context),
componentDescriptor.commandHandlers,
messageCodec)
.asInstanceOf[ReflectiveEventSourcedEntityRouter[AnyRef, AnyRef, EventSourcedEntity[AnyRef, AnyRef]]]

override def emptyState: SpiEventSourcedEntity.State = {
// FIXME rather messy with the contexts here
val context = new EventSourcedEntityContextImpl("FIXME_ID")
val router = createRouter(context)
try {
router.entity.emptyState()
} finally {
router.entity._internalSetCommandContext(Optional.empty())
}
}

private def entity: EventSourcedEntity[AnyRef, AnyRef] =
router.entity

override def emptyState: SpiEventSourcedEntity.State =
entity.emptyState()

override def handleCommand(
state: SpiEventSourcedEntity.State,
command: SpiEntity.Command): Future[SpiEventSourcedEntity.Effect] = {
val entityId = command.entityId

val span: Option[Span] = None // FIXME traceInstrumentation.buildSpan(service, command)
span.foreach(s => MDC.put(Telemetry.TRACE_ID, s.getSpanContext.getTraceId))
Expand All @@ -146,11 +142,9 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
span,
tracerFactory)

val context = new EventSourcedEntityContextImpl(entityId)
val router = createRouter(context)
router.entity._internalSetCommandContext(Optional.of(cmdContext))
entity._internalSetCommandContext(Optional.of(cmdContext))
try {
router.entity._internalSetCurrentState(state)
entity._internalSetCurrentState(state)
val commandEffect = router
.handleCommand(command.name, state, cmd, cmdContext)
.asInstanceOf[EventSourcedEntityEffectImpl[AnyRef, E]] // FIXME improve?
Expand Down Expand Up @@ -221,7 +215,7 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
entityId,
0, // FIXME remove commandId
command.name,
s"No command handler found for command [$name] on ${router.entity.getClass}")
s"No command handler found for command [$name] on ${entity.getClass}")
case BadRequestException(msg) =>
Future.successful(
new SpiEventSourcedEntity.Effect(
Expand All @@ -240,8 +234,8 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
s"Unexpected failure: $error",
Some(error))
} finally {
router.entity._internalSetCommandContext(Optional.empty())
router.entity._internalClearCurrentState()
entity._internalSetCommandContext(Optional.empty())
entity._internalClearCurrentState()
cmdContext.deactivate() // Very important!

span.foreach { s =>
Expand All @@ -259,7 +253,7 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
messageCodec
.decodeMessage(eventEnv.payload)
.asInstanceOf[AnyRef] // FIXME empty?
entityHandleEvent(state, event, eventEnv.entityId, eventEnv.sequenceNumber)
entityHandleEvent(state, event, entityId, eventEnv.sequenceNumber)
}

def entityHandleEvent(
Expand All @@ -268,15 +262,14 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
entityId: String,
sequenceNumber: Long): SpiEventSourcedEntity.State = {
val eventContext = new EventContextImpl(entityId, sequenceNumber)
val router = createRouter(eventContext) // FIXME reuse router instance?
router.entity._internalSetEventContext(Optional.of(eventContext))
entity._internalSetEventContext(Optional.of(eventContext))
try {
router.handleEvent(state, event)
} catch {
case EventHandlerNotFound(eventClass) =>
throw new IllegalArgumentException(s"Unknown event type [$eventClass] on ${router.entity.getClass}")
throw new IllegalArgumentException(s"Unknown event type [$eventClass] on ${entity.getClass}")
} finally {
router.entity._internalSetEventContext(Optional.empty())
entity._internalSetEventContext(Optional.empty())
}
}

Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ object Dependencies {
val ProtocolVersionMinor = 1
val RuntimeImage = "gcr.io/kalix-public/kalix-runtime"
// Remember to bump kalix-runtime.version in akka-javasdk-maven/akka-javasdk-parent if bumping this
val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.3.0-2909c94")
val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.3.0-2909c94-1-37f8654f-SNAPSHOT")
}
// NOTE: embedded SDK should have the AkkaVersion aligned, when updating RuntimeVersion, make sure to check
// if AkkaVersion and AkkaHttpVersion are aligned
Expand Down

0 comments on commit 2c30b64

Please sign in to comment.