Skip to content

Commit

Permalink
feat: disabling components programmatically (#153)
Browse files Browse the repository at this point in the history
* feat: disabling components programatically

* Update akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala

Co-authored-by: Patrik Nordwall <[email protected]>

* docs

* fixing log msg

* implementation improvements

* fixing docs

* improving documentation

---------

Co-authored-by: Patrik Nordwall <[email protected]>
  • Loading branch information
aludwiko and patriknw authored Jan 17, 2025
1 parent 55e7efa commit 5a94fed
Show file tree
Hide file tree
Showing 13 changed files with 125 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import akka.http.javadsl.model.HttpRequest;
import akka.javasdk.DependencyProvider;
import akka.javasdk.Metadata;
import akka.javasdk.ServiceSetup;
import akka.javasdk.client.ComponentClient;
import akka.javasdk.http.HttpClient;
import akka.javasdk.http.HttpClientProvider;
Expand Down Expand Up @@ -202,7 +203,7 @@ public static class Settings {
/**
* Default settings for testkit.
*/
public static Settings DEFAULT = new Settings("self", true, TEST_BROKER, MockedEventing.EMPTY, Optional.empty(), ConfigFactory.empty());
public static Settings DEFAULT = new Settings("self", true, TEST_BROKER, MockedEventing.EMPTY, Optional.empty(), ConfigFactory.empty(), Set.of());

/**
* The name of this service when deployed.
Expand All @@ -222,6 +223,8 @@ public static class Settings {

public final Optional<DependencyProvider> dependencyProvider;

public final Set<Class<?>> disabledComponents;

public enum EventingSupport {
/**
* This is the default type used and allows the testing eventing integrations without an external broker dependency
Expand All @@ -245,19 +248,21 @@ public enum EventingSupport {
}

private Settings(
String serviceName,
boolean aclEnabled,
EventingSupport eventingSupport,
MockedEventing mockedEventing,
Optional<DependencyProvider> dependencyProvider,
Config additionalConfig
) {
String serviceName,
boolean aclEnabled,
EventingSupport eventingSupport,
MockedEventing mockedEventing,
Optional<DependencyProvider> dependencyProvider,
Config additionalConfig,
Set<Class<?>> disabledComponents
) {
this.serviceName = serviceName;
this.aclEnabled = aclEnabled;
this.eventingSupport = eventingSupport;
this.mockedEventing = mockedEventing;
this.dependencyProvider = dependencyProvider;
this.additionalConfig = additionalConfig;
this.disabledComponents = disabledComponents;
}

/**
Expand All @@ -269,7 +274,7 @@ private Settings(
* @return The updated settings.
*/
public Settings withServiceName(final String serviceName) {
return new Settings(serviceName, aclEnabled, eventingSupport, mockedEventing, dependencyProvider, additionalConfig);
return new Settings(serviceName, aclEnabled, eventingSupport, mockedEventing, dependencyProvider, additionalConfig, disabledComponents);
}

/**
Expand All @@ -278,7 +283,7 @@ public Settings withServiceName(final String serviceName) {
* @return The updated settings.
*/
public Settings withAclDisabled() {
return new Settings(serviceName, false, eventingSupport, mockedEventing, dependencyProvider, additionalConfig);
return new Settings(serviceName, false, eventingSupport, mockedEventing, dependencyProvider, additionalConfig, disabledComponents);
}

/**
Expand All @@ -287,67 +292,74 @@ public Settings withAclDisabled() {
* @return The updated settings.
*/
public Settings withAclEnabled() {
return new Settings(serviceName, true, eventingSupport, mockedEventing, dependencyProvider, additionalConfig);
return new Settings(serviceName, true, eventingSupport, mockedEventing, dependencyProvider, additionalConfig, disabledComponents);
}

/**
* Mock the incoming messages flow from a KeyValueEntity.
*/
public Settings withKeyValueEntityIncomingMessages(String typeId) {
return new Settings(serviceName, aclEnabled, eventingSupport,
mockedEventing.withKeyValueEntityIncomingMessages(typeId), dependencyProvider, additionalConfig);
mockedEventing.withKeyValueEntityIncomingMessages(typeId), dependencyProvider, additionalConfig, disabledComponents);
}

/**
* Mock the incoming events flow from an EventSourcedEntity.
*/
public Settings withEventSourcedEntityIncomingMessages(String typeId) {
return new Settings(serviceName, aclEnabled, eventingSupport,
mockedEventing.withEventSourcedIncomingMessages(typeId), dependencyProvider, additionalConfig);
mockedEventing.withEventSourcedIncomingMessages(typeId), dependencyProvider, additionalConfig, disabledComponents);
}

