Skip to content

Commit

Permalink
feat: disabling components programatically
Browse files Browse the repository at this point in the history
  • Loading branch information
aludwiko committed Jan 16, 2025
1 parent f0c0759 commit e7be5de
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import akka.http.javadsl.model.HttpRequest;
import akka.javasdk.DependencyProvider;
import akka.javasdk.Metadata;
import akka.javasdk.ServiceSetup;
import akka.javasdk.client.ComponentClient;
import akka.javasdk.http.HttpClient;
import akka.javasdk.http.HttpClientProvider;
Expand Down Expand Up @@ -202,7 +203,7 @@ public static class Settings {
/**
* Default settings for testkit.
*/
public static Settings DEFAULT = new Settings("self", true, TEST_BROKER, MockedEventing.EMPTY, Optional.empty(), ConfigFactory.empty());
public static Settings DEFAULT = new Settings("self", true, TEST_BROKER, MockedEventing.EMPTY, Optional.empty(), ConfigFactory.empty(), Set.of());

/**
* The name of this service when deployed.
Expand All @@ -222,6 +223,8 @@ public static class Settings {

public final Optional<DependencyProvider> dependencyProvider;

public final Set<Class<?>> disabledComponents;

public enum EventingSupport {
/**
* This is the default type used and allows the testing eventing integrations without an external broker dependency
Expand All @@ -245,19 +248,21 @@ public enum EventingSupport {
}

private Settings(
String serviceName,
boolean aclEnabled,
EventingSupport eventingSupport,
MockedEventing mockedEventing,
Optional<DependencyProvider> dependencyProvider,
Config additionalConfig
) {
String serviceName,
boolean aclEnabled,
EventingSupport eventingSupport,
MockedEventing mockedEventing,
Optional<DependencyProvider> dependencyProvider,
Config additionalConfig,
Set<Class<?>> disabledComponents
) {
this.serviceName = serviceName;
this.aclEnabled = aclEnabled;
this.eventingSupport = eventingSupport;
this.mockedEventing = mockedEventing;
this.dependencyProvider = dependencyProvider;
this.additionalConfig = additionalConfig;
this.disabledComponents = disabledComponents;
}

/**
Expand All @@ -269,7 +274,7 @@ private Settings(
* @return The updated settings.
*/
public Settings withServiceName(final String serviceName) {
return new Settings(serviceName, aclEnabled, eventingSupport, mockedEventing, dependencyProvider, additionalConfig);
return new Settings(serviceName, aclEnabled, eventingSupport, mockedEventing, dependencyProvider, additionalConfig, disabledComponents);
}

/**
Expand All @@ -278,7 +283,7 @@ public Settings withServiceName(final String serviceName) {
* @return The updated settings.
*/
public Settings withAclDisabled() {
return new Settings(serviceName, false, eventingSupport, mockedEventing, dependencyProvider, additionalConfig);
return new Settings(serviceName, false, eventingSupport, mockedEventing, dependencyProvider, additionalConfig, disabledComponents);
}

/**
Expand All @@ -287,67 +292,74 @@ public Settings withAclDisabled() {
* @return The updated settings.
*/
public Settings withAclEnabled() {
return new Settings(serviceName, true, eventingSupport, mockedEventing, dependencyProvider, additionalConfig);
return new Settings(serviceName, true, eventingSupport, mockedEventing, dependencyProvider, additionalConfig, disabledComponents);
}

/**
* Mock the incoming messages flow from a KeyValueEntity.
*/
public Settings withKeyValueEntityIncomingMessages(String typeId) {
return new Settings(serviceName, aclEnabled, eventingSupport,
mockedEventing.withKeyValueEntityIncomingMessages(typeId), dependencyProvider, additionalConfig);
mockedEventing.withKeyValueEntityIncomingMessages(typeId), dependencyProvider, additionalConfig, disabledComponents);
}

/**
* Mock the incoming events flow from an EventSourcedEntity.
*/
public Settings withEventSourcedEntityIncomingMessages(String typeId) {
return new Settings(serviceName, aclEnabled, eventingSupport,
mockedEventing.withEventSourcedIncomingMessages(typeId), dependencyProvider, additionalConfig);
mockedEventing.withEventSourcedIncomingMessages(typeId), dependencyProvider, additionalConfig, disabledComponents);
}

