Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: using spi for TimedAction and ESE #45

Merged
merged 10 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion akka-javasdk-maven/akka-javasdk-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

<!-- These are dependent on runtime environment and cannot be customized by users -->
<maven.compiler.release>21</maven.compiler.release>
<kalix-runtime.version>1.2.2</kalix-runtime.version>
<kalix-runtime.version>1.2.2-7-3e32f0a1-SNAPSHOT</kalix-runtime.version>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<skip.docker>false</skip.docker>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import akka.stream.SystemMaterializer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import kalix.runtime.KalixRuntimeMain;
import kalix.runtime.AkkaRuntimeMain;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -474,7 +474,7 @@ public SpiSettings getSettings() {
applicationConfig = runner.applicationConfig();

Config runtimeConfig = ConfigFactory.empty();
runtimeActorSystem = KalixRuntimeMain.start(Some.apply(runtimeConfig), Some.apply(runner));
runtimeActorSystem = AkkaRuntimeMain.start(Some.apply(runtimeConfig), runner);
// wait for SDK to get on start callback (or fail starting), we need it to set up the component client
var startupContext = runner.started().toCompletableFuture().get(20, TimeUnit.SECONDS);
var componentClients = startupContext.componentClients();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ final class TestKitEventSourcedEntityCommandContext(
override val commandId: Long = 0L,
override val commandName: String = "stubCommandName",
override val sequenceNumber: Long = 0L,
override val isDeleted: Boolean = false,
override val metadata: Metadata = Metadata.EMPTY)
extends CommandContext
with InternalContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@

import static java.time.temporal.ChronoUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;

@ExtendWith(Junit5LogCapturing.class)
public class EventSourcedEntityTest extends TestKitSupport {

@Test
public void verifyCounterEventSourcedWiring() {
public void verifyCounterEventSourcedWiring() throws InterruptedException {

Thread.sleep(10000);

var counterId = "hello";
var client = componentClient.forEventSourcedEntity(counterId);
Expand All @@ -47,22 +50,30 @@ public void verifyCounterEventSourcedWiring() {

@Test
public void verifyCounterErrorEffect() {
var counterId = "hello-error";
var client = componentClient.forEventSourcedEntity(counterId);
assertThrows(IllegalArgumentException.class, () ->
increaseCounterWithError(client, -1)
);
}

@Test
public void httpVerifyCounterErrorEffect() {
CompletableFuture<StrictResponse<String>> call = httpClient.POST("/akka/v1.0/entity/counter-entity/c001/increaseWithError")
.withRequestBody(-10)
.responseBodyAs(String.class)
.invokeAsync()
.toCompletableFuture();
.withRequestBody(-10)
.responseBodyAs(String.class)
.invokeAsync()
.toCompletableFuture();

Awaitility.await()
.ignoreExceptions()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {

assertThat(call).isCompletedExceptionally();
assertThat(call.exceptionNow()).isInstanceOf(IllegalArgumentException.class);
assertThat(call.exceptionNow().getMessage()).contains("Value must be greater than 0");
});
.ignoreExceptions()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {

assertThat(call).isCompletedExceptionally();
assertThat(call.exceptionNow()).isInstanceOf(IllegalArgumentException.class);
assertThat(call.exceptionNow().getMessage()).contains("Value must be greater than 0");
});
}

@Test
Expand Down Expand Up @@ -185,6 +196,12 @@ private Integer increaseCounter(EventSourcedEntityClient client, int value) {
.invokeAsync(value));
}

private Counter increaseCounterWithError(EventSourcedEntityClient client, int value) {
return await(client
.method(CounterEntity::increaseWithError)
.invokeAsync(value));
}


private Integer multiplyCounter(EventSourcedEntityClient client, int value) {
return await(client
Expand All @@ -205,4 +222,4 @@ private Integer getCounter(EventSourcedEntityClient client) {
return await(client.method(CounterEntity::get).invokeAsync());
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.hamcrest.core.IsNull;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

Expand Down Expand Up @@ -521,6 +522,7 @@ public void verifyMultiTableViewForUserCounters() {
}

@Test
@Disabled //TODO revert once we deal with metadata translation
public void verifyActionWithMetadata() {

String metadataValue = "action-value";
Expand Down
6 changes: 4 additions & 2 deletions akka-javasdk-tests/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@
</logger>

<logger name="akka" level="WARN"/>
<logger name="akka.runtime" level="INFO"/>
<logger name="akka.javasdk" level="INFO"/>
<logger name="akka.runtime" level="DEBUG"/>
<logger name="akka.javasdk" level="DEBUG"/>
<logger name="kalix.runtime.views" level="INFO"/>
<logger name="akka.http" level="WARN"/>
<logger name="io.grpc" level="WARN"/>
<logger name="io.r2dbc" level="WARN"/>
<logger name="akka.javasdk.impl" level="INFO"/>
<logger name="akka.javasdk.testkit" level="INFO"/>

<root level="DEBUG">
<appender-ref ref="CapturingAppender"/>
<!-- <appender-ref ref="STDOUT"/>-->
</root>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public interface CommandContext extends MetadataContext {
*/
String entityId();

boolean isDeleted();

/** Access to tracing for custom app specific tracing. */
Tracing tracing();
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,16 @@ public void _internalSetCurrentState(S state) {
currentState = Optional.ofNullable(state);
}

/**
* INTERNAL API
* @hidden
*/
@InternalApi
public void _internalClearCurrentState() {
handlingCommands = false;
currentState = Optional.empty();
}

/**
* This is the main event handler method. Whenever an event is persisted, this handler will be called.
* It should return the new state of the entity.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,7 @@ class DiscoveryImpl(
Component(
service.componentType,
name,
Component.ComponentSettings.Entity(
EntitySettings(service.componentId, None, forwardHeaders, EntitySettings.SpecificSettings.Empty)))
Component.ComponentSettings.Entity(EntitySettings(service.componentId, forwardHeaders)))
}
}.toSeq

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import akka.javasdk.eventsourcedentity.CommandContext
import akka.javasdk.keyvalueentity
import kalix.protocol.entity.Command
import kalix.protocol.event_sourced_entity.EventSourcedInit
import kalix.protocol.replicated_entity.ReplicatedEntityInit
import kalix.protocol.value_entity.ValueEntityInit

/**
Expand Down Expand Up @@ -71,9 +70,6 @@ private[javasdk] object EntityExceptions {
def apply(init: EventSourcedInit, message: String): EntityException =
ProtocolException(init.entityId, message)

def apply(init: ReplicatedEntityInit, message: String): EntityException =
ProtocolException(init.entityId, message)

}

def failureMessageForLog(cause: Throwable): String = cause match {
Expand Down
64 changes: 58 additions & 6 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ package akka.javasdk.impl
import java.lang.reflect.Constructor
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.CompletionStage

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.jdk.FutureConverters._
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import akka.Done
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
Expand Down Expand Up @@ -83,15 +85,18 @@ import io.opentelemetry.context.{ Context => OtelContext }
import kalix.protocol.action.Actions
import kalix.protocol.discovery.Discovery
import kalix.protocol.event_sourced_entity.EventSourcedEntities
import kalix.protocol.replicated_entity.ReplicatedEntities
import kalix.protocol.value_entity.ValueEntities
import kalix.protocol.view.Views
import kalix.protocol.workflow_entity.WorkflowEntities
import org.slf4j.LoggerFactory

import scala.jdk.OptionConverters.RichOptional
import scala.jdk.CollectionConverters._

import akka.javasdk.impl.eventsourcedentity.EventSourcedEntityImpl
import akka.javasdk.impl.timedaction.TimedActionImpl
import akka.runtime.sdk.spi.EventSourcedEntityDescriptor
import akka.runtime.sdk.spi.TimedActionDescriptor

/**
* INTERNAL API
*/
Expand Down Expand Up @@ -342,12 +347,53 @@ private final class Sdk(
}

// collect all Endpoints and compose them to build a larger router
private val httpEndpoints = componentClasses
private val httpEndpointDescriptors = componentClasses
.filter(Reflect.isRestEndpoint)
.map { httpEndpointClass =>
HttpEndpointDescriptorFactory(httpEndpointClass, httpEndpointFactory(httpEndpointClass))
}

private val eventSourcedEntityDescriptors =
componentClasses
.filter(hasComponentId)
.collect {
case clz if classOf[EventSourcedEntity[_, _]].isAssignableFrom(clz) =>
val componentId = clz.getAnnotation(classOf[ComponentId]).value
val entitySpi =
new EventSourcedEntityImpl[AnyRef, AnyRef, EventSourcedEntity[AnyRef, AnyRef]](
sdkSettings,
sdkTracerFactory,
componentId,
clz,
messageCodec,
context =>
wiredInstance(clz.asInstanceOf[Class[EventSourcedEntity[AnyRef, AnyRef]]]) {
// remember to update component type API doc and docs if changing the set of injectables
case p if p == classOf[EventSourcedEntityContext] => context
},
sdkSettings.snapshotEvery)
new EventSourcedEntityDescriptor(componentId, entitySpi)
}

private val timedActionDescriptors =
componentClasses
.filter(hasComponentId)
.collect {
case clz if classOf[TimedAction].isAssignableFrom(clz) =>
val componentId = clz.getAnnotation(classOf[ComponentId]).value
val timedActionClass = clz.asInstanceOf[Class[TimedAction]]
val timedActionSpi =
new TimedActionImpl[TimedAction](
() => wiredInstance(timedActionClass)(sideEffectingComponentInjects(None)),
timedActionClass,
system.classicSystem,
runtimeComponentClients.timerClient,
sdkExecutionContext,
sdkTracerFactory,
messageCodec)
new TimedActionDescriptor(componentId, timedActionSpi)
}

// these are available for injecting in all kinds of component that are primarily
// for side effects
// Note: config is also always available through the combination with user DI way down below
Expand Down Expand Up @@ -375,7 +421,8 @@ private final class Sdk(
}

val actionAndConsumerServices = services.filter { case (_, service) =>
service.getClass == classOf[TimedActionService[_]] || service.getClass == classOf[ConsumerService[_]]
/*service.getClass == classOf[TimedActionService[_]] ||*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove or FIXME?

patriknw marked this conversation as resolved.
Show resolved Hide resolved
service.getClass == classOf[ConsumerService[_]]
}

if (actionAndConsumerServices.nonEmpty) {
Expand Down Expand Up @@ -484,11 +531,16 @@ private final class Sdk(
override def discovery: Discovery = discoveryEndpoint
override def actions: Option[Actions] = actionsEndpoint
override def eventSourcedEntities: Option[EventSourcedEntities] = eventSourcedEntitiesEndpoint
override def eventSourcedEntityDescriptors: Seq[EventSourcedEntityDescriptor] =
Sdk.this.eventSourcedEntityDescriptors
override def valueEntities: Option[ValueEntities] = valueEntitiesEndpoint
override def views: Option[Views] = viewsEndpoint
override def workflowEntities: Option[WorkflowEntities] = workflowEntitiesEndpoint
override def replicatedEntities: Option[ReplicatedEntities] = None
override def httpEndpointDescriptors: Seq[HttpEndpointDescriptor] = httpEndpoints
override def httpEndpointDescriptors: Seq[HttpEndpointDescriptor] =
Sdk.this.httpEndpointDescriptors

override def timedActionsDescriptors: Seq[TimedActionDescriptor] =
Sdk.this.timedActionDescriptors
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ private[javasdk] final case class ComponentClientImpl(
}

override def forTimedAction(): TimedActionClient =
TimedActionClientImpl(runtimeComponentClients.actionClient, callMetadata)
TimedActionClientImpl(runtimeComponentClients.timedActionClient, callMetadata)

override def forKeyValueEntity(valueEntityId: String): KeyValueEntityClient =
if (valueEntityId eq null) throw new NullPointerException("Key Value entity id is null")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ import akka.javasdk.impl.reflection.Reflect
import akka.javasdk.keyvalueentity.KeyValueEntity
import akka.javasdk.timedaction.TimedAction
import akka.javasdk.workflow.Workflow
import akka.runtime.sdk.spi.ActionRequest
import akka.runtime.sdk.spi.TimedActionRequest
import akka.runtime.sdk.spi.ActionType
import akka.runtime.sdk.spi.ComponentType
import akka.runtime.sdk.spi.EntityRequest
import akka.runtime.sdk.spi.EventSourcedEntityType
import akka.runtime.sdk.spi.KeyValueEntityType
import akka.runtime.sdk.spi.WorkflowType
import akka.runtime.sdk.spi.{ ActionClient => RuntimeActionClient }
import akka.runtime.sdk.spi.{ TimedActionClient => RuntimeTimedActionClient }
import akka.runtime.sdk.spi.{ EntityClient => RuntimeEntityClient }
import akka.util.ByteString

Expand Down Expand Up @@ -179,7 +179,7 @@ private[javasdk] final case class WorkflowClientImpl(
*/
@InternalApi
private[javasdk] final case class TimedActionClientImpl(
actionClient: RuntimeActionClient,
timedActionClient: RuntimeTimedActionClient,
callMetadata: Option[Metadata])(implicit val executionContext: ExecutionContext)
extends TimedActionClient {
override def method[T, R](methodRef: function.Function[T, TimedAction.Effect]): ComponentDeferredMethodRef[R] =
Expand Down Expand Up @@ -219,9 +219,9 @@ private[javasdk] final case class TimedActionClientImpl(
methodName,
None,
{ metadata =>
actionClient
timedActionClient
.call(
new ActionRequest(
new TimedActionRequest(
componentId,
methodName,
ContentTypes.`application/json`,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ private[impl] final class EventSourcedEntitiesImpl(
with CommandContext
with ActivatableContext {
override def tracing(): Tracing = new SpanTracingImpl(span, tracerFactory)
override def isDeleted: Boolean = false // FIXME not supported by old spi
}

private class EventSourcedEntityContextImpl(override final val entityId: String)
Expand Down
Loading
Loading