Skip to content

Commit

Permalink
chore: timed action without proto desc
Browse files Browse the repository at this point in the history
  • Loading branch information
aludwiko committed Dec 11, 2024
1 parent 418e163 commit 1cdcb3c
Show file tree
Hide file tree
Showing 27 changed files with 450 additions and 904 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,20 @@ public void verifyTimedActionListCommand() {
});
}

@Test
public void verifyTimedActionEmpty() {
timerScheduler.startSingleTimer("echo-action", ofMillis(0), componentClient.forTimedAction()
.method(EchoAction::emptyMessage)
.deferred());

Awaitility.await()
.atMost(20, TimeUnit.SECONDS)
.untilAsserted(() -> {
var value = StaticTestBuffer.getValue("echo-action");
assertThat(value).isEqualTo("empty");
});
}

@Test
public void verifyCounterEventSourceSubscription() {
// GIVEN IncreaseAction is subscribed to CounterEntity events
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ public EchoAction(ComponentClient componentClient) {
this.componentClient = componentClient;
}

public Effect emptyMessage() {
StaticTestBuffer.addValue("echo-action", "empty");
return effects().done();
}

public Effect stringMessage(String msg) {
StaticTestBuffer.addValue("echo-action", msg);
return effects().done();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ private[akka] object AnySupport {

def isJson(any: ScalaPbAny): Boolean = isJsonTypeUrl(any.typeUrl)

def isJson(typeUrl: String): Boolean = isJsonTypeUrl(typeUrl)

def isJsonTypeUrl(typeUrl: String): Boolean =
// check both new and old typeurl for compatibility, in case there are services with old type url stored in database
typeUrl.startsWith(JsonTypeUrlPrefix) || typeUrl.startsWith(KalixJsonTypeUrlPrefix)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ private[impl] object MethodInvoker {
def apply(javaMethod: Method, parameterExtractor: ParameterExtractor[InvocationContext, AnyRef]): MethodInvoker =
MethodInvoker(javaMethod, Array(parameterExtractor))

def apply(javaMethod: Method): MethodInvoker =
MethodInvoker(javaMethod, Array.empty[ParameterExtractor[InvocationContext, AnyRef]])

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,22 @@ private[impl] object ComponentDescriptor {
def descriptorFor(component: Class[_], serializer: JsonSerializer): ComponentDescriptor =
ComponentDescriptorFactory.getFactoryFor(component).buildDescriptorFor(component, serializer, new NameGenerator)

def apply(serializer: JsonSerializer, kalixMethods: Seq[KalixMethod]): ComponentDescriptor = {

//TODO remove capitalization of method name, can't be done per component, because component client reuse the same logic for all
val methods: Map[String, CommandHandler] =
kalixMethods.map { method =>
(method.serviceMethod.methodName.capitalize, method.toCommandHandler(serializer))
}.toMap

new ComponentDescriptor(null, null, methods, null, null)

}

def apply(methods: Map[String, CommandHandler]): ComponentDescriptor = {
new ComponentDescriptor(null, null, methods, null, null)
}

def apply(
nameGenerator: NameGenerator,
serializer: JsonSerializer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ private[impl] object ComponentDescriptorFactory {
else if (Reflect.isConsumer(component))
ConsumerDescriptorFactory
else
ActionDescriptorFactory
TimedActionDescriptorFactory
}

def combineByES(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,14 @@

package akka.javasdk.impl

import akka.annotation.InternalApi
import akka.javasdk.impl.ComponentDescriptorFactory._
import akka.javasdk.impl.reflection.HandleDeletesServiceMethod
import akka.javasdk.impl.reflection.KalixMethod
import akka.javasdk.impl.reflection.NameGenerator
import akka.javasdk.impl.reflection.Reflect
import akka.javasdk.impl.reflection.SubscriptionServiceMethod
import ComponentDescriptorFactory._
import akka.annotation.InternalApi
import akka.javasdk.impl.serialization.JsonSerializer
import kalix.EventSource
import kalix.Eventing
import kalix.MethodOptions

/**
* INTERNAL API
Expand All @@ -27,114 +24,37 @@ private[impl] object ConsumerDescriptorFactory extends ComponentDescriptorFactor
serializer: JsonSerializer,
nameGenerator: NameGenerator): ComponentDescriptor = {

def withOptionalDestination(clazz: Class[_], source: EventSource): MethodOptions = {
val eventingBuilder = Eventing.newBuilder().setIn(source)
topicEventDestination(clazz).foreach(eventingBuilder.setOut)
kalix.MethodOptions.newBuilder().setEventing(eventingBuilder.build()).build()
}

import Reflect.methodOrdering

val handleDeletesMethods = component.getMethods
.filter(hasConsumerOutput)
.filter(hasHandleDeletes)
.sorted
.map { method =>
val source = valueEntityEventSource(component, handleDeletes = true)
val kalixOptions = withOptionalDestination(component, source)
KalixMethod(HandleDeletesServiceMethod(method))
.withKalixOptions(kalixOptions)
}
.toSeq

val subscriptionValueEntityMethods: IndexedSeq[KalixMethod] = if (hasValueEntitySubscription(component)) {
//expecting only a single update method, which is validated
component.getMethods
.filter(hasConsumerOutput)
.filterNot(hasHandleDeletes)
.map { method =>
val source = valueEntityEventSource(component, handleDeletes = false)
val kalixOptions = withOptionalDestination(component, source)
KalixMethod(SubscriptionServiceMethod(method))
.withKalixOptions(kalixOptions)
}
.toIndexedSeq
} else {
IndexedSeq.empty[KalixMethod]
}

val subscriptionEventSourcedEntityClass: Map[String, Seq[KalixMethod]] =
if (hasEventSourcedEntitySubscription(component)) {
val kalixMethods =
component.getMethods
.filter(hasConsumerOutput)
.sorted // make sure we get the methods in deterministic order
.map { method =>
KalixMethod(SubscriptionServiceMethod(method))
.withKalixOptions(buildEventingOutOptions(component))
}
.toSeq

val entityType = findEventSourcedEntityType(component)
Map(entityType -> kalixMethods)

} else Map.empty
val methods = component.getMethods
.filter(hasConsumerOutput)
.filterNot(hasHandleDeletes)
.map { method =>
KalixMethod(SubscriptionServiceMethod(method))
}
.toIndexedSeq

val subscriptionStreamClass: Map[String, Seq[KalixMethod]] = {
streamSubscription(component)
.map { ann =>
val kalixMethods =
component.getMethods
.filter(hasConsumerOutput)
.sorted // make sure we get the methods in deterministic order
.map { method =>
KalixMethod(SubscriptionServiceMethod(method))
.withKalixOptions(buildEventingOutOptions(component))
}
.toSeq
val allMethods = methods ++ handleDeletesMethods

val streamId = ann.id()
Map(streamId -> kalixMethods)
}
.getOrElse(Map.empty)
val commandHandlers = allMethods.map { method =>
method.toCommandHandler(serializer)
}

// type level @Consume.FormTopic, methods eligible for subscription
val subscriptionTopicClass: Map[String, Seq[KalixMethod]] =
if (hasTopicSubscription(component)) {
val kalixMethods = component.getMethods
.filter(hasConsumerOutput)
.sorted // make sure we get the methods in deterministic order
.map { method =>
val source = topicEventSource(component)
val kalixOptions = withOptionalDestination(component, source)
KalixMethod(SubscriptionServiceMethod(method))
.withKalixOptions(kalixOptions)
}
.toIndexedSeq
val topicName = findSubscriptionTopicName(component)
Map(topicName -> kalixMethods)
} else Map.empty

val serviceName = nameGenerator.getName(component.getSimpleName)

val serviceLevelOptions =
mergeServiceOptions(
AclDescriptorFactory.serviceLevelAclAnnotation(component),
eventingInForEventSourcedEntityServiceLevel(component),
subscribeToEventStream(component),
publishToEventStream(component))
//folding all invokers into a single map
val allInvokers = commandHandlers.foldLeft(Map.empty[String, MethodInvoker]) { (acc, handler) =>
acc ++ handler.methodInvokers
}

ComponentDescriptor(
nameGenerator,
serializer,
serviceName,
serviceOptions = serviceLevelOptions,
component.getPackageName,
handleDeletesMethods
++ subscriptionValueEntityMethods
++ combineBy("ES", subscriptionEventSourcedEntityClass, serializer, component)
++ combineBy("Stream", subscriptionStreamClass, serializer, component)
++ combineBy("Topic", subscriptionTopicClass, serializer, component))
//Empty command/method name, because it is not used in the consumer, we just need the invokers
ComponentDescriptor(Map("" -> CommandHandler(null, serializer, null, allInvokers)))
}
}
49 changes: 49 additions & 0 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/MethodInvokers.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.javasdk.impl

import akka.annotation.InternalApi
import akka.javasdk.impl.serialization.JsonSerializer
import com.fasterxml.jackson.annotation.JsonSubTypes

/**
* INTERNAL API
*/
@InternalApi
case class MethodInvokers(serializer: JsonSerializer, methodInvokers: Map[String, MethodInvoker]) {

/**
* This method will look up for a registered method that receives a super type of the incoming payload. It's only
* called when a direct method is not found.
*
* The incoming typeUrl is for one of the existing sub types, but the method itself is defined to receive a super
* type. Therefore we look up the method parameter to find out if one of its sub types matches the incoming typeUrl.
*/
private def lookupMethodAcceptingSubType(inputTypeUrl: String): Option[MethodInvoker] = {
methodInvokers.values.find { javaMethod =>
//None could happen if the method is a delete handler
val lastParam = javaMethod.method.getParameterTypes.lastOption
if (lastParam.exists(_.getAnnotation(classOf[JsonSubTypes]) != null)) {
lastParam.get.getAnnotation(classOf[JsonSubTypes]).value().exists { subType =>
inputTypeUrl == serializer
.contentTypeFor(subType.value()) //TODO requires more changes to be used with JsonMigration
}
} else false
}
}

// def isSingleNameInvoker: Boolean = methodInvokers.size == 1

def lookupInvoker(inputTypeUrl: String): Option[MethodInvoker] =
methodInvokers
.get(serializer.removeVersion(inputTypeUrl))
.orElse(lookupMethodAcceptingSubType(inputTypeUrl))

def getInvoker(inputTypeUrl: String): MethodInvoker =
lookupInvoker(inputTypeUrl).getOrElse {
throw new NoSuchElementException(
s"Couldn't find any entry for typeUrl [$inputTypeUrl] in [${methodInvokers.view.mapValues(_.method.getName).mkString}].")
}
}
15 changes: 5 additions & 10 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ import akka.javasdk.impl.serialization.JsonSerializer
import akka.javasdk.impl.telemetry.SpanTracingImpl
import akka.javasdk.impl.telemetry.TraceInstrumentation
import akka.javasdk.impl.timedaction.TimedActionImpl
import akka.javasdk.impl.timedaction.TimedActionService
import akka.javasdk.impl.timer.TimerSchedulerImpl
import akka.javasdk.impl.view.ViewService
import akka.javasdk.impl.view.ViewsImpl
Expand Down Expand Up @@ -320,7 +319,7 @@ private final class Sdk(
.foldLeft(Map[Descriptors.ServiceDescriptor, Service]()) { (factories, clz) =>
val service: Option[Service] = if (classOf[TimedAction].isAssignableFrom(clz)) {
logger.debug(s"Registering TimedAction [${clz.getName}]")
Some(timedActionService(clz.asInstanceOf[Class[TimedAction]]))
None
} else if (classOf[Consumer].isAssignableFrom(clz)) {
logger.debug(s"Registering Consumer [${clz.getName}]")
None
Expand Down Expand Up @@ -513,7 +512,8 @@ private final class Sdk(
runtimeComponentClients.timerClient,
sdkExecutionContext,
sdkTracerFactory,
serializer)
serializer,
ComponentDescriptor.descriptorFor(timedActionClass, serializer))
new TimedActionDescriptor(componentId, timedActionSpi)
}

Expand All @@ -533,7 +533,8 @@ private final class Sdk(
sdkExecutionContext,
sdkTracerFactory,
serializer,
ComponentDescriptorFactory.findIgnore(consumerClass))
ComponentDescriptorFactory.findIgnore(consumerClass),
ComponentDescriptor.descriptorFor(consumerClass, serializer))
new ConsumerDescriptor(
componentId,
consumerSource(consumerClass),
Expand Down Expand Up @@ -586,9 +587,6 @@ private final class Sdk(
case (serviceClass, _: Map[String, WorkflowService[_, _]] @unchecked)
if serviceClass == classOf[WorkflowService[_, _]] =>

case (serviceClass, _: Map[String, TimedActionService[_]] @unchecked)
if serviceClass == classOf[TimedActionService[_]] =>

case (serviceClass, viewServices: Map[String, ViewService[_]] @unchecked)
if serviceClass == classOf[ViewService[_]] =>
viewsEndpoint = Some(new ViewsImpl(viewServices, sdkDispatcherName))
Expand Down Expand Up @@ -671,9 +669,6 @@ private final class Sdk(
}
}

private def timedActionService[A <: TimedAction](clz: Class[A]): TimedActionService[A] =
new TimedActionService[A](clz, serializer, () => wiredInstance(clz)(sideEffectingComponentInjects(None)))

private def workflowService[S, W <: Workflow[S]](clz: Class[W]): WorkflowService[S, W] = {
new WorkflowService[S, W](
clz,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@ import akka.javasdk.impl.serialization.JsonSerializer
* INTERNAL API
*/
@InternalApi
private[impl] object ActionDescriptorFactory extends ComponentDescriptorFactory {
private[impl] object TimedActionDescriptorFactory extends ComponentDescriptorFactory {

override def buildDescriptorFor(
component: Class[_],
serializer: JsonSerializer,
nameGenerator: NameGenerator): ComponentDescriptor = {

val serviceName = nameGenerator.getName(component.getSimpleName)

val commandHandlerMethods = component.getDeclaredMethods
.filter(hasTimedActionEffectOutput)
.map { method =>
Expand All @@ -32,12 +30,6 @@ private[impl] object ActionDescriptorFactory extends ComponentDescriptorFactory
}
.toIndexedSeq

ComponentDescriptor(
nameGenerator,
serializer,
serviceName,
serviceOptions = None,
component.getPackageName,
commandHandlerMethods)
ComponentDescriptor(serializer, commandHandlerMethods)
}
}
Loading

0 comments on commit 1cdcb3c

Please sign in to comment.