/**
* Mock the incoming messages flow from a Stream (eventing.in.direct in case of protobuf SDKs).
*/
public Settings withStreamIncomingMessages(String service, String streamId) {
return new Settings(serviceName, aclEnabled, eventingSupport,
mockedEventing.withStreamIncomingMessages(service, streamId), dependencyProvider, additionalConfig);
mockedEventing.withStreamIncomingMessages(service, streamId), dependencyProvider, additionalConfig, disabledComponents);
}

/**
* Mock the incoming events flow from a Topic.
*/
public Settings withTopicIncomingMessages(String topic) {
return new Settings(serviceName, aclEnabled, eventingSupport,
mockedEventing.withTopicIncomingMessages(topic), dependencyProvider, additionalConfig);
mockedEventing.withTopicIncomingMessages(topic), dependencyProvider, additionalConfig, disabledComponents);
}

/**
* Mock the outgoing events flow for a Topic.
*/
public Settings withTopicOutgoingMessages(String topic) {
return new Settings(serviceName, aclEnabled, eventingSupport,
mockedEventing.withTopicOutgoingMessages(topic), dependencyProvider, additionalConfig);
mockedEventing.withTopicOutgoingMessages(topic), dependencyProvider, additionalConfig, disabledComponents);
}

public Settings withEventingSupport(EventingSupport eventingSupport) {
return new Settings(serviceName, aclEnabled, eventingSupport, mockedEventing, dependencyProvider, additionalConfig);
return new Settings(serviceName, aclEnabled, eventingSupport, mockedEventing, dependencyProvider, additionalConfig, disabledComponents);
}

/**
* Specify additional config that will override the application-test.conf or application.conf configuration
* in a particular test.
*/
public Settings withAdditionalConfig(Config additionalConfig) {
return new Settings(serviceName, aclEnabled, eventingSupport, mockedEventing, dependencyProvider, additionalConfig);
return new Settings(serviceName, aclEnabled, eventingSupport, mockedEventing, dependencyProvider, additionalConfig, disabledComponents);
}

/**
* Set a dependency provider that will be used for looking up arbitrary dependencies, useful to provide mocks for
* production dependencies in tests rather than calling the real thing.
*/
public Settings withDependencyProvider(DependencyProvider dependencyProvider) {
return new Settings(serviceName, aclEnabled, eventingSupport, mockedEventing, Optional.of(dependencyProvider), additionalConfig);
return new Settings(serviceName, aclEnabled, eventingSupport, mockedEventing, Optional.of(dependencyProvider), additionalConfig, disabledComponents);
}

/**
* Disable components from running, useful for testing components in isolation. This set of disabled components will be added to {@link ServiceSetup#disabledComponents()} if configured.
*/
public Settings withDisabledComponents(Set<Class<?>> disabledComponents) {
return new Settings(serviceName, aclEnabled, eventingSupport, mockedEventing, dependencyProvider, additionalConfig, disabledComponents);
}

