Skip to content

Commit

Permalink
chore: timed action without proto desc
Browse files Browse the repository at this point in the history
  • Loading branch information
aludwiko committed Dec 11, 2024
1 parent 5178e1e commit cd85594
Show file tree
Hide file tree
Showing 34 changed files with 460 additions and 990 deletions.
4 changes: 2 additions & 2 deletions akka-javasdk-maven/akka-javasdk-archetype/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
<modelVersion>4.0.0</modelVersion>

<artifactId>akka-javasdk-archetype</artifactId>
<version>3.0.1</version>
<version>3.0.1-21-d40dbe4d-dev-SNAPSHOT</version>
<packaging>maven-archetype</packaging>
<parent>
<groupId>io.akka</groupId>
<artifactId>akka-javasdk-maven</artifactId>
<version>3.0.1</version>
<version>3.0.1-21-d40dbe4d-dev-SNAPSHOT</version>
</parent>

<name>Akka SDK for Java Maven Archetype</name>
Expand Down
6 changes: 3 additions & 3 deletions akka-javasdk-maven/akka-javasdk-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
<parent>
<groupId>io.akka</groupId>
<artifactId>akka-javasdk-maven</artifactId>
<version>3.0.1</version>
<version>3.0.1-21-d40dbe4d-dev-SNAPSHOT</version>
</parent>

<groupId>io.akka</groupId>
<artifactId>akka-javasdk-parent</artifactId>
<version>3.0.1</version>
<version>3.0.1-21-d40dbe4d-dev-SNAPSHOT</version>
<packaging>pom</packaging>


Expand All @@ -34,7 +34,7 @@

<maven.build.timestamp.format>yyyyMMddHHmmss</maven.build.timestamp.format>

<akka-javasdk.version>3.0.1</akka-javasdk.version>
<akka-javasdk.version>3.0.1-21-d40dbe4d-dev-SNAPSHOT</akka-javasdk.version>

<!-- These are dependent on runtime environment and cannot be customized by users -->
<maven.compiler.release>21</maven.compiler.release>
Expand Down
2 changes: 1 addition & 1 deletion akka-javasdk-maven/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>io.akka</groupId>
<artifactId>akka-javasdk-maven</artifactId>
<version>3.0.1</version>
<version>3.0.1-21-d40dbe4d-dev-SNAPSHOT</version>
<packaging>pom</packaging>

<name>Akka SDK for Java Maven</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,20 @@ public void verifyTimedActionListCommand() {
});
}

@Test
public void verifyTimedActionEmpty() {
timerScheduler.startSingleTimer("echo-action", ofMillis(0), componentClient.forTimedAction()
.method(EchoAction::emptyMessage)
.deferred());

Awaitility.await()
.atMost(20, TimeUnit.SECONDS)
.untilAsserted(() -> {
var value = StaticTestBuffer.getValue("echo-action");
assertThat(value).isEqualTo("empty");
});
}

