Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: component exclusion config #128

Merged
merged 10 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
import akkajavasdk.components.eventsourcedentities.counter.Counter;
import akkajavasdk.components.eventsourcedentities.counter.CounterEntity;
import akkajavasdk.components.keyvalueentities.customer.CustomerEntity;
import akkajavasdk.components.keyvalueentities.user.ProdCounterEntity;
import akkajavasdk.components.keyvalueentities.user.StageCounterEntity;
import akkajavasdk.components.keyvalueentities.user.TestCounterEntity;
import akkajavasdk.components.keyvalueentities.user.User;
import akkajavasdk.components.keyvalueentities.user.UserEntity;
import akkajavasdk.components.keyvalueentities.user.UserSideEffect;
Expand All @@ -23,6 +26,7 @@
import org.hamcrest.core.IsEqual;
import org.hamcrest.core.IsNull;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

Expand All @@ -38,13 +42,44 @@
@ExtendWith(Junit5LogCapturing.class)
public class SdkIntegrationTest extends TestKitSupport {


@Override
protected TestKit.Settings testKitSettings() {
// here only to show how to set different `Settings` in a test.
return TestKit.Settings.DEFAULT
.withTopicOutgoingMessages(CUSTOMERS_TOPIC);
}

@Test
public void verifyIfComponentIsActiveBasedOnConfig() {

var result = await(componentClient.forKeyValueEntity("test")
.method(TestCounterEntity::get)
.invokeAsync());

assertThat(result).isEqualTo(100);
}

@Test
public void verifyIfComponentIsDisabledBasedOnConfig() {

var exc1 = Assertions.assertThrows(IllegalArgumentException.class, () -> {
await(componentClient.forKeyValueEntity("test")
.method(ProdCounterEntity::get)
.invokeAsync());
});

assertThat(exc1.getMessage()).contains("Unknown entity type [prod-counter]");

var exc2 = Assertions.assertThrows(IllegalArgumentException.class, () -> {
await(componentClient.forKeyValueEntity("test")
.method(StageCounterEntity::get)
.invokeAsync());
});

assertThat(exc2.getMessage()).contains("Unknown entity type [stage-counter]");
}


@Test
public void verifyEchoActionWiring() {
Expand All @@ -65,41 +100,41 @@ public void verifyEchoActionWiring() {
public void verifyHierarchyTimedActionWiring() {

timerScheduler.startSingleTimer("wired", ofMillis(0), componentClient.forTimedAction()
.method(HierarchyTimed::stringMessage)
.deferred("hello"));
.method(HierarchyTimed::stringMessage)
.deferred("hello"));

Awaitility.await()
.atMost(20, TimeUnit.SECONDS)
.untilAsserted(() -> {
var value = StaticTestBuffer.getValue("hierarchy-action");
assertThat(value).isEqualTo("hello");
});
.atMost(20, TimeUnit.SECONDS)
.untilAsserted(() -> {
var value = StaticTestBuffer.getValue("hierarchy-action");
assertThat(value).isEqualTo("hello");
});
}

@Test
public void verifyTimedActionListCommand() {

timerScheduler.startSingleTimer("echo-action", ofMillis(0), componentClient.forTimedAction()
.method(EchoAction::stringMessages)
.deferred(List.of("hello", "mr")));
.method(EchoAction::stringMessages)
.deferred(List.of("hello", "mr")));

Awaitility.await()
.atMost(20, TimeUnit.SECONDS)
.untilAsserted(() -> {
var value = StaticTestBuffer.getValue("echo-action");
assertThat(value).isEqualTo("hello mr");
});
.atMost(20, TimeUnit.SECONDS)
.untilAsserted(() -> {
var value = StaticTestBuffer.getValue("echo-action");
assertThat(value).isEqualTo("hello mr");
});

timerScheduler.startSingleTimer("echo-action", ofMillis(0), componentClient.forTimedAction()
.method(EchoAction::commandMessages)
.deferred(List.of(new EchoAction.SomeCommand("tambourine"), new EchoAction.SomeCommand("man"))));
.method(EchoAction::commandMessages)
.deferred(List.of(new EchoAction.SomeCommand("tambourine"), new EchoAction.SomeCommand("man"))));

Awaitility.await()
.atMost(20, TimeUnit.SECONDS)
.untilAsserted(() -> {
var value = StaticTestBuffer.getValue("echo-action");
assertThat(value).isEqualTo("tambourine man");
});
.atMost(20, TimeUnit.SECONDS)
.untilAsserted(() -> {
var value = StaticTestBuffer.getValue("echo-action");
assertThat(value).isEqualTo("tambourine man");
});
}