@Override
Expand Down Expand Up @@ -429,7 +441,7 @@ private void startRuntime(final Config config) {
try {
log.debug("Config from user: {}", config);

SdkRunner runner = new SdkRunner(settings.dependencyProvider) {
SdkRunner runner = new SdkRunner(settings.dependencyProvider, settings.disabledComponents) {
@Override
public Config applicationConfig() {
return ConfigFactory.parseString("akka.javasdk.dev-mode.enabled = true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import java.time.Instant;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static akkajavasdk.components.pubsub.PublishVEToTopic.CUSTOMERS_TOPIC;
Expand All @@ -42,12 +43,13 @@
@ExtendWith(Junit5LogCapturing.class)
public class SdkIntegrationTest extends TestKitSupport {


@Override
protected TestKit.Settings testKitSettings() {
// here only to show how to set different `Settings` in a test.
return TestKit.Settings.DEFAULT
.withTopicOutgoingMessages(CUSTOMERS_TOPIC);
.withTopicOutgoingMessages(CUSTOMERS_TOPIC)
//one defined here and one is the Setup class
.withDisabledComponents(Set.of(StageCounterEntity.class));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@

import akka.javasdk.ServiceSetup;
import akka.javasdk.annotations.Acl;
import akkajavasdk.components.keyvalueentities.user.ProdCounterEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Set;

@akka.javasdk.annotations.Setup
@Acl(allow = @Acl.Matcher(principal = Acl.Principal.ALL))
public class Setup implements ServiceSetup {
Expand All @@ -20,4 +23,8 @@ public void onStartup() {
logger.info("Starting Application");
}

@Override
public Set<Class<?>> disabledComponents() {
return Set.of(ProdCounterEntity.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,5 @@ akka.javasdk {
"akkajavasdk.components.workflowentities.hierarchy.TextWorkflow"
]
}
kalix-service = "akkajavasdk.components.Setup"
service-setup = "akkajavasdk.components.Setup"
}
1 change: 0 additions & 1 deletion akka-javasdk-tests/src/test/resources/application.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
# Using a different port to not conflict with parallel tests
akka.javasdk.testkit.http-port = 39391
akka.javasdk.components.disable = "akkajavasdk.components.keyvalueentities.user.ProdCounterEntity,akkajavasdk.components.keyvalueentities.user.StageCounterEntity"
11 changes: 9 additions & 2 deletions akka-javasdk/src/main/java/akka/javasdk/ServiceSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
package akka.javasdk;

import akka.javasdk.annotations.Setup;
import akka.javasdk.client.ComponentClient;
import akka.javasdk.timer.TimerScheduler;

import java.util.Set;

/**
* Implement this on a single class per deployable service annotated with {{@link Setup}} and
Expand Down Expand Up @@ -47,4 +47,11 @@ default void onStartup() {}
default DependencyProvider createDependencyProvider() {
return null;
}

/**
* Provides a set of components that should be disabled from running.
*/
default Set<Class<?>> disabledComponents() {
return Set.of();
}
}
2 changes: 0 additions & 2 deletions akka-javasdk/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,4 @@ akka.javasdk {
collector-endpoint = ${?COLLECTOR_ENDPOINT}
}
}
# The comma separated list of FQCNs of components disabled from running
components.disable = ""
}
45 changes: 29 additions & 16 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ import akka.javasdk.timer.TimerScheduler
import akka.javasdk.view.View
import akka.javasdk.workflow.Workflow
import akka.javasdk.workflow.WorkflowContext
import akka.runtime.sdk.spi
import akka.runtime.sdk.spi.ComponentClients
import akka.runtime.sdk.spi.ConsumerDescriptor
import akka.runtime.sdk.spi.EventSourcedEntityDescriptor
Expand Down Expand Up @@ -113,14 +114,16 @@ object SdkRunner {
* INTERNAL API
*/
@InternalApi
class SdkRunner private (dependencyProvider: Option[DependencyProvider]) extends akka.runtime.sdk.spi.Runner {
class SdkRunner private (dependencyProvider: Option[DependencyProvider], disabledComponents: Set[Class[_]])
extends akka.runtime.sdk.spi.Runner {
private val startedPromise = Promise[StartupContext]()

// default constructor for runtime creation
def this() = this(None)
def this() = this(None, Set.empty[Class[_]])

// constructor for testkit
def this(dependencyProvider: java.util.Optional[DependencyProvider]) = this(dependencyProvider.toScala)
def this(dependencyProvider: java.util.Optional[DependencyProvider], disabledComponents: java.util.Set[Class[_]]) =
this(dependencyProvider.toScala, disabledComponents.asScala.toSet)

def applicationConfig: Config =
ApplicationConfig.loadApplicationConf
Expand Down Expand Up @@ -170,6 +173,7 @@ class SdkRunner private (dependencyProvider: Option[DependencyProvider]) extends
startContext.remoteIdentification,
startContext.tracerFactory,
dependencyProvider,
disabledComponents,
startedPromise,
getSettings.devMode.map(_.serviceName))
Future.successful(app.spiComponents)
Expand Down Expand Up @@ -300,6 +304,7 @@ private final class Sdk(
remoteIdentification: Option[RemoteIdentification],
tracerFactory: String => Tracer,
dependencyProviderOverride: Option[DependencyProvider],
disabledComponents: Set[Class[_]],
startedPromise: Promise[StartupContext],
serviceNameOverride: Option[String]) {
private val logger = LoggerFactory.getLogger(getClass)
Expand Down Expand Up @@ -354,16 +359,6 @@ private final class Sdk(
}
}

private def isDisabled(clz: Class[_]): Boolean = {
val componentName = clz.getName
if (sdkSettings.disabledComponents.contains(componentName)) {
logger.info("Ignoring component [{}] as it is disabled in the configuration", clz.getName)
true
} else {
false
}
}

// command handlers candidate must have 0 or 1 parameter and return the components effect type
// we might later revisit this, instead of single param, we can require (State, Cmd) => Effect like in Akka
def isCommandHandlerCandidate[E](method: Method)(implicit effectType: ClassTag[E]): Boolean = {
Expand Down Expand Up @@ -416,7 +411,6 @@ private final class Sdk(
// collect all Endpoints and compose them to build a larger router
private val httpEndpointDescriptors = componentClasses
.filter(Reflect.isRestEndpoint)
.filterNot(isDisabled)
.map { httpEndpointClass =>
HttpEndpointDescriptorFactory(httpEndpointClass, httpEndpointFactory(httpEndpointClass))
}
Expand All @@ -430,7 +424,6 @@ private final class Sdk(

componentClasses
.filter(hasComponentId)
.filterNot(isDisabled)
.foreach {
case clz if classOf[EventSourcedEntity[_, _]].isAssignableFrom(clz) =>
val componentId = clz.getAnnotation(classOf[ComponentId]).value
Expand Down Expand Up @@ -595,8 +588,19 @@ private final class Sdk(
case _ => None
}

// service setup + integration test config
val combinedDisabledComponents =
serviceSetup.map(_.disabledComponents().asScala).getOrElse(Nil).toSet ++ disabledComponents

val descriptors =
eventSourcedEntityDescriptors ++ keyValueEntityDescriptors ++ httpEndpointDescriptors ++ timedActionDescriptors ++ consumerDescriptors ++ viewDescriptors ++ workflowDescriptors
(eventSourcedEntityDescriptors ++
keyValueEntityDescriptors ++
httpEndpointDescriptors ++
timedActionDescriptors ++
consumerDescriptors ++
viewDescriptors ++
workflowDescriptors)
.filterNot(isDisabled(combinedDisabledComponents))

val preStart = { (_: ActorSystem[_]) =>
serviceSetup match {
Expand Down Expand Up @@ -660,6 +664,15 @@ private final class Sdk(
healthCheck = () => SdkRunner.FutureDone)
}

private def isDisabled(disabledComponents: Set[Class[_]])(componentDescriptor: spi.ComponentDescriptor): Boolean = {
val className = componentDescriptor.implementationName
if (disabledComponents.map(_.getName).contains(className)) {
logger.info("Ignoring component [{}] as it is disabled in the configuration", className)
true
} else
false
}

private def httpEndpointFactory[E](httpEndpointClass: Class[E]): HttpEndpointConstructionContext => E = {
(context: HttpEndpointConstructionContext) =>
lazy val requestContext = new RequestContext {
Expand Down
6 changes: 2 additions & 4 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/Settings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ private[impl] object Settings {
devModeSettings = Option.when(sdkConfig.getBoolean("dev-mode.enabled"))(
DevModeSettings(
serviceName = sdkConfig.getString("dev-mode.service-name"),
httpPort = sdkConfig.getInt("dev-mode.http-port"))),
disabledComponents = sdkConfig.getString("components.disable").split(",").map(_.trim).toSet)
httpPort = sdkConfig.getInt("dev-mode.http-port"))))
}

final case class DevModeSettings(serviceName: String, httpPort: Int)
Expand All @@ -37,5 +36,4 @@ private[impl] object Settings {
private[impl] final case class Settings(
cleanupDeletedEventSourcedEntityAfter: Duration,
cleanupDeletedKeyValueEntityAfter: Duration,
devModeSettings: Option[DevModeSettings],
disabledComponents: Set[String])
devModeSettings: Option[DevModeSettings])

0 comments on commit e7be5de

Please sign in to comment.