@Test
public void verifyCounterEventSourceSubscription() {
// GIVEN IncreaseAction is subscribed to CounterEntity events
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ public EchoAction(ComponentClient componentClient) {
this.componentClient = componentClient;
}

public Effect emptyMessage() {
StaticTestBuffer.addValue("echo-action", "empty");
return effects().done();
}

public Effect stringMessage(String msg) {
StaticTestBuffer.addValue("echo-action", msg);
return effects().done();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ private[akka] object AnySupport {

def isJson(any: ScalaPbAny): Boolean = isJsonTypeUrl(any.typeUrl)

def isJson(typeUrl: String): Boolean = isJsonTypeUrl(typeUrl)

def isJsonTypeUrl(typeUrl: String): Boolean =
// check both new and old typeurl for compatibility, in case there are services with old type url stored in database
typeUrl.startsWith(JsonTypeUrlPrefix) || typeUrl.startsWith(KalixJsonTypeUrlPrefix)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ private[impl] object MethodInvoker {
def apply(javaMethod: Method, parameterExtractor: ParameterExtractor[InvocationContext, AnyRef]): MethodInvoker =
MethodInvoker(javaMethod, Array(parameterExtractor))

def apply(javaMethod: Method): MethodInvoker =
MethodInvoker(javaMethod, Array.empty[ParameterExtractor[InvocationContext, AnyRef]])

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,22 @@ private[impl] object ComponentDescriptor {
def descriptorFor(component: Class[_], serializer: JsonSerializer): ComponentDescriptor =
ComponentDescriptorFactory.getFactoryFor(component).buildDescriptorFor(component, serializer, new NameGenerator)

def apply(serializer: JsonSerializer, kalixMethods: Seq[KalixMethod]): ComponentDescriptor = {

//TODO remove capitalization of method name, can't be done per component, because component client reuse the same logic for all
val methods: Map[String, CommandHandler] =
kalixMethods.map { method =>
(method.serviceMethod.methodName.capitalize, method.toCommandHandler(serializer))
}.toMap

new ComponentDescriptor(null, null, methods, null, null)

}

def apply(methods: Map[String, CommandHandler]): ComponentDescriptor = {
new ComponentDescriptor(null, null, methods, null, null)
}

def apply(
nameGenerator: NameGenerator,
serializer: JsonSerializer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ private[impl] object ComponentDescriptorFactory {
else if (Reflect.isConsumer(component))
ConsumerDescriptorFactory
else
ActionDescriptorFactory
TimedActionDescriptorFactory
}

def combineByES(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,14 @@

package akka.javasdk.impl

import akka.annotation.InternalApi
import akka.javasdk.impl.ComponentDescriptorFactory._
import akka.javasdk.impl.reflection.HandleDeletesServiceMethod
import akka.javasdk.impl.reflection.KalixMethod
import akka.javasdk.impl.reflection.NameGenerator
import akka.javasdk.impl.reflection.Reflect
import akka.javasdk.impl.reflection.SubscriptionServiceMethod
import ComponentDescriptorFactory._
import akka.annotation.InternalApi
import akka.javasdk.impl.serialization.JsonSerializer
import kalix.EventSource
import kalix.Eventing
import kalix.MethodOptions

/**
* INTERNAL API
Expand All @@ -27,114 +24,37 @@ private[impl] object ConsumerDescriptorFactory extends ComponentDescriptorFactor
serializer: JsonSerializer,
nameGenerator: NameGenerator): ComponentDescriptor = {

def withOptionalDestination(clazz: Class[_], source: EventSource): MethodOptions = {
val eventingBuilder = Eventing.newBuilder().setIn(source)
topicEventDestination(clazz).foreach(eventingBuilder.setOut)
kalix.MethodOptions.newBuilder().setEventing(eventingBuilder.build()).build()
}

import Reflect.methodOrdering

val handleDeletesMethods = component.getMethods
.filter(hasConsumerOutput)
.filter(hasHandleDeletes)
.sorted
.map { method =>
val source = valueEntityEventSource(component, handleDeletes = true)
val kalixOptions = withOptionalDestination(component, source)
KalixMethod(HandleDeletesServiceMethod(method))
.withKalixOptions(kalixOptions)
}
.toSeq

val subscriptionValueEntityMethods: IndexedSeq[KalixMethod] = if (hasValueEntitySubscription(component)) {
//expecting only a single update method, which is validated
component.getMethods
.filter(hasConsumerOutput)
.filterNot(hasHandleDeletes)
.map { method =>
val source = valueEntityEventSource(component, handleDeletes = false)
val kalixOptions = withOptionalDestination(component, source)
KalixMethod(SubscriptionServiceMethod(method))
.withKalixOptions(kalixOptions)
}
.toIndexedSeq
} else {
IndexedSeq.empty[KalixMethod]
}

val subscriptionEventSourcedEntityClass: Map[String, Seq[KalixMethod]] =
if (hasEventSourcedEntitySubscription(component)) {
val kalixMethods =
component.getMethods
.filter(hasConsumerOutput)
.sorted // make sure we get the methods in deterministic order
.map { method =>
KalixMethod(SubscriptionServiceMethod(method))
.withKalixOptions(buildEventingOutOptions(component))
}
.toSeq

val entityType = findEventSourcedEntityType(component)
Map(entityType -> kalixMethods)

} else Map.empty
val methods = component.getMethods
.filter(hasConsumerOutput)
.filterNot(hasHandleDeletes)
.map { method =>
KalixMethod(SubscriptionServiceMethod(method))
}
.toIndexedSeq

val subscriptionStreamClass: Map[String, Seq[KalixMethod]] = {
streamSubscription(component)
.map { ann =>
val kalixMethods =
component.getMethods
.filter(hasConsumerOutput)
.sorted // make sure we get the methods in deterministic order
.map { method =>
KalixMethod(SubscriptionServiceMethod(method))
.withKalixOptions(buildEventingOutOptions(component))
}
.toSeq
val allMethods = methods ++ handleDeletesMethods

val streamId = ann.id()
Map(streamId -> kalixMethods)
}
.getOrElse(Map.empty)
val commandHandlers = allMethods.map { method =>
method.toCommandHandler(serializer)
}

// type level @Consume.FormTopic, methods eligible for subscription
val subscriptionTopicClass: Map[String, Seq[KalixMethod]] =
if (hasTopicSubscription(component)) {
val kalixMethods = component.getMethods
.filter(hasConsumerOutput)
.sorted // make sure we get the methods in deterministic order
.map { method =>
val source = topicEventSource(component)
val kalixOptions = withOptionalDestination(component, source)
KalixMethod(SubscriptionServiceMethod(method))
.withKalixOptions(kalixOptions)
}
.toIndexedSeq
val topicName = findSubscriptionTopicName(component)
Map(topicName -> kalixMethods)
} else Map.empty

val serviceName = nameGenerator.getName(component.getSimpleName)

val serviceLevelOptions =
mergeServiceOptions(
AclDescriptorFactory.serviceLevelAclAnnotation(component),
eventingInForEventSourcedEntityServiceLevel(component),
subscribeToEventStream(component),
publishToEventStream(component))
//folding all invokers into a single map
val allInvokers = commandHandlers.foldLeft(Map.empty[String, MethodInvoker]) { (acc, handler) =>
acc ++ handler.methodInvokers
}

ComponentDescriptor(
nameGenerator,
serializer,
serviceName,
serviceOptions = serviceLevelOptions,
component.getPackageName,
handleDeletesMethods
++ subscriptionValueEntityMethods
++ combineBy("ES", subscriptionEventSourcedEntityClass, serializer, component)
++ combineBy("Stream", subscriptionStreamClass, serializer, component)
++ combineBy("Topic", subscriptionTopicClass, serializer, component))
//Empty command/method name, because it is not used in the consumer, we just need the invokers
ComponentDescriptor(Map("" -> CommandHandler(null, serializer, null, allInvokers)))
}
}
49 changes: 49 additions & 0 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/MethodInvokers.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.javasdk.impl

import akka.annotation.InternalApi
import akka.javasdk.impl.serialization.JsonSerializer
import com.fasterxml.jackson.annotation.JsonSubTypes

/**
* INTERNAL API
*/
@InternalApi
case class MethodInvokers(serializer: JsonSerializer, methodInvokers: Map[String, MethodInvoker]) {

/**
* This method will look up for a registered method that receives a super type of the incoming payload. It's only
* called when a direct method is not found.
*
* The incoming typeUrl is for one of the existing sub types, but the method itself is defined to receive a super
* type. Therefore we look up the method parameter to find out if one of its sub types matches the incoming typeUrl.
*/
private def lookupMethodAcceptingSubType(inputTypeUrl: String): Option[MethodInvoker] = {
methodInvokers.values.find { javaMethod =>
//None could happen if the method is a delete handler
val lastParam = javaMethod.method.getParameterTypes.lastOption
if (lastParam.exists(_.getAnnotation(classOf[JsonSubTypes]) != null)) {
lastParam.get.getAnnotation(classOf[JsonSubTypes]).value().exists { subType =>
inputTypeUrl == serializer
.contentTypeFor(subType.value()) //TODO requires more changes to be used with JsonMigration
}
} else false
}
}

// def isSingleNameInvoker: Boolean = methodInvokers.size == 1

def lookupInvoker(inputTypeUrl: String): Option[MethodInvoker] =
methodInvokers
.get(serializer.removeVersion(inputTypeUrl))
.orElse(lookupMethodAcceptingSubType(inputTypeUrl))

def getInvoker(inputTypeUrl: String): MethodInvoker =
lookupInvoker(inputTypeUrl).getOrElse {
throw new NoSuchElementException(
s"Couldn't find any entry for typeUrl [$inputTypeUrl] in [${methodInvokers.view.mapValues(_.method.getName).mkString}].")
}
}
15 changes: 5 additions & 10 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ import akka.javasdk.impl.serialization.JsonSerializer
import akka.javasdk.impl.telemetry.SpanTracingImpl
import akka.javasdk.impl.telemetry.TraceInstrumentation
import akka.javasdk.impl.timedaction.TimedActionImpl
import akka.javasdk.impl.timedaction.TimedActionService
import akka.javasdk.impl.timer.TimerSchedulerImpl
import akka.javasdk.impl.view.ViewService
import akka.javasdk.impl.view.ViewsImpl
Expand Down Expand Up @@ -320,7 +319,7 @@ private final class Sdk(
.foldLeft(Map[Descriptors.ServiceDescriptor, Service]()) { (factories, clz) =>
val service: Option[Service] = if (classOf[TimedAction].isAssignableFrom(clz)) {
logger.debug(s"Registering TimedAction [${clz.getName}]")
Some(timedActionService(clz.asInstanceOf[Class[TimedAction]]))
None
} else if (classOf[Consumer].isAssignableFrom(clz)) {
logger.debug(s"Registering Consumer [${clz.getName}]")
None
Expand Down Expand Up @@ -513,7 +512,8 @@ private final class Sdk(
runtimeComponentClients.timerClient,
sdkExecutionContext,
sdkTracerFactory,
serializer)
serializer,
ComponentDescriptor.descriptorFor(timedActionClass, serializer))
new TimedActionDescriptor(componentId, timedActionSpi)
}

Expand All @@ -533,7 +533,8 @@ private final class Sdk(
sdkExecutionContext,
sdkTracerFactory,
serializer,
ComponentDescriptorFactory.findIgnore(consumerClass))
ComponentDescriptorFactory.findIgnore(consumerClass),
ComponentDescriptor.descriptorFor(consumerClass, serializer))
new ConsumerDescriptor(
componentId,
consumerSource(consumerClass),
Expand Down Expand Up @@ -586,9 +587,6 @@ private final class Sdk(
case (serviceClass, _: Map[String, WorkflowService[_, _]] @unchecked)
if serviceClass == classOf[WorkflowService[_, _]] =>

case (serviceClass, _: Map[String, TimedActionService[_]] @unchecked)
if serviceClass == classOf[TimedActionService[_]] =>

case (serviceClass, viewServices: Map[String, ViewService[_]] @unchecked)
if serviceClass == classOf[ViewService[_]] =>
viewsEndpoint = Some(new ViewsImpl(viewServices, sdkDispatcherName))
Expand Down Expand Up @@ -665,9 +663,6 @@ private final class Sdk(
}
}

private def timedActionService[A <: TimedAction](clz: Class[A]): TimedActionService[A] =
new TimedActionService[A](clz, serializer, () => wiredInstance(clz)(sideEffectingComponentInjects(None)))

private def workflowService[S, W <: Workflow[S]](clz: Class[W]): WorkflowService[S, W] = {
new WorkflowService[S, W](
clz,
Expand Down
Loading

0 comments on commit cd85594

Please sign in to comment.