Skip to content

Commit

Permalink
Merge pull request #51 from akka/consumer-spi-sdk
Browse files Browse the repository at this point in the history
chore: Consumer spi + ESE changes
  • Loading branch information
patriknw authored Dec 3, 2024
2 parents a374872 + 108cb0f commit 10de34d
Show file tree
Hide file tree
Showing 16 changed files with 265 additions and 114 deletions.
2 changes: 1 addition & 1 deletion akka-javasdk-maven/akka-javasdk-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

<!-- These are dependent on runtime environment and cannot be customized by users -->
<maven.compiler.release>21</maven.compiler.release>
<kalix-runtime.version>1.3.0-2909c94</kalix-runtime.version>
<kalix-runtime.version>1.3.1-a47bb2b</kalix-runtime.version>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<skip.docker>false</skip.docker>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ private[akka] final class EventSourcedResultImpl[R, S, E](

private def secondaryEffectName: String = secondaryEffect match {
case _: MessageReplyImpl[_] => "reply"
case _: ErrorReplyImpl[_] => "error"
case _: ErrorReplyImpl => "error"
case NoSecondaryEffectImpl => "no effect" // this should never happen
}

Expand All @@ -73,16 +73,16 @@ private[akka] final class EventSourcedResultImpl[R, S, E](
case _ => throw new IllegalStateException(s"The effect was not a reply but [$secondaryEffectName]")
}

override def isError: Boolean = secondaryEffect.isInstanceOf[ErrorReplyImpl[_]]
override def isError: Boolean = secondaryEffect.isInstanceOf[ErrorReplyImpl]

override def getError: String = secondaryEffect match {
case ErrorReplyImpl(description, _) => description
case ErrorReplyImpl(description) => description
case _ => throw new IllegalStateException(s"The effect was not an error but [$secondaryEffectName]")
}

override def getErrorStatusCode: Status.Code = secondaryEffect match {
case ErrorReplyImpl(_, status) => status.getOrElse(Status.Code.UNKNOWN)
case _ => throw new IllegalStateException(s"The effect was not an error but [$secondaryEffectName]")
case ErrorReplyImpl(_) => Status.Code.INVALID_ARGUMENT
case _ => throw new IllegalStateException(s"The effect was not an error but [$secondaryEffectName]")
}

override def getUpdatedState: AnyRef = state.asInstanceOf[AnyRef]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ private[akka] final class KeyValueEntityResultImpl[R](effect: KeyValueEntityEffe

private def secondaryEffectName: String = effect.secondaryEffect match {
case _: MessageReplyImpl[_] => "reply"
case _: ErrorReplyImpl[_] => "error"
case _: ErrorReplyImpl => "error"
case NoSecondaryEffectImpl => "no effect" // this should never happen
}

Expand All @@ -33,10 +33,10 @@ private[akka] final class KeyValueEntityResultImpl[R](effect: KeyValueEntityEffe
case _ => throw new IllegalStateException(s"The effect was not a reply but [$secondaryEffectName]")
}

override def isError(): Boolean = effect.secondaryEffect.isInstanceOf[ErrorReplyImpl[_]]
override def isError(): Boolean = effect.secondaryEffect.isInstanceOf[ErrorReplyImpl]

override def getError(): String = effect.secondaryEffect match {
case error: ErrorReplyImpl[_] => error.description
case error: ErrorReplyImpl => error.description
case _ => throw new IllegalStateException(s"The effect was not an error but [$secondaryEffectName]")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,6 @@ public void verifyMultiTableViewForUserCounters() {
}

@Test
@Disabled //TODO revert once we deal with metadata translation
public void verifyActionWithMetadata() {

String metadataValue = "action-value";
Expand Down
26 changes: 26 additions & 0 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/MetadataImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ import akka.javasdk.CloudEvent
import akka.javasdk.Metadata
import akka.javasdk.impl.telemetry.Telemetry
import akka.javasdk.impl.telemetry.Telemetry.metadataGetter
import akka.runtime.sdk.spi.SpiMetadata
import akka.runtime.sdk.spi.SpiMetadataEntry
import com.google.protobuf.ByteString
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.SpanContext
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
import io.opentelemetry.context.{ Context => OtelContext }
import kalix.protocol.component
import kalix.protocol.component.MetadataEntry
import kalix.protocol.component.MetadataEntry.Value

/**
* INTERNAL API
Expand Down Expand Up @@ -259,6 +262,24 @@ object MetadataImpl {
throw new RuntimeException(s"Unknown metadata implementation: ${other.getClass}, cannot send")
}

def toSpi(metadata: Option[Metadata]): SpiMetadata = {
metadata match {
case Some(impl: MetadataImpl) if impl.entries.nonEmpty =>
val entries = impl.entries.map(entry =>
entry.value match {
case Value.Empty => new SpiMetadataEntry(entry.key, "")
case Value.StringValue(value) => new SpiMetadataEntry(entry.key, value)
case Value.BytesValue(value) =>
new SpiMetadataEntry(entry.key, value.toStringUtf8) //FIXME support bytes values or not
})
new SpiMetadata(entries)
case Some(_: MetadataImpl) => SpiMetadata.Empty
case None => SpiMetadata.Empty
case other =>
throw new RuntimeException(s"Unknown metadata implementation: ${other.getClass}, cannot send")
}
}

def of(entries: Seq[MetadataEntry]): MetadataImpl = {
val transformedEntries =
entries.map { entry =>
Expand All @@ -273,4 +294,9 @@ object MetadataImpl {
new MetadataImpl(transformedEntries)
}

def of(metadata: SpiMetadata): MetadataImpl = {
val entries = metadata.entries.map(e => MetadataEntry(e.key, MetadataEntry.Value.StringValue(e.value)))
new MetadataImpl(entries)
}

}
70 changes: 49 additions & 21 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package akka.javasdk.impl

import java.lang.reflect.Constructor
import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method
import java.util.concurrent.CompletionStage

import scala.annotation.nowarn
Expand Down Expand Up @@ -37,7 +38,6 @@ import akka.javasdk.impl.Sdk.StartupContext
import akka.javasdk.impl.Validations.Invalid
import akka.javasdk.impl.Validations.Valid
import akka.javasdk.impl.Validations.Validation
import akka.javasdk.impl.action.ActionsImpl
import akka.javasdk.impl.client.ComponentClientImpl
import akka.javasdk.impl.consumer.ConsumerService
import akka.javasdk.impl.eventsourcedentity.EventSourcedEntitiesImpl
Expand Down Expand Up @@ -83,7 +83,6 @@ import com.typesafe.config.ConfigFactory
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.Tracer
import io.opentelemetry.context.{ Context => OtelContext }
import kalix.protocol.action.Actions
import kalix.protocol.discovery.Discovery
import kalix.protocol.event_sourced_entity.EventSourcedEntities
import kalix.protocol.value_entity.ValueEntities
Expand All @@ -93,9 +92,12 @@ import org.slf4j.LoggerFactory
import scala.jdk.OptionConverters.RichOptional
import scala.jdk.CollectionConverters._

import akka.javasdk.impl.consumer.ConsumerImpl
import akka.javasdk.impl.eventsourcedentity.EventSourcedEntityImpl
import akka.javasdk.impl.timedaction.TimedActionImpl
import akka.runtime.sdk.spi.ConsumerDescriptor
import akka.runtime.sdk.spi.EventSourcedEntityDescriptor
import akka.runtime.sdk.spi.SpiEventSourcedEntity
import akka.runtime.sdk.spi.TimedActionDescriptor

/**
Expand Down Expand Up @@ -355,26 +357,46 @@ private final class Sdk(
HttpEndpointDescriptorFactory(httpEndpointClass, httpEndpointFactory(httpEndpointClass))
}

// command handlers candidate must have 0 or 1 parameter and return the components effect type
// we might later revisit this, instead of single param, we can require (State, Cmd) => Effect like in Akka
def isCommandHandlerCandidate[E](method: Method)(implicit effectType: ClassTag[E]): Boolean = {
effectType.runtimeClass.isAssignableFrom(method.getReturnType) &&
method.getParameterTypes.length <= 1 &&
// Workflow will have lambdas returning Effect, we want to filter them out
!method.getName.startsWith("lambda$")
}

private val eventSourcedEntityDescriptors =
componentClasses
.filter(hasComponentId)
.collect {
case clz if classOf[EventSourcedEntity[_, _]].isAssignableFrom(clz) =>
val componentId = clz.getAnnotation(classOf[ComponentId]).value
val entitySpi =

val readOnlyCommandNames =
clz.getDeclaredMethods.collect {
case method
if isCommandHandlerCandidate[EventSourcedEntity.Effect[_]](method) && method.getReturnType == classOf[
EventSourcedEntity.ReadOnlyEffect[_]] =>
method.getName
}.toSet

val instanceFactory: SpiEventSourcedEntity.FactoryContext => SpiEventSourcedEntity = { factoryContext =>
new EventSourcedEntityImpl[AnyRef, AnyRef, EventSourcedEntity[AnyRef, AnyRef]](
sdkSettings,
sdkTracerFactory,
componentId,
clz,
factoryContext.entityId,
messageCodec,
context =>
wiredInstance(clz.asInstanceOf[Class[EventSourcedEntity[AnyRef, AnyRef]]]) {
// remember to update component type API doc and docs if changing the set of injectables
case p if p == classOf[EventSourcedEntityContext] => context
},
sdkSettings.snapshotEvery)
new EventSourcedEntityDescriptor(componentId, entitySpi)
}
new EventSourcedEntityDescriptor(componentId, readOnlyCommandNames, instanceFactory)
}

private val timedActionDescriptors =
Expand All @@ -396,6 +418,26 @@ private final class Sdk(
new TimedActionDescriptor(componentId, timedActionSpi)
}

private val consumerDescriptors =
componentClasses
.filter(hasComponentId)
.collect {
case clz if classOf[Consumer].isAssignableFrom(clz) =>
val componentId = clz.getAnnotation(classOf[ComponentId]).value
val consumerClass = clz.asInstanceOf[Class[Consumer]]
val timedActionSpi =
new ConsumerImpl[Consumer](
() => wiredInstance(consumerClass)(sideEffectingComponentInjects(None)),
consumerClass,
system.classicSystem,
runtimeComponentClients.timerClient,
sdkExecutionContext,
sdkTracerFactory,
messageCodec,
ComponentDescriptorFactory.findIgnore(consumerClass))
new ConsumerDescriptor(componentId, timedActionSpi)
}

// these are available for injecting in all kinds of component that are primarily
// for side effects
// Note: config is also always available through the combination with user DI way down below
Expand All @@ -410,7 +452,6 @@ private final class Sdk(
// FIXME mixing runtime config with sdk with user project config is tricky
def spiEndpoints: SpiComponents = {

var actionsEndpoint: Option[Actions] = None
var eventSourcedEntitiesEndpoint: Option[EventSourcedEntities] = None
var valueEntitiesEndpoint: Option[ValueEntities] = None
var viewsEndpoint: Option[Views] = None
Expand All @@ -422,21 +463,6 @@ private final class Sdk(
serviceDescriptor.getFullName -> service
}

val actionAndConsumerServices = services.filter { case (_, service) =>
/*FIXME service.getClass == classOf[TimedActionService[_]] ||*/
service.getClass == classOf[ConsumerService[_]]
}

if (actionAndConsumerServices.nonEmpty) {
actionsEndpoint = Some(
new ActionsImpl(
classicSystem,
actionAndConsumerServices,
runtimeComponentClients.timerClient,
sdkExecutionContext,
sdkTracerFactory))
}

services.groupBy(_._2.getClass).foreach {

case (serviceClass, eventSourcedServices: Map[String, EventSourcedEntityService[_, _, _]] @unchecked)
Expand Down Expand Up @@ -531,7 +557,6 @@ private final class Sdk(
}

override def discovery: Discovery = discoveryEndpoint
override def actions: Option[Actions] = actionsEndpoint
override def eventSourcedEntities: Option[EventSourcedEntities] = eventSourcedEntitiesEndpoint
override def eventSourcedEntityDescriptors: Seq[EventSourcedEntityDescriptor] =
Sdk.this.eventSourcedEntityDescriptors
Expand All @@ -543,6 +568,9 @@ private final class Sdk(

override def timedActionsDescriptors: Seq[TimedActionDescriptor] =
Sdk.this.timedActionDescriptors

override def consumersDescriptors: Seq[ConsumerDescriptor] =
Sdk.this.consumerDescriptors
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ private[akka] final class ActionsImpl(
serviceName: String): CommandContext = {
val metadata = MetadataImpl.of(in.metadata.map(_.entries.toVector).getOrElse(Nil))
val updatedMetadata = span.map(metadata.withTracing).getOrElse(metadata)
new CommandContextImpl(updatedMetadata, messageCodec, system, timerClient, tracerFactory, span)
new CommandContextImpl(updatedMetadata, messageCodec, timerClient, tracerFactory, span)
}

private def createConsumerMessageContext(
Expand Down Expand Up @@ -265,7 +265,6 @@ case class CommandEnvelopeImpl[T](payload: T, metadata: Metadata) extends Comman
class CommandContextImpl(
override val metadata: Metadata,
val messageCodec: MessageCodec,
val system: ActorSystem,
timerClient: TimerClient,
tracerFactory: () => Tracer,
span: Option[Span])
Expand Down
Loading

0 comments on commit 10de34d

Please sign in to comment.