@Test
Expand Down Expand Up @@ -195,9 +230,6 @@ public void verifyFindCounterByValue() {
}





@Test
public void verifyUserSubscriptionAction() {

Expand All @@ -221,7 +253,6 @@ public void verifyUserSubscriptionAction() {
}



@Test
public void verifyActionWithMetadata() {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akkajavasdk.components.keyvalueentities.user;

import akka.javasdk.annotations.ComponentId;
import akka.javasdk.keyvalueentity.KeyValueEntity;
import akka.javasdk.keyvalueentity.KeyValueEntityContext;

@ComponentId("prod-counter")
public class ProdCounterEntity extends KeyValueEntity<Integer> {
private final String entityId;

public ProdCounterEntity(KeyValueEntityContext context) {
this.entityId = context.entityId();
}

@Override
public Integer emptyState() {
return 100;
}

public Effect<Integer> get() {
return effects().reply(currentState());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akkajavasdk.components.keyvalueentities.user;

import akka.javasdk.annotations.ComponentId;
import akka.javasdk.keyvalueentity.KeyValueEntity;
import akka.javasdk.keyvalueentity.KeyValueEntityContext;

@ComponentId("stage-counter")
public class StageCounterEntity extends KeyValueEntity<Integer> {
private final String entityId;

public StageCounterEntity(KeyValueEntityContext context) {
this.entityId = context.entityId();
}

@Override
public Integer emptyState() {
return 100;
}

public Effect<Integer> get() {
return effects().reply(currentState());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akkajavasdk.components.keyvalueentities.user;

import akka.javasdk.annotations.ComponentId;
import akka.javasdk.keyvalueentity.KeyValueEntity;
import akka.javasdk.keyvalueentity.KeyValueEntityContext;

@ComponentId("test-counter")
public class TestCounterEntity extends KeyValueEntity<Integer> {
private final String entityId;

public TestCounterEntity(KeyValueEntityContext context) {
this.entityId = context.entityId();
}

@Override
public Integer emptyState() {
return 100;
}

public Effect<Integer> get() {
return effects().reply(currentState());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ akka.javasdk {
"akkajavasdk.components.keyvalueentities.user.AssignedCounterEntity",
"akkajavasdk.components.keyvalueentities.hierarchy.TextKvEntity",
"akkajavasdk.components.views.AllTheTypesKvEntity"
"akkajavasdk.components.keyvalueentities.user.TestCounterEntity"
"akkajavasdk.components.keyvalueentities.user.StageCounterEntity"
"akkajavasdk.components.keyvalueentities.user.ProdCounterEntity"
]
view = [
"akkajavasdk.components.views.user.UsersByEmailAndName",
Expand Down
1 change: 1 addition & 0 deletions akka-javasdk-tests/src/test/resources/application.conf
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# Using a different port to not conflict with parallel tests
akka.javasdk.testkit.http-port = 39391
akka.javasdk.components.disable = "akkajavasdk.components.keyvalueentities.user.ProdCounterEntity,akkajavasdk.components.keyvalueentities.user.StageCounterEntity"
2 changes: 2 additions & 0 deletions akka-javasdk/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,6 @@ akka.javasdk {
collector-endpoint = ${?COLLECTOR_ENDPOINT}
}
}
# The comma separated list of FQCNs of components disabled from running
components.disable = ""
}
32 changes: 21 additions & 11 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,21 @@ package akka.javasdk.impl
import java.lang.reflect.Constructor
import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method
import java.util
import java.util.Optional
import java.util.concurrent.CompletionStage

import scala.annotation.nowarn
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.jdk.CollectionConverters._
import scala.jdk.FutureConverters._
import scala.jdk.OptionConverters.RichOption
import scala.jdk.OptionConverters.RichOptional
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import akka.Done
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
Expand Down Expand Up @@ -95,10 +100,6 @@ import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.slf4j.event.Level

import java.util
import java.util.Optional
import scala.jdk.OptionConverters.RichOption

/**
* INTERNAL API
*/
Expand Down Expand Up @@ -354,6 +355,16 @@ private final class Sdk(
}
}

private def isDisabled(clz: Class[_]): Boolean = {
val componentName = clz.getName
if (sdkSettings.disabledComponents.contains(componentName)) {
logger.info("Ignoring component [{}] as it is disabled in the configuration", clz.getName)
true
} else {
false
}
}

// command handlers candidate must have 0 or 1 parameter and return the components effect type
// we might later revisit this, instead of single param, we can require (State, Cmd) => Effect like in Akka
def isCommandHandlerCandidate[E](method: Method)(implicit effectType: ClassTag[E]): Boolean = {
Expand Down Expand Up @@ -405,6 +416,7 @@ private final class Sdk(
// collect all Endpoints and compose them to build a larger router
private val httpEndpointDescriptors = componentClasses
.filter(Reflect.isRestEndpoint)
.filterNot(isDisabled)
.map { httpEndpointClass =>
HttpEndpointDescriptorFactory(httpEndpointClass, httpEndpointFactory(httpEndpointClass))
}
Expand All @@ -414,9 +426,11 @@ private final class Sdk(
private var workflowDescriptors = Vector.empty[WorkflowDescriptor]
private var timedActionDescriptors = Vector.empty[TimedActionDescriptor]
private var consumerDescriptors = Vector.empty[ConsumerDescriptor]
private var viewDescriptors = Vector.empty[ViewDescriptor]

componentClasses
.filter(hasComponentId)
.filterNot(isDisabled)
.foreach {
case clz if classOf[EventSourcedEntity[_, _]].isAssignableFrom(clz) =>
val componentId = clz.getAnnotation(classOf[ComponentId]).value
Expand Down Expand Up @@ -545,6 +559,9 @@ private final class Sdk(
consumerDestination(consumerClass),
timedActionSpi)

case clz if classOf[View].isAssignableFrom(clz) =>
viewDescriptors :+= ViewDescriptorFactory(clz, serializer, sdkExecutionContext)

case clz if Reflect.isRestEndpoint(clz) =>
// handled separately because ComponentId is not mandatory

Expand All @@ -553,13 +570,6 @@ private final class Sdk(
logger.warn("Unknown component [{}]", clz.getName)
}

private val viewDescriptors: Seq[ViewDescriptor] =
componentClasses
.filter(hasComponentId)
.collect {
case clz if classOf[View].isAssignableFrom(clz) => ViewDescriptorFactory(clz, serializer, sdkExecutionContext)
}

// these are available for injecting in all kinds of component that are primarily
// for side effects
// Note: config is also always available through the combination with user DI way down below
Expand Down
6 changes: 4 additions & 2 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/Settings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ private[impl] object Settings {
devModeSettings = Option.when(sdkConfig.getBoolean("dev-mode.enabled"))(
DevModeSettings(
serviceName = sdkConfig.getString("dev-mode.service-name"),
httpPort = sdkConfig.getInt("dev-mode.http-port"))))
httpPort = sdkConfig.getInt("dev-mode.http-port"))),
disabledComponents = sdkConfig.getString("components.disable").split(",").map(_.trim).toSet)
}

final case class DevModeSettings(serviceName: String, httpPort: Int)
Expand All @@ -36,4 +37,5 @@ private[impl] object Settings {
private[impl] final case class Settings(
cleanupDeletedEventSourcedEntityAfter: Duration,
cleanupDeletedKeyValueEntityAfter: Duration,
devModeSettings: Option[DevModeSettings])
devModeSettings: Option[DevModeSettings],
disabledComponents: Set[String])
Loading