Skip to content

Commit

Permalink
chore: descriptor factories clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
aludwiko committed Jan 8, 2025
1 parent 1297c2b commit c3ffd5f
Show file tree
Hide file tree
Showing 26 changed files with 195 additions and 678 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import akka.javasdk.testkit.TestKit;
import akka.javasdk.testkit.TestKitSupport;
import akkajavasdk.components.eventsourcedentities.counter.Counter;
import akkajavasdk.components.eventsourcedentities.counter.CounterCommand;
import akkajavasdk.components.eventsourcedentities.counter.CounterEntity;
import akka.javasdk.client.EventSourcedEntityClient;
import akkajavasdk.components.eventsourcedentities.hierarchy.AbstractTextConsumer;
Expand Down Expand Up @@ -156,6 +157,24 @@ public void verifyCounterEventSourcedAfterRestartFromSnapshot() {
new IsEqual(10));
}

@Test
public void verifyRequestWithSealedCommand() {
var client = componentClient.forEventSourcedEntity("counter-with-sealed-command-handler");
await(client
.method(CounterEntity::handle)
.invokeAsync(new CounterCommand.Set(123)));

Integer result1 = await(client.method(CounterEntity::get).invokeAsync());
assertThat(result1).isEqualTo(123);

await(client
.method(CounterEntity::handle)
.invokeAsync(new CounterCommand.Increase(123)));

Integer result2 = await(client.method(CounterEntity::get).invokeAsync());
assertThat(result2).isEqualTo(246);
}