/**
* Mock the incoming messages flow from a Stream (eventing.in.direct in case of protobuf SDKs).
*/
public Settings withStreamIncomingMessages(String service, String streamId) {
return new Settings(serviceName, aclEnabled, eventingSupport,
mockedEventing.withStreamIncomingMessages(service, streamId), dependencyProvider, additionalConfig);
mockedEventing.withStreamIncomingMessages(service, streamId), dependencyProvider, additionalConfig, disabledComponents);
}

/**
* Mock the incoming events flow from a Topic.
*/
public Settings withTopicIncomingMessages(String topic) {
return new Settings(serviceName, aclEnabled, eventingSupport,
mockedEventing.withTopicIncomingMessages(topic), dependencyProvider, additionalConfig);
mockedEventing.withTopicIncomingMessages(topic), dependencyProvider, additionalConfig, disabledComponents);
}

/**
* Mock the outgoing events flow for a Topic.
*/
public Settings withTopicOutgoingMessages(String topic) {
return new Settings(serviceName, aclEnabled, eventingSupport,
mockedEventing.withTopicOutgoingMessages(topic), dependencyProvider, additionalConfig);
mockedEventing.withTopicOutgoingMessages(topic), dependencyProvider, additionalConfig, disabledComponents);
}

public Settings withEventingSupport(EventingSupport eventingSupport) {
return new Settings(serviceName, aclEnabled, eventingSupport, mockedEventing, dependencyProvider, additionalConfig);
return new Settings(serviceName, aclEnabled, eventingSupport, mockedEventing, dependencyProvider, additionalConfig, disabledComponents);
}

/**
* Specify additional config that will override the application-test.conf or application.conf configuration
* in a particular test.
*/
public Settings withAdditionalConfig(Config additionalConfig) {
return new Settings(serviceName, aclEnabled, eventingSupport, mockedEventing, dependencyProvider, additionalConfig);
return new Settings(serviceName, aclEnabled, eventingSupport, mockedEventing, dependencyProvider, additionalConfig, disabledComponents);
}

/**
* Set a dependency provider that will be used for looking up arbitrary dependencies, useful to provide mocks for
* production dependencies in tests rather than calling the real thing.
*/
public Settings withDependencyProvider(DependencyProvider dependencyProvider) {
return new Settings(serviceName, aclEnabled, eventingSupport, mockedEventing, Optional.of(dependencyProvider), additionalConfig);
return new Settings(serviceName, aclEnabled, eventingSupport, mockedEventing, Optional.of(dependencyProvider), additionalConfig, disabledComponents);
}

/**
* Disable components from running, useful for testing components in isolation. This set of disabled components will be added to {@link ServiceSetup#disabledComponents()} if configured.
*/
public Settings withDisabledComponents(Set<Class<?>> disabledComponents) {
return new Settings(serviceName, aclEnabled, eventingSupport, mockedEventing, dependencyProvider, additionalConfig, disabledComponents);
}

