diff --git a/akka-javasdk-tests/src/test/java/akkajavasdk/EventSourcedEntityTest.java b/akka-javasdk-tests/src/test/java/akkajavasdk/EventSourcedEntityTest.java index dc21d3f57..4ed7888a5 100644 --- a/akka-javasdk-tests/src/test/java/akkajavasdk/EventSourcedEntityTest.java +++ b/akka-javasdk-tests/src/test/java/akkajavasdk/EventSourcedEntityTest.java @@ -8,6 +8,7 @@ import akka.javasdk.testkit.TestKit; import akka.javasdk.testkit.TestKitSupport; import akkajavasdk.components.eventsourcedentities.counter.Counter; +import akkajavasdk.components.eventsourcedentities.counter.CounterCommand; import akkajavasdk.components.eventsourcedentities.counter.CounterEntity; import akka.javasdk.client.EventSourcedEntityClient; import akkajavasdk.components.eventsourcedentities.hierarchy.AbstractTextConsumer; @@ -156,6 +157,24 @@ public void verifyCounterEventSourcedAfterRestartFromSnapshot() { new IsEqual(10)); } + @Test + public void verifyRequestWithSealedCommand() { + var client = componentClient.forEventSourcedEntity("counter-with-sealed-command-handler"); + await(client + .method(CounterEntity::handle) + .invokeAsync(new CounterCommand.Set(123))); + + Integer result1 = await(client.method(CounterEntity::get).invokeAsync()); + assertThat(result1).isEqualTo(123); + + await(client + .method(CounterEntity::handle) + .invokeAsync(new CounterCommand.Increase(123))); + + Integer result2 = await(client.method(CounterEntity::get).invokeAsync()); + assertThat(result2).isEqualTo(246); + } + @Test public void verifyRequestWithDefaultProtoValuesWithEntity() { var client = componentClient.forEventSourcedEntity("some-counter"); diff --git a/akka-javasdk-tests/src/test/java/akkajavasdk/components/eventsourcedentities/counter/CounterCommand.java b/akka-javasdk-tests/src/test/java/akkajavasdk/components/eventsourcedentities/counter/CounterCommand.java new file mode 100644 index 000000000..aa68fe4da --- /dev/null +++ b/akka-javasdk-tests/src/test/java/akkajavasdk/components/eventsourcedentities/counter/CounterCommand.java @@ -0,0 +1,21 @@ +/* + * Copyright (C) 2021-2024 Lightbend Inc. + */ + +package akkajavasdk.components.eventsourcedentities.counter; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "@type") +@JsonSubTypes({ + @JsonSubTypes.Type(value = CounterCommand.Increase.class, name = "I"), + @JsonSubTypes.Type(value = CounterCommand.Set.class, name = "S")}) +public sealed interface CounterCommand { + + record Increase(int value) implements CounterCommand { + } + + record Set(int value) implements CounterCommand { + } +} diff --git a/akka-javasdk-tests/src/test/java/akkajavasdk/components/eventsourcedentities/counter/CounterEntity.java b/akka-javasdk-tests/src/test/java/akkajavasdk/components/eventsourcedentities/counter/CounterEntity.java index 345e9d343..bf150b59b 100644 --- a/akka-javasdk-tests/src/test/java/akkajavasdk/components/eventsourcedentities/counter/CounterEntity.java +++ b/akka-javasdk-tests/src/test/java/akkajavasdk/components/eventsourcedentities/counter/CounterEntity.java @@ -72,6 +72,16 @@ public Effect set(Integer value) { return effects().persist(new CounterEvent.ValueSet(value)).thenReply(Counter::value); } + public Effect handle(CounterCommand counterCommand) { + return switch (counterCommand){ + case CounterCommand.Increase(var value) -> + effects().persist(new CounterEvent.ValueIncreased(value)).thenReply(Counter::value); + + case CounterCommand.Set(var value) -> + effects().persist(new CounterEvent.ValueSet(value)).thenReply(Counter::value); + }; + } + public Effect multiIncrease(List increase) { return effects().persistAll(increase.stream().map(CounterEvent.ValueIncreased::new).toList()) .thenReply(Counter::value); diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/CommandHandler.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/CommandHandler.scala deleted file mode 100644 index 05e411631..000000000 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/CommandHandler.scala +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright (C) 2021-2024 Lightbend Inc. - */ - -package akka.javasdk.impl - -import akka.annotation.InternalApi -import akka.javasdk.impl.reflection.ParameterExtractor -import java.lang.reflect.InvocationTargetException -import java.lang.reflect.Method - -import scala.util.control.Exception.Catcher - -import akka.javasdk.impl.serialization.JsonSerializer -import com.fasterxml.jackson.annotation.JsonSubTypes -import com.google.protobuf.Descriptors - -/** - * INTERNAL API - */ -@InternalApi -private[impl] final case class CommandHandler( - methodName: String, - serializer: JsonSerializer, - requestMessageDescriptor: Descriptors.Descriptor, - methodInvokers: Map[String, MethodInvoker]) { - - /** - * This method will look up for a registered method that receives a super type of the incoming payload. It's only - * called when a direct method is not found. - * - * The incoming typeUrl is for one of the existing subtypes, but the method itself is defined to receive a super type. - * Therefore, we look up the method parameter to find out if one of its subtypes matches the incoming typeUrl. - */ - private def lookupMethodAcceptingSubType(inputTypeUrl: String): Option[MethodInvoker] = { - methodInvokers.values.find { javaMethod => - //None could happen if the method is a delete handler - val lastParam = javaMethod.method.getParameterTypes.lastOption - if (lastParam.exists(_.getAnnotation(classOf[JsonSubTypes]) != null)) { - lastParam.get.getAnnotation(classOf[JsonSubTypes]).value().exists { subType => - inputTypeUrl == serializer - .contentTypeFor(subType.value()) //TODO requires more changes to be used with JsonMigration - } - } else false - } - } - - def isSingleNameInvoker: Boolean = methodInvokers.size == 1 - - def lookupInvoker(inputTypeUrl: String): Option[MethodInvoker] = - methodInvokers - .get(serializer.removeVersion(inputTypeUrl)) - .orElse(lookupMethodAcceptingSubType(inputTypeUrl)) - - def getInvoker(inputTypeUrl: String): MethodInvoker = - lookupInvoker(inputTypeUrl).getOrElse { - throw new NoSuchElementException( - s"Couldn't find any entry for typeUrl [$inputTypeUrl] in [${methodInvokers.view.mapValues(_.method.getName).mkString}].") - } - - // for embedded SDK we expect components to be either zero or one arity - def getSingleNameInvoker(): MethodInvoker = - if (methodInvokers.size != 1) throw new IllegalStateException(s"More than one method defined for $methodName") - else methodInvokers.head._2 -} - -/** - * INTERNAL API - */ -@InternalApi -private[impl] object MethodInvoker { - - def apply(javaMethod: Method, parameterExtractor: ParameterExtractor[InvocationContext, AnyRef]): MethodInvoker = - MethodInvoker(javaMethod, Array(parameterExtractor)) - - def apply(javaMethod: Method): MethodInvoker = - MethodInvoker(javaMethod, Array.empty[ParameterExtractor[InvocationContext, AnyRef]]) - -} - -/** - * INTERNAL API - */ -@InternalApi -private[impl] final case class MethodInvoker( - method: Method, - parameterExtractors: Array[ParameterExtractor[InvocationContext, AnyRef]]) { - - /** - * To invoke methods with parameters an InvocationContext is necessary extract them from the message. - */ - def invoke(componentInstance: AnyRef, invocationContext: InvocationContext): AnyRef = { - try method.invoke(componentInstance, parameterExtractors.map(e => e.extract(invocationContext)): _*) - catch unwrapInvocationTargetException() - } - - /** - * To invoke methods with arity zero. - */ - def invoke(componentInstance: AnyRef): AnyRef = { - try method.invoke(componentInstance) - catch unwrapInvocationTargetException() - } - - /** - * To invoke a methods with a deserialized payload - */ - def invokeDirectly(componentInstance: AnyRef, payload: AnyRef): AnyRef = { - try method.invoke(componentInstance, payload) - catch unwrapInvocationTargetException() - } - - private def unwrapInvocationTargetException(): Catcher[AnyRef] = { - case exc: InvocationTargetException if exc.getCause != null => - throw exc.getCause - } - -} diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/ComponentDescriptor.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/ComponentDescriptor.scala index bbda696ca..9465d5b59 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/ComponentDescriptor.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/ComponentDescriptor.scala @@ -5,7 +5,6 @@ package akka.javasdk.impl import akka.annotation.InternalApi -import akka.javasdk.impl.reflection.KalixMethod import akka.javasdk.impl.serialization.JsonSerializer /** @@ -20,21 +19,9 @@ private[impl] object ComponentDescriptor { def descriptorFor(component: Class[_], serializer: JsonSerializer): ComponentDescriptor = ComponentDescriptorFactory.getFactoryFor(component).buildDescriptorFor(component, serializer) - def apply(serializer: JsonSerializer, kalixMethods: Seq[KalixMethod]): ComponentDescriptor = { - - //TODO remove capitalization of method name, can't be done per component, because component client reuse the same logic for all - val methods: Map[String, CommandHandler] = - kalixMethods.map { method => - (method.serviceMethod.methodName.capitalize, method.toCommandHandler(serializer)) - }.toMap - - new ComponentDescriptor(methods) - - } - - def apply(methods: Map[String, CommandHandler]): ComponentDescriptor = { + def apply(methods: Map[String, MethodInvoker]): ComponentDescriptor = { new ComponentDescriptor(methods) } } -private[akka] final case class ComponentDescriptor private (commandHandlers: Map[String, CommandHandler]) +private[akka] final case class ComponentDescriptor private (methodInvokers: Map[String, MethodInvoker]) diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/ConsumerDescriptorFactory.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/ConsumerDescriptorFactory.scala index 0e732cd44..19ecbe9bc 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/ConsumerDescriptorFactory.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/ConsumerDescriptorFactory.scala @@ -5,11 +5,9 @@ package akka.javasdk.impl import akka.annotation.InternalApi +import akka.javasdk.impl.AnySupport.ProtobufEmptyTypeUrl import akka.javasdk.impl.ComponentDescriptorFactory._ -import akka.javasdk.impl.reflection.HandleDeletesServiceMethod -import akka.javasdk.impl.reflection.KalixMethod import akka.javasdk.impl.reflection.Reflect -import akka.javasdk.impl.reflection.SubscriptionServiceMethod import akka.javasdk.impl.serialization.JsonSerializer /** @@ -22,35 +20,42 @@ private[impl] object ConsumerDescriptorFactory extends ComponentDescriptorFactor import Reflect.methodOrdering - val handleDeletesMethods = component.getMethods + val handleDeletesMethods: Map[String, MethodInvoker] = component.getMethods .filter(hasConsumerOutput) .filter(hasHandleDeletes) .sorted .map { method => - KalixMethod(HandleDeletesServiceMethod(method)) + ProtobufEmptyTypeUrl -> MethodInvoker(method) } - .toSeq + .toMap - val methods = component.getMethods + val methods: Map[String, MethodInvoker] = component.getMethods .filter(hasConsumerOutput) .filterNot(hasHandleDeletes) - .map { method => - KalixMethod(SubscriptionServiceMethod(method)) + .flatMap { method => + method.getParameterTypes.headOption match { + case Some(inputType) => + val invoker = MethodInvoker(method) + if (method.getParameterTypes.last.isSealed) { + method.getParameterTypes.last.getPermittedSubclasses.toList + .flatMap(subClass => { + serializer.contentTypesFor(subClass).map(typeUrl => typeUrl -> invoker) + }) + } else { + val typeUrls = serializer.contentTypesFor(inputType) + typeUrls.map(_ -> invoker) + } + case None => + // FIXME check if there is a validation for that already + throw new IllegalStateException( + "Consumer method must have at least one parameter, unless it is a delete handler") + } } - .toIndexedSeq - - val allMethods = methods ++ handleDeletesMethods - - val commandHandlers = allMethods.map { method => - method.toCommandHandler(serializer) - } + .toMap - //folding all invokers into a single map - val allInvokers = commandHandlers.foldLeft(Map.empty[String, MethodInvoker]) { (acc, handler) => - acc ++ handler.methodInvokers - } + val allInvokers = methods ++ handleDeletesMethods //Empty command/method name, because it is not used in the consumer, we just need the invokers - ComponentDescriptor(Map("" -> CommandHandler(null, serializer, null, allInvokers))) + ComponentDescriptor(allInvokers) } } diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/EntityDescriptorFactory.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/EntityDescriptorFactory.scala index d0e170a83..f189877dd 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/EntityDescriptorFactory.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/EntityDescriptorFactory.scala @@ -4,14 +4,9 @@ package akka.javasdk.impl -import java.lang.reflect.Method - -import scala.reflect.ClassTag - import akka.annotation.InternalApi import akka.javasdk.eventsourcedentity.EventSourcedEntity -import akka.javasdk.impl.reflection.ActionHandlerMethod -import akka.javasdk.impl.reflection.KalixMethod +import akka.javasdk.impl.reflection.Reflect.isCommandHandlerCandidate import akka.javasdk.impl.serialization.JsonSerializer import akka.javasdk.keyvalueentity.KeyValueEntity import akka.javasdk.workflow.Workflow @@ -23,34 +18,22 @@ import akka.javasdk.workflow.Workflow private[impl] object EntityDescriptorFactory extends ComponentDescriptorFactory { override def buildDescriptorFor(component: Class[_], serializer: JsonSerializer): ComponentDescriptor = { - - // command handlers candidate must have 0 or 1 parameter and return the components effect type - // we might later revisit this, instead of single param, we can require (State, Cmd) => Effect like in Akka - def isCommandHandlerCandidate[E](method: Method)(implicit effectType: ClassTag[E]): Boolean = { - effectType.runtimeClass.isAssignableFrom(method.getReturnType) && - method.getParameterTypes.length <= 1 && - // Workflow will have lambdas returning Effect, we want to filter them out - !method.getName.startsWith("lambda$") - } - - val commandHandlerMethods: Seq[KalixMethod] = if (classOf[EventSourcedEntity[_, _]].isAssignableFrom(component)) { + //TODO remove capitalization of method name, can't be done per component, because component client reuse the same logic for all + val commandHandlerMethods = if (classOf[EventSourcedEntity[_, _]].isAssignableFrom(component)) { component.getDeclaredMethods.collect { case method if isCommandHandlerCandidate[EventSourcedEntity.Effect[_]](method) => - val servMethod = ActionHandlerMethod(component, method) - KalixMethod(servMethod, entityIds = Seq.empty) - }.toSeq + method.getName.capitalize -> MethodInvoker(method) + } } else if (classOf[KeyValueEntity[_]].isAssignableFrom(component)) { component.getDeclaredMethods.collect { case method if isCommandHandlerCandidate[KeyValueEntity.Effect[_]](method) => - val servMethod = ActionHandlerMethod(component, method) - KalixMethod(servMethod, entityIds = Seq.empty) - }.toSeq + method.getName.capitalize -> MethodInvoker(method) + } } else if (classOf[Workflow[_]].isAssignableFrom(component)) { component.getDeclaredMethods.collect { case method if isCommandHandlerCandidate[Workflow.Effect[_]](method) => - val servMethod = ActionHandlerMethod(component, method) - KalixMethod(servMethod, entityIds = Seq.empty) - }.toSeq + method.getName.capitalize -> MethodInvoker(method) + } } else { // should never happen @@ -58,6 +41,6 @@ private[impl] object EntityDescriptorFactory extends ComponentDescriptorFactory s"Unsupported component type: ${component.getName}. Supported types are: EventSourcedEntity, ValueEntity, Workflow") } - ComponentDescriptor(serializer, commandHandlerMethods) + ComponentDescriptor(commandHandlerMethods.toMap) } } diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/InvocationContext.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/InvocationContext.scala deleted file mode 100644 index fa1035632..000000000 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/InvocationContext.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (C) 2021-2024 Lightbend Inc. - */ - -package akka.javasdk.impl - -import akka.javasdk.impl.reflection.DynamicMessageContext -import akka.javasdk.impl.reflection.MetadataContext -import com.google.protobuf.Descriptors -import com.google.protobuf.DynamicMessage -import com.google.protobuf.any.{ Any => ScalaPbAny } -import AnySupport.BytesPrimitive -import akka.annotation.InternalApi -import akka.javasdk.Metadata -import akka.javasdk.impl.reflection.ParameterExtractors.toAny - -/** - * INTERNAL API - */ -@InternalApi -private[javasdk] object InvocationContext { // FIXME: refactor this to BytesPayload - - private val typeUrlField = ScalaPbAny.javaDescriptor.findFieldByName("type_url") - private val valueField = ScalaPbAny.javaDescriptor.findFieldByName("value") - - def apply( - anyMessage: ScalaPbAny, - methodDescriptor: Descriptors.Descriptor, - metadata: Metadata = Metadata.EMPTY): InvocationContext = { - - val dynamicMessage = - if (AnySupport.isJson(anyMessage) || - anyMessage.typeUrl == BytesPrimitive.fullName) { - // FIXME how can this ever work unless methodDescriptor is protobuf Any, or a synthetic - // message with exactly the two fields type_url and value? - DynamicMessage - .newBuilder(methodDescriptor) - .setField(typeUrlField, anyMessage.typeUrl) - .setField(valueField, anyMessage.value) - .build() - - } else { - DynamicMessage.parseFrom(methodDescriptor, anyMessage.value) - } - - new InvocationContext(dynamicMessage, metadata) - } -} -class InvocationContext(val message: DynamicMessage, val metadata: Metadata) - extends DynamicMessageContext - with MetadataContext { - - override def hasField(field: Descriptors.FieldDescriptor): Boolean = - message.hasField(field) - - override def getField(field: Descriptors.FieldDescriptor): AnyRef = - message.getField(field) - - override def getAny: ScalaPbAny = ScalaPbAny.fromJavaProto(toAny(message)) -} diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/MethodInvoker.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/MethodInvoker.scala new file mode 100644 index 000000000..67642ddd7 --- /dev/null +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/MethodInvoker.scala @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2021-2024 Lightbend Inc. + */ + +package akka.javasdk.impl + +import akka.annotation.InternalApi +import java.lang.reflect.InvocationTargetException +import java.lang.reflect.Method + +import scala.util.control.Exception.Catcher + +/** + * INTERNAL API + */ +@InternalApi +private[impl] final case class MethodInvoker(method: Method) { + + /** + * To invoke methods with arity zero. + */ + def invoke(componentInstance: AnyRef): AnyRef = { + try method.invoke(componentInstance) + catch unwrapInvocationTargetException() + } + + /** + * To invoke a methods with a deserialized payload + */ + def invokeDirectly(componentInstance: AnyRef, payload: AnyRef): AnyRef = { + try method.invoke(componentInstance, payload) + catch unwrapInvocationTargetException() + } + + private def unwrapInvocationTargetException(): Catcher[AnyRef] = { + case exc: InvocationTargetException if exc.getCause != null => + throw exc.getCause + } +} diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/ResolvedServiceMethod.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/ResolvedServiceMethod.scala index 19b94c790..a91e0360c 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/ResolvedServiceMethod.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/ResolvedServiceMethod.scala @@ -6,27 +6,9 @@ package akka.javasdk.impl import akka.annotation.InternalApi import com.google.protobuf.ByteString -import com.google.protobuf.Descriptors import com.google.protobuf.Parser import com.google.protobuf.{ Message => JavaMessage } -/** - * A resolved service method. - * - * INTERNAL API - */ -@InternalApi -final case class ResolvedServiceMethod[I, O]( - descriptor: Descriptors.MethodDescriptor, - inputType: ResolvedType[I], - outputType: ResolvedType[O]) { - - def outputStreamed: Boolean = descriptor.isServerStreaming - def name: String = descriptor.getName - - def method(): Descriptors.MethodDescriptor = descriptor -} - /** * A resolved type * diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala index 06a49a5f2..6e88c4a63 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala @@ -415,6 +415,9 @@ private final class Sdk( private var timedActionDescriptors = Vector.empty[TimedActionDescriptor] private var consumerDescriptors = Vector.empty[ConsumerDescriptor] + val s = componentClasses + .filter(hasComponentId) + componentClasses .filter(hasComponentId) .foreach { @@ -515,7 +518,7 @@ private final class Sdk( case clz if classOf[Consumer].isAssignableFrom(clz) => val componentId = clz.getAnnotation(classOf[ComponentId]).value val consumerClass = clz.asInstanceOf[Class[Consumer]] - val timedActionSpi = + val consumerSpi = new ConsumerImpl[Consumer]( componentId, () => wiredInstance(consumerClass)(sideEffectingComponentInjects(None)), @@ -533,7 +536,7 @@ private final class Sdk( clz.getName, consumerSource(consumerClass), consumerDestination(consumerClass), - timedActionSpi) + consumerSpi) case clz if Reflect.isRestEndpoint(clz) => // handled separately because ComponentId is not mandatory diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/TimedActionDescriptorFactory.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/TimedActionDescriptorFactory.scala index 540a07885..f861e319a 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/TimedActionDescriptorFactory.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/TimedActionDescriptorFactory.scala @@ -5,10 +5,9 @@ package akka.javasdk.impl import akka.annotation.InternalApi -import akka.javasdk.impl.ComponentDescriptorFactory.hasTimedActionEffectOutput -import akka.javasdk.impl.reflection.ActionHandlerMethod -import akka.javasdk.impl.reflection.KalixMethod +import akka.javasdk.impl.reflection.Reflect.isCommandHandlerCandidate import akka.javasdk.impl.serialization.JsonSerializer +import akka.javasdk.timedaction.TimedAction /** * INTERNAL API @@ -17,15 +16,12 @@ import akka.javasdk.impl.serialization.JsonSerializer private[impl] object TimedActionDescriptorFactory extends ComponentDescriptorFactory { override def buildDescriptorFor(component: Class[_], serializer: JsonSerializer): ComponentDescriptor = { + //TODO remove capitalization of method name, can't be done per component, because component client reuse the same logic for all + val invokers = component.getDeclaredMethods.collect { + case method if isCommandHandlerCandidate[TimedAction.Effect](method) => + method.getName.capitalize -> MethodInvoker(method) + }.toMap - val commandHandlerMethods = component.getDeclaredMethods - .filter(hasTimedActionEffectOutput) - .map { method => - val servMethod = ActionHandlerMethod(component, method) - KalixMethod(servMethod, entityIds = Seq.empty) - } - .toIndexedSeq - - ComponentDescriptor(serializer, commandHandlerMethods) + ComponentDescriptor(invokers) } } diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerImpl.scala index 94bd9efff..7de12d79d 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerImpl.scala @@ -64,11 +64,7 @@ private[impl] final class ConsumerImpl[C <: Consumer]( private val traceInstrumentation = new TraceInstrumentation(componentId, ConsumerCategory, tracerFactory) private def createRouter(): ReflectiveConsumerRouter[C] = - new ReflectiveConsumerRouter[C]( - factory(), - componentDescriptor.commandHandlers.values.head.methodInvokers, - serializer, - ignoreUnknown) + new ReflectiveConsumerRouter[C](factory(), componentDescriptor.methodInvokers, serializer, ignoreUnknown) override def handleMessage(message: Message): Future[Effect] = { val metadata = MetadataImpl.of(message.metadata) diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntityImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntityImpl.scala index 806a62be7..b64082da3 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntityImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntityImpl.scala @@ -94,7 +94,7 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[ private val router: ReflectiveEventSourcedEntityRouter[AnyRef, AnyRef, EventSourcedEntity[AnyRef, AnyRef]] = { val context = new EventSourcedEntityContextImpl(entityId) - new ReflectiveEventSourcedEntityRouter[S, E, ES](factory(context), componentDescriptor.commandHandlers, serializer) + new ReflectiveEventSourcedEntityRouter[S, E, ES](factory(context), componentDescriptor.methodInvokers, serializer) .asInstanceOf[ReflectiveEventSourcedEntityRouter[AnyRef, AnyRef, EventSourcedEntity[AnyRef, AnyRef]]] } diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/ReflectiveEventSourcedEntityRouter.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/ReflectiveEventSourcedEntityRouter.scala index 0a7a4db29..c8f49573f 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/ReflectiveEventSourcedEntityRouter.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/ReflectiveEventSourcedEntityRouter.scala @@ -6,7 +6,7 @@ package akka.javasdk.impl.eventsourcedentity import akka.annotation.InternalApi import akka.javasdk.eventsourcedentity.EventSourcedEntity -import akka.javasdk.impl.CommandHandler +import akka.javasdk.impl.MethodInvoker import akka.javasdk.impl.CommandSerialization import akka.javasdk.impl.HandlerNotFoundException import akka.javasdk.impl.serialization.JsonSerializer @@ -18,24 +18,23 @@ import akka.runtime.sdk.spi.BytesPayload @InternalApi private[impl] class ReflectiveEventSourcedEntityRouter[S, E, ES <: EventSourcedEntity[S, E]]( val entity: ES, - commandHandlers: Map[String, CommandHandler], + methodInvokers: Map[String, MethodInvoker], serializer: JsonSerializer) { - private def commandHandlerLookup(commandName: String): CommandHandler = - commandHandlers.get(commandName) match { + private def methodInvokerLookup(commandName: String): MethodInvoker = + methodInvokers.get(commandName) match { case Some(handler) => handler case None => - throw new HandlerNotFoundException("command", commandName, entity.getClass, commandHandlers.keySet) + throw new HandlerNotFoundException("command", commandName, entity.getClass, methodInvokers.keySet) } def handleCommand(commandName: String, command: BytesPayload): EventSourcedEntity.Effect[_] = { - val commandHandler = commandHandlerLookup(commandName) + val methodInvoker = methodInvokerLookup(commandName) if (serializer.isJson(command) || command.isEmpty) { // - BytesPayload.empty - there is no real command, and we are calling a method with arity 0 // - BytesPayload with json - we deserialize it and call the method - val methodInvoker = commandHandler.getSingleNameInvoker() val deserializedCommand = CommandSerialization.deserializeComponentClientCommand(methodInvoker.method, command, serializer) val result = deserializedCommand match { @@ -45,8 +44,7 @@ private[impl] class ReflectiveEventSourcedEntityRouter[S, E, ES <: EventSourcedE result.asInstanceOf[EventSourcedEntity.Effect[_]] } else { throw new IllegalStateException( - s"Could not find a matching command handler for method [$commandName], content type " + - s"[${command.contentType}], invokers keys [${commandHandler.methodInvokers.keys.mkString(", ")}," + + s"Could not find a matching command handler for method [$commandName], content type [${command.contentType}] " + s"on [${entity.getClass.getName}]") } } diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/keyvalueentity/KeyValueEntityImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/keyvalueentity/KeyValueEntityImpl.scala index 1d64043ea..179a746e2 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/keyvalueentity/KeyValueEntityImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/keyvalueentity/KeyValueEntityImpl.scala @@ -88,7 +88,7 @@ private[impl] final class KeyValueEntityImpl[S, KV <: KeyValueEntity[S]]( private val router: ReflectiveKeyValueEntityRouter[AnyRef, KeyValueEntity[AnyRef]] = { val context = new KeyValueEntityContextImpl(entityId) - new ReflectiveKeyValueEntityRouter[S, KV](factory(context), componentDescriptor.commandHandlers, serializer) + new ReflectiveKeyValueEntityRouter[S, KV](factory(context), componentDescriptor.methodInvokers, serializer) .asInstanceOf[ReflectiveKeyValueEntityRouter[AnyRef, KeyValueEntity[AnyRef]]] } diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/keyvalueentity/ReflectiveKeyValueEntityRouter.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/keyvalueentity/ReflectiveKeyValueEntityRouter.scala index 113887cb3..393727980 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/keyvalueentity/ReflectiveKeyValueEntityRouter.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/keyvalueentity/ReflectiveKeyValueEntityRouter.scala @@ -5,7 +5,7 @@ package akka.javasdk.impl.keyvalueentity import akka.annotation.InternalApi -import akka.javasdk.impl.CommandHandler +import akka.javasdk.impl.MethodInvoker import akka.javasdk.impl.CommandSerialization import akka.javasdk.impl.HandlerNotFoundException import akka.javasdk.impl.serialization.JsonSerializer @@ -18,24 +18,23 @@ import akka.runtime.sdk.spi.BytesPayload @InternalApi private[impl] class ReflectiveKeyValueEntityRouter[S, KV <: KeyValueEntity[S]]( val entity: KV, - commandHandlers: Map[String, CommandHandler], + methodInvokers: Map[String, MethodInvoker], serializer: JsonSerializer) { - private def commandHandlerLookup(commandName: String): CommandHandler = - commandHandlers.get(commandName) match { + private def methodInvokerLookup(commandName: String): MethodInvoker = + methodInvokers.get(commandName) match { case Some(handler) => handler case None => - throw new HandlerNotFoundException("command", commandName, entity.getClass, commandHandlers.keySet) + throw new HandlerNotFoundException("command", commandName, entity.getClass, methodInvokers.keySet) } def handleCommand(commandName: String, command: BytesPayload): KeyValueEntity.Effect[_] = { - val commandHandler = commandHandlerLookup(commandName) + val methodInvoker = methodInvokerLookup(commandName) if (serializer.isJson(command) || command.isEmpty) { // - BytesPayload.empty - there is no real command, and we are calling a method with arity 0 // - BytesPayload with json - we deserialize it and call the method - val methodInvoker = commandHandler.getSingleNameInvoker() val deserializedCommand = CommandSerialization.deserializeComponentClientCommand(methodInvoker.method, command, serializer) val result = deserializedCommand match { @@ -45,8 +44,7 @@ private[impl] class ReflectiveKeyValueEntityRouter[S, KV <: KeyValueEntity[S]]( result.asInstanceOf[KeyValueEntity.Effect[_]] } else { throw new IllegalStateException( - s"Could not find a matching command handler for method [$commandName], content type " + - s"[${command.contentType}], invokers keys [${commandHandler.methodInvokers.keys.mkString(", ")}," + + s"Could not find a matching command handler for method [$commandName], content type [${command.contentType}] " + s"on [${entity.getClass.getName}]") } } diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/reflection/KalixMethod.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/reflection/KalixMethod.scala deleted file mode 100644 index f1c5514c3..000000000 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/reflection/KalixMethod.scala +++ /dev/null @@ -1,290 +0,0 @@ -/* - * Copyright (C) 2021-2024 Lightbend Inc. - */ - -package akka.javasdk.impl.reflection - -import java.lang.reflect.Method - -import scala.annotation.tailrec - -import akka.annotation.InternalApi -import akka.javasdk.impl.AclDescriptorFactory -import akka.javasdk.impl.AnySupport.ProtobufEmptyTypeUrl -import akka.javasdk.impl.CommandHandler -import akka.javasdk.impl.MethodInvoker -import akka.javasdk.impl.serialization.JsonSerializer -import com.google.protobuf.Descriptors -import com.google.protobuf.any.{ Any => ScalaPbAny } - -/** - * INTERNAL API - */ -@InternalApi -private[impl] object ServiceMethod { - def isStreamOut(method: Method): Boolean = false - - // this is more for early validation. We don't support stream-in right now - // we block it before deploying anything - def isStreamIn(method: Method): Boolean = false -} - -/** - * INTERNAL API - */ -@InternalApi -private[impl] sealed trait ServiceMethod { - def methodName: String - def javaMethodOpt: Option[Method] - - def streamIn: Boolean - def streamOut: Boolean -} - -/** - * INTERNAL API - */ -@InternalApi -private[impl] sealed trait AnyJsonRequestServiceMethod extends ServiceMethod { - def inputType: Class[_] -} - -/** - * Build from command handler methods on actions - * - * INTERNAL API - */ -@InternalApi -private[impl] final case class ActionHandlerMethod(component: Class[_], method: Method) - extends AnyJsonRequestServiceMethod { - override def methodName: String = method.getName - override def javaMethodOpt: Option[Method] = Some(method) - val hasInputType: Boolean = method.getParameterTypes.headOption.isDefined - val inputType: Class[_] = method.getParameterTypes.headOption.getOrElse(classOf[Unit]) - val streamIn: Boolean = false - val streamOut: Boolean = false -} - -private[impl] final case class CombinedSubscriptionServiceMethod( - componentName: String, - combinedMethodName: String, - methodsMap: Map[String, Method]) - extends AnyJsonRequestServiceMethod { - - val methodName: String = combinedMethodName - override def inputType: Class[_] = classOf[ScalaPbAny] - - override def javaMethodOpt: Option[Method] = None - - val streamIn: Boolean = false - val streamOut: Boolean = false -} - -/** - * Build from methods annotated with @Consume. Those methods are not annotated with Spring REST annotations and are only - * used internally (between runtime and user function). - * - * INTERNAL API - */ -@InternalApi -private[impl] final case class SubscriptionServiceMethod(javaMethod: Method) extends AnyJsonRequestServiceMethod { - - val methodName: String = javaMethod.getName - val inputType: Class[_] = javaMethod.getParameterTypes.head - - override def javaMethodOpt: Option[Method] = Some(javaMethod) - - val streamIn: Boolean = ServiceMethod.isStreamIn(javaMethod) - val streamOut: Boolean = ServiceMethod.isStreamOut(javaMethod) -} - -/** - * Additional trait to simplify pattern matching for actual and virtual delete service method - * - * INTERNAL API - */ -@InternalApi -private[impl] trait DeleteServiceMethod extends ServiceMethod - -/** - * A special case for subscription method with arity zero, in comparison to SubscriptionServiceMethod with required - * arity one. - * - * INTERNAL API - */ -@InternalApi -private[impl] final case class HandleDeletesServiceMethod(javaMethod: Method) extends DeleteServiceMethod { - override def methodName: String = javaMethod.getName - - override def javaMethodOpt: Option[Method] = Some(javaMethod) - - override def streamIn: Boolean = false - - override def streamOut: Boolean = false -} - -/** - * INTERNAL API - */ -@InternalApi -private[impl] object KalixMethod { - def apply( - serviceMethod: ServiceMethod, - methodOptions: Option[kalix.MethodOptions] = None, - entityIds: Seq[String] = Seq.empty): KalixMethod = { - - val aclOptions = - serviceMethod.javaMethodOpt.flatMap { meth => - AclDescriptorFactory.methodLevelAclAnnotation(meth) - } - - new KalixMethod(serviceMethod, methodOptions, entityIds) - .withKalixOptions(aclOptions) - } -} - -/** - * INTERNAL API - */ -@InternalApi -private[impl] final case class KalixMethod private ( - serviceMethod: ServiceMethod, - methodOptions: Option[kalix.MethodOptions] = None, - entityIds: Seq[String] = Seq.empty) { - - /** - * KalixMethod is used to collect all the information that we need to produce a gRPC method for the runtime. At the - * end of the road, we need to check if any incompatibility was created. Therefore the validation should occur when we - * finish to scan the component and are ready to build the gRPC method. - * - * For example, a method eventing.in method with an ACL annotation. - */ - def validate(): Unit = { - // check if eventing.in and acl are mixed - methodOptions.foreach { opts => - if (opts.getEventing.hasIn && opts.hasAcl) - throw ServiceIntrospectionException( - // safe call: ServiceMethods without a java counterpart won't have ACL anyway - serviceMethod.javaMethodOpt.get, - "Subscription methods are for internal use only and cannot be combined with ACL annotations.") - } - } - - /** - * This method merges the new method options with the existing ones. In case of collision the 'opts' are kept - * - * @param opts - * @return - */ - def withKalixOptions(opts: kalix.MethodOptions): KalixMethod = - copy(methodOptions = Some(mergeKalixOptions(methodOptions, opts))) - - /** - * This method merges the new method options with the existing ones. In case of collision the 'opts' are kept - * @param opts - * @return - */ - def withKalixOptions(opts: Option[kalix.MethodOptions]): KalixMethod = - opts match { - case Some(methodOptions) => withKalixOptions(methodOptions) - case None => this - } - - private[akka] def mergeKalixOptions( - source: Option[kalix.MethodOptions], - addOn: kalix.MethodOptions): kalix.MethodOptions = { - val builder = source match { - case Some(src) => src.toBuilder - case None => kalix.MethodOptions.newBuilder() - } - builder.mergeFrom(addOn) - builder.build() - } - - def toCommandHandler(serializer: JsonSerializer): CommandHandler = { - serviceMethod match { - - case method: SubscriptionServiceMethod => - val methodInvokers = - serviceMethod.javaMethodOpt - .map { meth => - if (meth.getParameterTypes.last.isSealed) { - meth.getParameterTypes.last.getPermittedSubclasses.toList - .flatMap(subClass => { - serializer.contentTypesFor(subClass).map(typeUrl => typeUrl -> MethodInvoker(meth)) - }) - .toMap - } else { - val typeUrls = serializer.contentTypesFor(method.inputType) - typeUrls.map(_ -> MethodInvoker(meth)).toMap - } - } - .getOrElse(Map.empty) - - CommandHandler(null, serializer, null, methodInvokers) - - case _: ActionHandlerMethod => - val methodInvokers = - serviceMethod.javaMethodOpt - .map { meth => - //the key is the content type, but in the case of a timed action, it doesn't matter - Map("" -> MethodInvoker(meth)) - } - .getOrElse(Map.empty) - - CommandHandler(null, serializer, null, methodInvokers) - - case _: DeleteServiceMethod => - val methodInvokers = serviceMethod.javaMethodOpt.map { meth => - (ProtobufEmptyTypeUrl, MethodInvoker(meth)) - }.toMap - - CommandHandler(null, serializer, null, methodInvokers) - case other => - throw new IllegalStateException("Not supported method type: " + other.getClass.getName) - } - - } -} - -/** - * INTERNAL API - */ -@InternalApi -private[impl] trait ExtractorCreator { - def apply(descriptor: Descriptors.Descriptor): ParameterExtractor[DynamicMessageContext, AnyRef] -} - -/** - * Ensures all generated names in a given package are unique, noting that grpcMethod names and message names must not - * conflict. - * - * Note that it is important to make sure that invoking this is done in an deterministic order or else JVMs on different - * nodes will generate different names for the same method. Sorting can be done using ReflectionUtils.methodOrdering - * - * INTERNAL API - */ -@InternalApi -private[impl] final class NameGenerator { - private var names: Set[String] = Set.empty - - def getName(base: String): String = { - if (names(base)) { - incrementName(base, 1) - } else { - names += base - base - } - } - - @tailrec - private def incrementName(base: String, inc: Int): String = { - val name = base + inc - if (names(name)) { - incrementName(base, inc + 1) - } else { - names += name - name - } - } -} diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/reflection/ParameterExtractor.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/reflection/ParameterExtractor.scala index 6ca96875e..766c38cbe 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/reflection/ParameterExtractor.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/reflection/ParameterExtractor.scala @@ -5,42 +5,8 @@ package akka.javasdk.impl.reflection import akka.annotation.InternalApi -import akka.javasdk.Metadata import akka.javasdk.impl.serialization.JsonSerializer import akka.runtime.sdk.spi.BytesPayload -import com.google.protobuf.ByteString -import com.google.protobuf.Descriptors -import com.google.protobuf.DynamicMessage -import com.google.protobuf.any.{ Any => ScalaPbAny } -import com.google.protobuf.{ Any => JavaPbAny } - -/** - * Extracts method parameters from an invocation context for the purpose of passing them to a reflective invocation call - * - * INTERNAL API - */ -@InternalApi -private[impl] trait ParameterExtractor[-C, +T] { - def extract(context: C): T -} - -/** - * INTERNAL API - */ -@InternalApi -private[impl] trait MetadataContext { - def metadata: Metadata -} - -/** - * INTERNAL API - */ -@InternalApi -private[impl] trait DynamicMessageContext { - def getAny: ScalaPbAny - def getField(field: Descriptors.FieldDescriptor): AnyRef - def hasField(field: Descriptors.FieldDescriptor): Boolean -} /** * INTERNAL API @@ -48,19 +14,6 @@ private[impl] trait DynamicMessageContext { @InternalApi private[impl] object ParameterExtractors { - def toAny(dm: DynamicMessage) = { - val bytes = dm.getField(JavaPbAny.getDescriptor.findFieldByName("value")).asInstanceOf[ByteString] - val typeUrl = dm.getField(JavaPbAny.getDescriptor.findFieldByName("type_url")).asInstanceOf[String] - // TODO: avoid creating a new JavaPbAny instance - // we want to reuse the typeUrl validation and reading logic (skip tag + jackson reader) from JsonSupport - // we need a new internal version that also handle DynamicMessages - JavaPbAny - .newBuilder() - .setTypeUrl(typeUrl) - .setValue(bytes) - .build() - } - private def decodeParam[T](payload: BytesPayload, cls: Class[T], serializer: JsonSerializer): T = { if (cls == classOf[Array[Byte]]) { payload.bytes.toArrayUnsafe().asInstanceOf[T] diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/reflection/Reflect.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/reflection/Reflect.scala index a42209b2d..e8220e081 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/reflection/Reflect.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/reflection/Reflect.scala @@ -73,6 +73,15 @@ private[impl] object Reflect { def isAction(clazz: Class[_]): Boolean = classOf[TimedAction].isAssignableFrom(clazz) + // command handlers candidate must have 0 or 1 parameter and return the components effect type + // we might later revisit this, instead of single param, we can require (State, Cmd) => Effect like in Akka + def isCommandHandlerCandidate[E](method: Method)(implicit effectType: ClassTag[E]): Boolean = { + effectType.runtimeClass.isAssignableFrom(method.getReturnType) && + method.getParameterTypes.length <= 1 && + // Workflow will have lambdas returning Effect, we want to filter them out + !method.getName.startsWith("lambda$") + } + def getReturnType[R](declaringClass: Class[_], method: Method): Class[R] = { if (isAction(declaringClass) || isEntity(declaringClass) || isWorkflow(declaringClass)) { // here we are expecting a wrapper in the form of an Effect diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/timedaction/ReflectiveTimedActionRouter.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/timedaction/ReflectiveTimedActionRouter.scala index 98a810a64..aa83e5092 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/timedaction/ReflectiveTimedActionRouter.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/timedaction/ReflectiveTimedActionRouter.scala @@ -7,7 +7,7 @@ package akka.javasdk.impl.timedaction import java.util.Optional import akka.annotation.InternalApi -import akka.javasdk.impl.CommandHandler +import akka.javasdk.impl.MethodInvoker import akka.javasdk.impl.CommandSerialization import akka.javasdk.impl.HandlerNotFoundException import akka.javasdk.impl.serialization.JsonSerializer @@ -22,14 +22,14 @@ import akka.runtime.sdk.spi.BytesPayload @InternalApi private[impl] final class ReflectiveTimedActionRouter[A <: TimedAction]( action: A, - commandHandlers: Map[String, CommandHandler], + methodInvokers: Map[String, MethodInvoker], serializer: JsonSerializer) { - private def commandHandlerLookup(commandName: String): CommandHandler = - commandHandlers.get(commandName) match { + private def methodInvokerLookup(commandName: String): MethodInvoker = + methodInvokers.get(commandName) match { case Some(handler) => handler case None => - throw new HandlerNotFoundException("command", commandName, action.getClass, commandHandlers.keySet) + throw new HandlerNotFoundException("command", commandName, action.getClass, methodInvokers.keySet) } def handleCommand( @@ -40,14 +40,13 @@ private[impl] final class ReflectiveTimedActionRouter[A <: TimedAction]( // the same handler and action instance is expected to only ever be invoked for a single command action._internalSetCommandContext(Optional.of(context)) - val commandHandler = commandHandlerLookup(methodName) + val methodInvoker = methodInvokerLookup(methodName) val payload = message.payload() if (serializer.isJson(payload) || payload.isEmpty) { // - BytesPayload.empty - there is no real command, and we are calling a method with arity 0 // - BytesPayload with json - we deserialize it and call the method - val methodInvoker = commandHandler.getSingleNameInvoker() val deserializedCommand = CommandSerialization.deserializeComponentClientCommand(methodInvoker.method, payload, serializer) val result = deserializedCommand match { @@ -57,8 +56,7 @@ private[impl] final class ReflectiveTimedActionRouter[A <: TimedAction]( result.asInstanceOf[TimedAction.Effect] } else { throw new IllegalStateException( - s"Could not find a matching command handler for method [$methodName], content type " + - s"[${payload.contentType}], invokers keys [${commandHandler.methodInvokers.keys.mkString(", ")}," + + s"Could not find a matching command handler for method [$methodName], content type [${payload.contentType}] " + s"on [${action.getClass.getName}]") } } diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/timedaction/TimedActionImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/timedaction/TimedActionImpl.scala index 9e870442b..a204c5e64 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/timedaction/TimedActionImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/timedaction/TimedActionImpl.scala @@ -93,7 +93,7 @@ private[impl] final class TimedActionImpl[TA <: TimedAction]( private val traceInstrumentation = new TraceInstrumentation(componentId, TimedActionCategory, tracerFactory) private def createRouter(): ReflectiveTimedActionRouter[TA] = - new ReflectiveTimedActionRouter[TA](factory(), componentDescriptor.commandHandlers, jsonSerializer) + new ReflectiveTimedActionRouter[TA](factory(), componentDescriptor.methodInvokers, jsonSerializer) override def handleCommand(command: Command): Future[Effect] = { val metadata = MetadataImpl.of(command.metadata) diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/ReflectiveWorkflowRouter.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/ReflectiveWorkflowRouter.scala index c3dbe7781..1569e7a30 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/ReflectiveWorkflowRouter.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/ReflectiveWorkflowRouter.scala @@ -13,7 +13,7 @@ import scala.jdk.FutureConverters.CompletionStageOps import scala.jdk.OptionConverters.RichOptional import akka.annotation.InternalApi -import akka.javasdk.impl.CommandHandler +import akka.javasdk.impl.MethodInvoker import akka.javasdk.impl.CommandSerialization import akka.javasdk.impl.HandlerNotFoundException import akka.javasdk.impl.WorkflowExceptions.WorkflowException @@ -59,7 +59,7 @@ object ReflectiveWorkflowRouter { class ReflectiveWorkflowRouter[S, W <: Workflow[S]]( workflowContext: WorkflowContext, instanceFactory: Function[WorkflowContext, W], - commandHandlers: Map[String, CommandHandler], + methodInvokers: Map[String, MethodInvoker], serializer: JsonSerializer) { private def decodeUserState(userState: Option[BytesPayload]): Option[S] = @@ -80,14 +80,14 @@ class ReflectiveWorkflowRouter[S, W <: Workflow[S]]( } } - private def commandHandlerLookup(commandName: String) = - commandHandlers.getOrElse( + private def methodInvokerLookup(commandName: String) = + methodInvokers.getOrElse( commandName, throw new HandlerNotFoundException( "command", commandName, instanceFactory(workflowContext).getClass, - commandHandlers.keySet)) + methodInvokers.keySet)) final def handleCommand( userState: Option[SpiWorkflow.State], @@ -104,12 +104,11 @@ class ReflectiveWorkflowRouter[S, W <: Workflow[S]]( val decodedState = decodeUserState(userState).getOrElse(workflow.emptyState()) workflow._internalSetup(decodedState, context, timerScheduler) - val commandHandler = commandHandlerLookup(commandName) + val methodInvoker = methodInvokerLookup(commandName) if (serializer.isJson(command) || command.isEmpty) { // - BytesPayload.empty - there is no real command, and we are calling a method with arity 0 // - BytesPayload with json - we deserialize it and call the method - val methodInvoker = commandHandler.getSingleNameInvoker() val deserializedCommand = CommandSerialization.deserializeComponentClientCommand(methodInvoker.method, command, serializer) val result = deserializedCommand match { @@ -119,8 +118,7 @@ class ReflectiveWorkflowRouter[S, W <: Workflow[S]]( result.asInstanceOf[Workflow.Effect[_]] } else { throw new IllegalStateException( - s"Could not find a matching command handler for method [$commandName], content type " + - s"[${command.contentType}], invokers keys [${commandHandler.methodInvokers.keys.mkString(", ")}," + + s"Could not find a matching command handler for method [$commandName], content type [${command.contentType}] " + s"on [${workflow.getClass.getName}]") } diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/WorkflowImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/WorkflowImpl.scala index d9a8de788..4d96f40f0 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/WorkflowImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/WorkflowImpl.scala @@ -68,7 +68,7 @@ class WorkflowImpl[S, W <: Workflow[S]]( private val context = new WorkflowContextImpl(workflowId) private val router = - new ReflectiveWorkflowRouter[S, W](context, instanceFactory, componentDescriptor.commandHandlers, serializer) + new ReflectiveWorkflowRouter[S, W](context, instanceFactory, componentDescriptor.methodInvokers, serializer) override def configuration: SpiWorkflow.WorkflowConfig = { val workflow = instanceFactory(context) diff --git a/akka-javasdk/src/test/scala/akka/javasdk/impl/ConsumerDescriptorFactorySpec.scala b/akka-javasdk/src/test/scala/akka/javasdk/impl/ConsumerDescriptorFactorySpec.scala index 7b28fc399..d94aa40cb 100644 --- a/akka-javasdk/src/test/scala/akka/javasdk/impl/ConsumerDescriptorFactorySpec.scala +++ b/akka-javasdk/src/test/scala/akka/javasdk/impl/ConsumerDescriptorFactorySpec.scala @@ -50,45 +50,40 @@ class ConsumerDescriptorFactorySpec extends AnyWordSpec with Matchers { "generate mapping with Event Sourced Subscription annotations" in { val desc = ComponentDescriptor.descriptorFor(classOf[SubscribeToEventSourcedEmployee], new JsonSerializer) - val onUpdateMethod = desc.commandHandlers("") // in case of @Migration, it should map 2 type urls to the same method - onUpdateMethod.methodInvokers.view.mapValues(_.method.getName).toMap should + desc.methodInvokers.view.mapValues(_.method.getName).toMap should contain only ("json.akka.io/created" -> "methodOne", "json.akka.io/old-created" -> "methodOne", "json.akka.io/emailUpdated" -> "methodTwo") } "generate mapping with Key Value Entity Subscription annotations (type level)" in { val desc = ComponentDescriptor.descriptorFor(classOf[SubscribeToValueEntityTypeLevel], new JsonSerializer) - val onUpdateMethod = desc.commandHandlers("") // in case of @Migration, it should map 2 type urls to the same method - onUpdateMethod.methodInvokers should have size 2 - onUpdateMethod.methodInvokers.view.mapValues(_.method.getName).toMap should + desc.methodInvokers should have size 2 + desc.methodInvokers.view.mapValues(_.method.getName).toMap should contain only ("json.akka.io/counter-state" -> "onUpdate", "json.akka.io/" + classOf[ CounterState].getName -> "onUpdate") } "generate mapping with Key Value Entity and delete handler" in { val desc = ComponentDescriptor.descriptorFor(classOf[SubscribeToValueEntityWithDeletes], new JsonSerializer) - val commandHandler = desc.commandHandlers("") - commandHandler.methodInvokers should have size 3 - commandHandler.methodInvokers.view.mapValues(_.method.getName).toMap should + desc.methodInvokers should have size 3 + desc.methodInvokers.view.mapValues(_.method.getName).toMap should contain only ("json.akka.io/akka.javasdk.testmodels.keyvalueentity.CounterState" -> "onUpdate", "json.akka.io/counter-state" -> "onUpdate", "type.googleapis.com/google.protobuf.Empty" -> "onDelete") } "generate mapping for a Consumer with a subscription to a topic (type level)" in { val desc = ComponentDescriptor.descriptorFor(classOf[SubscribeToTopicTypeLevel], new JsonSerializer) - val commandHandler = desc.commandHandlers("") - commandHandler.methodInvokers should have size 1 + desc.methodInvokers should have size 1 } "generate mapping for a Consumer with a subscription to a topic (type level) combined" in { val desc = ComponentDescriptor.descriptorFor(classOf[SubscribeToTopicTypeLevelCombined], new JsonSerializer) - val commandHandler = desc.commandHandlers("") - commandHandler.methodInvokers should have size 3 + desc.methodInvokers should have size 3 //TODO not sure why we need to support `json.akka.io/string` and `json.akka.io/java.lang.String` - commandHandler.methodInvokers.view.mapValues(_.method.getName).toMap should + desc.methodInvokers.view.mapValues(_.method.getName).toMap should contain only ("json.akka.io/akka.javasdk.testmodels.Message" -> "messageOne", "json.akka.io/string" -> "messageTwo", "json.akka.io/java.lang.String" -> "messageTwo") } @@ -179,8 +174,7 @@ class ConsumerDescriptorFactorySpec extends AnyWordSpec with Matchers { "generate mapping for a Consumer subscribing to raw bytes from a topic" in { val desc = ComponentDescriptor.descriptorFor(classOf[SubscribeToBytesFromTopic], new JsonSerializer) - val methodOne = desc.commandHandlers("") - methodOne.methodInvokers.contains("type.kalix.io/bytes") shouldBe true + desc.methodInvokers.contains("type.kalix.io/bytes") shouldBe true } "generate mapping for a Consumer with a ES subscription and publication to a topic" ignore { @@ -204,15 +198,13 @@ class ConsumerDescriptorFactorySpec extends AnyWordSpec with Matchers { "generate mappings for service to service publishing " in { val desc = ComponentDescriptor.descriptorFor(classOf[EventStreamPublishingConsumer], new JsonSerializer) - val onUpdateMethod = desc.commandHandlers("") - onUpdateMethod.methodInvokers.view.mapValues(_.method.getName).toMap should + desc.methodInvokers.view.mapValues(_.method.getName).toMap should contain only ("json.akka.io/created" -> "transform", "json.akka.io/old-created" -> "transform", "json.akka.io/emailUpdated" -> "transform") } "generate mappings for service to service subscription " in { val desc = ComponentDescriptor.descriptorFor(classOf[EventStreamSubscriptionConsumer], new JsonSerializer) - val commandHandler = desc.commandHandlers("") - commandHandler.methodInvokers should have size 3 + desc.methodInvokers should have size 3 } } diff --git a/akka-javasdk/src/test/scala/akka/javasdk/impl/TimedActionDescriptorFactorySpec.scala b/akka-javasdk/src/test/scala/akka/javasdk/impl/TimedActionDescriptorFactorySpec.scala index 17ae681d7..4b45dec83 100644 --- a/akka-javasdk/src/test/scala/akka/javasdk/impl/TimedActionDescriptorFactorySpec.scala +++ b/akka-javasdk/src/test/scala/akka/javasdk/impl/TimedActionDescriptorFactorySpec.scala @@ -23,14 +23,12 @@ class TimedActionDescriptorFactorySpec extends AnyWordSpec with Matchers { "generate mappings for an Action with method without path param" in { val desc = ComponentDescriptor.descriptorFor(classOf[ActionWithoutParam], new JsonSerializer) - val method = desc.commandHandlers("Message") - method.methodInvokers should have size 1 + desc.methodInvokers should have size 1 } "generate mappings for an Action with method with one param" in { val desc = ComponentDescriptor.descriptorFor(classOf[ActionWithOneParam], new JsonSerializer) - val method = desc.commandHandlers("Message") - method.methodInvokers.get("") should not be empty + desc.methodInvokers.get("Message") should not be empty } }