@Test
public void verifyRequestWithDefaultProtoValuesWithEntity() {
var client = componentClient.forEventSourcedEntity("some-counter");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akkajavasdk.components.eventsourcedentities.counter;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "@type")
@JsonSubTypes({
@JsonSubTypes.Type(value = CounterCommand.Increase.class, name = "I"),
@JsonSubTypes.Type(value = CounterCommand.Set.class, name = "S")})
public sealed interface CounterCommand {

record Increase(int value) implements CounterCommand {
}

record Set(int value) implements CounterCommand {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ public Effect<Integer> set(Integer value) {
return effects().persist(new CounterEvent.ValueSet(value)).thenReply(Counter::value);
}

public Effect<Integer> handle(CounterCommand counterCommand) {
return switch (counterCommand){
case CounterCommand.Increase(var value) ->
effects().persist(new CounterEvent.ValueIncreased(value)).thenReply(Counter::value);

case CounterCommand.Set(var value) ->
effects().persist(new CounterEvent.ValueSet(value)).thenReply(Counter::value);
};
}

public Effect<Integer> multiIncrease(List<Integer> increase) {
return effects().persistAll(increase.stream().map(CounterEvent.ValueIncreased::new).toList())
.thenReply(Counter::value);
Expand Down
118 changes: 0 additions & 118 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/CommandHandler.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package akka.javasdk.impl

import akka.annotation.InternalApi
import akka.javasdk.impl.reflection.KalixMethod
import akka.javasdk.impl.serialization.JsonSerializer

/**
Expand All @@ -20,21 +19,9 @@ private[impl] object ComponentDescriptor {
def descriptorFor(component: Class[_], serializer: JsonSerializer): ComponentDescriptor =
ComponentDescriptorFactory.getFactoryFor(component).buildDescriptorFor(component, serializer)

def apply(serializer: JsonSerializer, kalixMethods: Seq[KalixMethod]): ComponentDescriptor = {

//TODO remove capitalization of method name, can't be done per component, because component client reuse the same logic for all
val methods: Map[String, CommandHandler] =
kalixMethods.map { method =>
(method.serviceMethod.methodName.capitalize, method.toCommandHandler(serializer))
}.toMap

new ComponentDescriptor(methods)

}

def apply(methods: Map[String, CommandHandler]): ComponentDescriptor = {
def apply(methods: Map[String, MethodInvoker]): ComponentDescriptor = {
new ComponentDescriptor(methods)
}
}

private[akka] final case class ComponentDescriptor private (commandHandlers: Map[String, CommandHandler])
private[akka] final case class ComponentDescriptor private (methodInvokers: Map[String, MethodInvoker])
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@
package akka.javasdk.impl

import akka.annotation.InternalApi
import akka.javasdk.impl.AnySupport.ProtobufEmptyTypeUrl
import akka.javasdk.impl.ComponentDescriptorFactory._
import akka.javasdk.impl.reflection.HandleDeletesServiceMethod
import akka.javasdk.impl.reflection.KalixMethod
import akka.javasdk.impl.reflection.Reflect
import akka.javasdk.impl.reflection.SubscriptionServiceMethod
import akka.javasdk.impl.serialization.JsonSerializer

/**
Expand All @@ -22,35 +20,42 @@ private[impl] object ConsumerDescriptorFactory extends ComponentDescriptorFactor

import Reflect.methodOrdering

val handleDeletesMethods = component.getMethods
val handleDeletesMethods: Map[String, MethodInvoker] = component.getMethods
.filter(hasConsumerOutput)
.filter(hasHandleDeletes)
.sorted
.map { method =>
KalixMethod(HandleDeletesServiceMethod(method))
ProtobufEmptyTypeUrl -> MethodInvoker(method)
}
.toSeq
.toMap

val methods = component.getMethods
val methods: Map[String, MethodInvoker] = component.getMethods
.filter(hasConsumerOutput)
.filterNot(hasHandleDeletes)
.map { method =>
KalixMethod(SubscriptionServiceMethod(method))
.flatMap { method =>
method.getParameterTypes.headOption match {
case Some(inputType) =>
val invoker = MethodInvoker(method)
if (method.getParameterTypes.last.isSealed) {
method.getParameterTypes.last.getPermittedSubclasses.toList
.flatMap(subClass => {
serializer.contentTypesFor(subClass).map(typeUrl => typeUrl -> invoker)
})
} else {
val typeUrls = serializer.contentTypesFor(inputType)
typeUrls.map(_ -> invoker)
}
case None =>
// FIXME check if there is a validation for that already
throw new IllegalStateException(
"Consumer method must have at least one parameter, unless it is a delete handler")
}
}
.toIndexedSeq

val allMethods = methods ++ handleDeletesMethods

val commandHandlers = allMethods.map { method =>
method.toCommandHandler(serializer)
}
.toMap

//folding all invokers into a single map
val allInvokers = commandHandlers.foldLeft(Map.empty[String, MethodInvoker]) { (acc, handler) =>
acc ++ handler.methodInvokers
}
val allInvokers = methods ++ handleDeletesMethods

//Empty command/method name, because it is not used in the consumer, we just need the invokers
ComponentDescriptor(Map("" -> CommandHandler(null, serializer, null, allInvokers)))
ComponentDescriptor(allInvokers)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,9 @@

package akka.javasdk.impl

import java.lang.reflect.Method

import scala.reflect.ClassTag

import akka.annotation.InternalApi
import akka.javasdk.eventsourcedentity.EventSourcedEntity
import akka.javasdk.impl.reflection.ActionHandlerMethod
import akka.javasdk.impl.reflection.KalixMethod
import akka.javasdk.impl.reflection.Reflect.isCommandHandlerCandidate
import akka.javasdk.impl.serialization.JsonSerializer
import akka.javasdk.keyvalueentity.KeyValueEntity
import akka.javasdk.workflow.Workflow
Expand All @@ -23,41 +18,29 @@ import akka.javasdk.workflow.Workflow
private[impl] object EntityDescriptorFactory extends ComponentDescriptorFactory {

override def buildDescriptorFor(component: Class[_], serializer: JsonSerializer): ComponentDescriptor = {

// command handlers candidate must have 0 or 1 parameter and return the components effect type
// we might later revisit this, instead of single param, we can require (State, Cmd) => Effect like in Akka
def isCommandHandlerCandidate[E](method: Method)(implicit effectType: ClassTag[E]): Boolean = {
effectType.runtimeClass.isAssignableFrom(method.getReturnType) &&
method.getParameterTypes.length <= 1 &&
// Workflow will have lambdas returning Effect, we want to filter them out
!method.getName.startsWith("lambda$")
}

val commandHandlerMethods: Seq[KalixMethod] = if (classOf[EventSourcedEntity[_, _]].isAssignableFrom(component)) {
//TODO remove capitalization of method name, can't be done per component, because component client reuse the same logic for all
val commandHandlerMethods = if (classOf[EventSourcedEntity[_, _]].isAssignableFrom(component)) {
component.getDeclaredMethods.collect {
case method if isCommandHandlerCandidate[EventSourcedEntity.Effect[_]](method) =>
val servMethod = ActionHandlerMethod(component, method)
KalixMethod(servMethod, entityIds = Seq.empty)
}.toSeq
method.getName.capitalize -> MethodInvoker(method)
}
} else if (classOf[KeyValueEntity[_]].isAssignableFrom(component)) {
component.getDeclaredMethods.collect {
case method if isCommandHandlerCandidate[KeyValueEntity.Effect[_]](method) =>
val servMethod = ActionHandlerMethod(component, method)
KalixMethod(servMethod, entityIds = Seq.empty)
}.toSeq
method.getName.capitalize -> MethodInvoker(method)
}
} else if (classOf[Workflow[_]].isAssignableFrom(component)) {
component.getDeclaredMethods.collect {
case method if isCommandHandlerCandidate[Workflow.Effect[_]](method) =>
val servMethod = ActionHandlerMethod(component, method)
KalixMethod(servMethod, entityIds = Seq.empty)
}.toSeq
method.getName.capitalize -> MethodInvoker(method)
}
} else {

// should never happen
throw new RuntimeException(
s"Unsupported component type: ${component.getName}. Supported types are: EventSourcedEntity, ValueEntity, Workflow")
}

ComponentDescriptor(serializer, commandHandlerMethods)
ComponentDescriptor(commandHandlerMethods.toMap)
}
}
Loading

0 comments on commit c3ffd5f

Please sign in to comment.