@Override
Expand Down Expand Up @@ -429,7 +441,7 @@ private void startRuntime(final Config config) {
try {
log.debug("Config from user: {}", config);

SdkRunner runner = new SdkRunner(settings.dependencyProvider) {
SdkRunner runner = new SdkRunner(settings.dependencyProvider, settings.disabledComponents) {
@Override
public Config applicationConfig() {
return ConfigFactory.parseString("akka.javasdk.dev-mode.enabled = true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import java.time.Instant;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static akkajavasdk.components.pubsub.PublishVEToTopic.CUSTOMERS_TOPIC;
Expand All @@ -42,12 +43,13 @@
@ExtendWith(Junit5LogCapturing.class)
public class SdkIntegrationTest extends TestKitSupport {


@Override
protected TestKit.Settings testKitSettings() {
// here only to show how to set different `Settings` in a test.
return TestKit.Settings.DEFAULT
.withTopicOutgoingMessages(CUSTOMERS_TOPIC);
.withTopicOutgoingMessages(CUSTOMERS_TOPIC)
//one defined here and one is the Setup class
.withDisabledComponents(Set.of(StageCounterEntity.class));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@

import akka.javasdk.ServiceSetup;
import akka.javasdk.annotations.Acl;
import akkajavasdk.components.keyvalueentities.user.ProdCounterEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Set;

@akka.javasdk.annotations.Setup
@Acl(allow = @Acl.Matcher(principal = Acl.Principal.ALL))
public class Setup implements ServiceSetup {
Expand All @@ -20,4 +23,8 @@ public void onStartup() {
logger.info("Starting Application");
}

@Override
public Set<Class<?>> disabledComponents() {
return Set.of(ProdCounterEntity.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,5 @@ akka.javasdk {
"akkajavasdk.components.workflowentities.hierarchy.TextWorkflow"
]
}
kalix-service = "akkajavasdk.components.Setup"
service-setup = "akkajavasdk.components.Setup"
}
1 change: 0 additions & 1 deletion akka-javasdk-tests/src/test/resources/application.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
# Using a different port to not conflict with parallel tests
akka.javasdk.testkit.http-port = 39391
akka.javasdk.components.disable = "akkajavasdk.components.keyvalueentities.user.ProdCounterEntity,akkajavasdk.components.keyvalueentities.user.StageCounterEntity"
11 changes: 9 additions & 2 deletions akka-javasdk/src/main/java/akka/javasdk/ServiceSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
package akka.javasdk;

import akka.javasdk.annotations.Setup;
import akka.javasdk.client.ComponentClient;
import akka.javasdk.timer.TimerScheduler;

import java.util.Set;

/**
* Implement this on a single class per deployable service annotated with {{@link Setup}} and
Expand Down Expand Up @@ -47,4 +47,11 @@ default void onStartup() {}
default DependencyProvider createDependencyProvider() {
return null;
}

/**
* Provides a set of components that should be disabled from running.
*/
default Set<Class<?>> disabledComponents() {
return Set.of();
}
}
2 changes: 0 additions & 2 deletions akka-javasdk/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,4 @@ akka.javasdk {
collector-endpoint = ${?COLLECTOR_ENDPOINT}
}
}
# The comma separated list of FQCNs of components disabled from running
components.disable = ""
}
45 changes: 29 additions & 16 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ import akka.javasdk.timer.TimerScheduler
import akka.javasdk.view.View
import akka.javasdk.workflow.Workflow
import akka.javasdk.workflow.WorkflowContext
import akka.runtime.sdk.spi
import akka.runtime.sdk.spi.ComponentClients
import akka.runtime.sdk.spi.ConsumerDescriptor
import akka.runtime.sdk.spi.EventSourcedEntityDescriptor
Expand Down Expand Up @@ -113,14 +114,16 @@ object SdkRunner {
* INTERNAL API
*/
@InternalApi
class SdkRunner private (dependencyProvider: Option[DependencyProvider]) extends akka.runtime.sdk.spi.Runner {
class SdkRunner private (dependencyProvider: Option[DependencyProvider], disabledComponents: Set[Class[_]])
extends akka.runtime.sdk.spi.Runner {
private val startedPromise = Promise[StartupContext]()

// default constructor for runtime creation
def this() = this(None)
def this() = this(None, Set.empty[Class[_]])

// constructor for testkit
def this(dependencyProvider: java.util.Optional[DependencyProvider]) = this(dependencyProvider.toScala)
def this(dependencyProvider: java.util.Optional[DependencyProvider], disabledComponents: java.util.Set[Class[_]]) =
this(dependencyProvider.toScala, disabledComponents.asScala.toSet)

def applicationConfig: Config =
ApplicationConfig.loadApplicationConf
Expand Down Expand Up @@ -170,6 +173,7 @@ class SdkRunner private (dependencyProvider: Option[DependencyProvider]) extends
startContext.remoteIdentification,
startContext.tracerFactory,
dependencyProvider,
disabledComponents,
startedPromise,
getSettings.devMode.map(_.serviceName))
Future.successful(app.spiComponents)
Expand Down Expand Up @@ -300,6 +304,7 @@ private final class Sdk(
remoteIdentification: Option[RemoteIdentification],
tracerFactory: String => Tracer,
dependencyProviderOverride: Option[DependencyProvider],
disabledComponents: Set[Class[_]],
startedPromise: Promise[StartupContext],
serviceNameOverride: Option[String]) {
private val logger = LoggerFactory.getLogger(getClass)
Expand Down Expand Up @@ -354,16 +359,6 @@ private final class Sdk(
}
}

private def isDisabled(clz: Class[_]): Boolean = {
val componentName = clz.getName
if (sdkSettings.disabledComponents.contains(componentName)) {
logger.info("Ignoring component [{}] as it is disabled in the configuration", clz.getName)
true
} else {
false
}
}

// command handlers candidate must have 0 or 1 parameter and return the components effect type
// we might later revisit this, instead of single param, we can require (State, Cmd) => Effect like in Akka
def isCommandHandlerCandidate[E](method: Method)(implicit effectType: ClassTag[E]): Boolean = {
Expand Down Expand Up @@ -416,7 +411,6 @@ private final class Sdk(
// collect all Endpoints and compose them to build a larger router
private val httpEndpointDescriptors = componentClasses
.filter(Reflect.isRestEndpoint)
.filterNot(isDisabled)
.map { httpEndpointClass =>
HttpEndpointDescriptorFactory(httpEndpointClass, httpEndpointFactory(httpEndpointClass))
}
Expand All @@ -430,7 +424,6 @@ private final class Sdk(

componentClasses
.filter(hasComponentId)
.filterNot(isDisabled)
.foreach {
case clz if classOf[EventSourcedEntity[_, _]].isAssignableFrom(clz) =>
val componentId = clz.getAnnotation(classOf[ComponentId]).value
Expand Down Expand Up @@ -595,8 +588,19 @@ private final class Sdk(
case _ => None
}

// service setup + integration test config
val combinedDisabledComponents =
(serviceSetup.map(_.disabledComponents().asScala.toSet).getOrElse(Set.empty) ++ disabledComponents).map(_.getName)

val descriptors =
eventSourcedEntityDescriptors ++ keyValueEntityDescriptors ++ httpEndpointDescriptors ++ timedActionDescriptors ++ consumerDescriptors ++ viewDescriptors ++ workflowDescriptors
(eventSourcedEntityDescriptors ++
keyValueEntityDescriptors ++
httpEndpointDescriptors ++
timedActionDescriptors ++
consumerDescriptors ++
viewDescriptors ++
workflowDescriptors)
.filterNot(isDisabled(combinedDisabledComponents))

val preStart = { (_: ActorSystem[_]) =>
serviceSetup match {
Expand Down Expand Up @@ -660,6 +664,15 @@ private final class Sdk(
healthCheck = () => SdkRunner.FutureDone)
}

private def isDisabled(disabledComponents: Set[String])(componentDescriptor: spi.ComponentDescriptor): Boolean = {
val className = componentDescriptor.implementationName
if (disabledComponents.contains(className)) {
logger.info("Ignoring component [{}] as it is disabled", className)
true
} else
false
}

private def httpEndpointFactory[E](httpEndpointClass: Class[E]): HttpEndpointConstructionContext => E = {
(context: HttpEndpointConstructionContext) =>
lazy val requestContext = new RequestContext {
Expand Down
6 changes: 2 additions & 4 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/Settings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ private[impl] object Settings {
devModeSettings = Option.when(sdkConfig.getBoolean("dev-mode.enabled"))(
DevModeSettings(
serviceName = sdkConfig.getString("dev-mode.service-name"),
httpPort = sdkConfig.getInt("dev-mode.http-port"))),
disabledComponents = sdkConfig.getString("components.disable").split(",").map(_.trim).toSet)
httpPort = sdkConfig.getInt("dev-mode.http-port"))))
}

final case class DevModeSettings(serviceName: String, httpPort: Int)
Expand All @@ -37,5 +36,4 @@ private[impl] object Settings {
private[impl] final case class Settings(
cleanupDeletedEventSourcedEntityAfter: Duration,
cleanupDeletedKeyValueEntityAfter: Duration,
devModeSettings: Option[DevModeSettings],
disabledComponents: Set[String])
devModeSettings: Option[DevModeSettings])
Loading

0 comments on commit 5a94fed

Please sign in to comment.