diff --git a/e2e-tests/src/test/resources/batch-nu-designer.override.yml b/e2e-tests/src/test/resources/batch-nu-designer.override.yml
index c16f4d402a7..ff0b35fe551 100644
--- a/e2e-tests/src/test/resources/batch-nu-designer.override.yml
+++ b/e2e-tests/src/test/resources/batch-nu-designer.override.yml
@@ -4,6 +4,8 @@ services:
environment:
CONFIG_FILE: "/opt/nussknacker/conf/application.conf,/opt/nussknacker/conf/application-customizations.conf,/opt/nussknacker/conf/batch-customizations.conf"
TABLES_DEFINITION_FILE: "/opt/nussknacker/conf/tables-definition.sql"
+ # Batch adds 2 new processing types, each of them add MiniCluster for scenario testing purpose. Also batch components itself use MiniCluster.
+ # It increases memory footprint significantly
JDK_JAVA_OPTIONS: "-Xmx400M -XX:MaxMetaspaceSize=500M -XX:MaxDirectMemorySize=300M"
volumes:
- ../../e2e-tests/src/test/resources/batch-data-generation/batch-customizations.conf:/opt/nussknacker/conf/batch-customizations.conf
diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/AdHocMiniClusterFallbackHandler.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/AdHocMiniClusterFallbackHandler.scala
index dfc9d4dfacd..a1f2f9a906f 100644
--- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/AdHocMiniClusterFallbackHandler.scala
+++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/AdHocMiniClusterFallbackHandler.scala
@@ -38,7 +38,7 @@ object AdHocMiniClusterFallbackHandler extends LazyLogging {
.extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData())
.parallelism
.getOrElse(1)
- ScenarioTestingMiniClusterWrapper.create(scenarioParallelism, new Configuration())
+ ScenarioTestingMiniClusterWrapper.create(scenarioParallelism, new Configuration(), new Configuration())
}
}
diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/ScenarioTestingMiniClusterFactory.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/ScenarioTestingMiniClusterFactory.scala
index fbdeee9fbfd..030e324e986 100644
--- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/ScenarioTestingMiniClusterFactory.scala
+++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/ScenarioTestingMiniClusterFactory.scala
@@ -6,23 +6,17 @@ import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfigurati
object ScenarioTestingMiniClusterFactory {
- def createConfiguredMiniCluster(numTaskSlots: Int): MiniCluster = {
- val miniClusterConfiguration = prepareMiniClusterConfiguration(numTaskSlots = numTaskSlots)
+ def createConfiguredMiniCluster(numTaskSlots: Int, miniClusterConfig: Configuration): MiniCluster = {
+ adjustConfiguration(miniClusterConfig, numTaskSlots = numTaskSlots)
// it is required for proper working of HadoopFileSystem
- FileSystem.initialize(miniClusterConfiguration, null)
+ FileSystem.initialize(miniClusterConfig, null)
- createMiniCluster(miniClusterConfiguration, numSlotsPerTaskManager = numTaskSlots)
+ createMiniCluster(miniClusterConfig, numSlotsPerTaskManager = numTaskSlots)
}
- private def prepareMiniClusterConfiguration(numTaskSlots: Int) = {
- val configuration: Configuration = new Configuration
+ private def adjustConfiguration(configuration: Configuration, numTaskSlots: Int): Unit = {
configuration.set[Integer](TaskManagerOptions.NUM_TASK_SLOTS, numTaskSlots)
- configuration.set[Integer](RestOptions.PORT, 0)
-
- // FIXME: reversing flink default order
- configuration.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first")
- configuration
}
private def createMiniCluster(configuration: Configuration, numSlotsPerTaskManager: Int) = {
diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/ScenarioTestingMiniClusterWrapper.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/ScenarioTestingMiniClusterWrapper.scala
index 96baca04f7b..8c8789a0667 100644
--- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/ScenarioTestingMiniClusterWrapper.scala
+++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/ScenarioTestingMiniClusterWrapper.scala
@@ -70,9 +70,13 @@ final class ScenarioTestingMiniClusterWrapper(
object ScenarioTestingMiniClusterWrapper extends LazyLogging {
- def create(parallelism: Int, streamExecutionConfig: Configuration): ScenarioTestingMiniClusterWrapper = {
+ def create(
+ parallelism: Int,
+ miniClusterConfig: Configuration,
+ streamExecutionConfig: Configuration
+ ): ScenarioTestingMiniClusterWrapper = {
logger.debug(s"Creating MiniCluster with numTaskSlots = $parallelism")
- val miniCluster = ScenarioTestingMiniClusterFactory.createConfiguredMiniCluster(parallelism)
+ val miniCluster = ScenarioTestingMiniClusterFactory.createConfiguredMiniCluster(parallelism, miniClusterConfig)
logger.debug(
s"Creating local StreamExecutionEnvironment with parallelism = $parallelism and configuration = $streamExecutionConfig"
)
diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkConfig.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkConfig.scala
index b8ef97670d7..0de1c02fbc0 100644
--- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkConfig.scala
+++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkConfig.scala
@@ -2,7 +2,7 @@ package pl.touk.nussknacker.engine.management
import net.ceedubs.ficus.Ficus
import net.ceedubs.ficus.readers.ValueReader
-import org.apache.flink.configuration.Configuration
+import org.apache.flink.configuration.{Configuration, CoreOptions, MemorySize, RestOptions, TaskManagerOptions}
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.jdk.CollectionConverters._
@@ -61,6 +61,7 @@ final case class ScenarioTestingConfig(
reuseMiniClusterForScenarioTesting: Boolean = true,
reuseMiniClusterForScenarioStateVerification: Boolean = true,
parallelism: Int = 1,
+ miniClusterConfig: Configuration = ScenarioTestingConfig.defaultMiniClusterConfig,
streamExecutionConfig: Configuration = new Configuration
)
@@ -71,4 +72,17 @@ object ScenarioTestingConfig {
implicit val flinkConfigurationValueReader: ValueReader[Configuration] =
Ficus.mapValueReader[String].map(map => Configuration.fromMap(map.asJava))
+ private[nussknacker] val defaultMiniClusterConfig: Configuration = {
+ val config = new Configuration
+ config.set[Integer](RestOptions.PORT, 0)
+ // FIXME: reversing flink default order
+ config.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first")
+ // In some setups we create a few Flink DMs. Each of them creates its own mini cluster.
+ // To reduce footprint we decrease off-heap memory buffers size and managed memory
+ config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("16m"))
+ config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("16m"))
+ config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("50m"))
+ config
+ }
+
}
diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStreamingDeploymentManagerProvider.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStreamingDeploymentManagerProvider.scala
index b6ae35d6be3..c0dce580579 100644
--- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStreamingDeploymentManagerProvider.scala
+++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStreamingDeploymentManagerProvider.scala
@@ -2,6 +2,7 @@ package pl.touk.nussknacker.engine.management
import cats.data.ValidatedNel
import com.typesafe.config.Config
+import com.typesafe.scalalogging.LazyLogging
import pl.touk.nussknacker.engine._
import pl.touk.nussknacker.engine.api.StreamMetaData
import pl.touk.nussknacker.engine.api.component.ScenarioPropertyConfig
@@ -15,7 +16,7 @@ import pl.touk.nussknacker.engine.management.rest.FlinkClient
import scala.concurrent.duration.FiniteDuration
import scala.util.Try
-class FlinkStreamingDeploymentManagerProvider extends DeploymentManagerProvider {
+class FlinkStreamingDeploymentManagerProvider extends DeploymentManagerProvider with LazyLogging {
import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ArbitraryTypeReader._
@@ -28,6 +29,7 @@ class FlinkStreamingDeploymentManagerProvider extends DeploymentManagerProvider
deploymentConfig: Config,
scenarioStateCacheTTL: Option[FiniteDuration]
): ValidatedNel[String, DeploymentManager] = {
+ logger.info("Creating FlinkStreamingDeploymentManager")
import dependencies._
val flinkConfig = deploymentConfig.rootAs[FlinkConfig]
FlinkClient.create(flinkConfig, scenarioStateCacheTTL).map { client =>
diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/ScenarioTestingMiniClusterWrapperFactory.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/ScenarioTestingMiniClusterWrapperFactory.scala
index b4009e0bc4a..36a2dc8ccfe 100644
--- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/ScenarioTestingMiniClusterWrapperFactory.scala
+++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/ScenarioTestingMiniClusterWrapperFactory.scala
@@ -12,7 +12,7 @@ object ScenarioTestingMiniClusterWrapperFactory {
// Other option would be to add flinkExecutor.jar to classpath from which Flink DM is loaded
def createIfConfigured(modelClassLoader: ModelClassLoader, config: ScenarioTestingConfig): Option[AutoCloseable] = {
if (config.reuseMiniClusterForScenarioTesting || config.reuseMiniClusterForScenarioStateVerification) {
- Some(create(modelClassLoader, config.parallelism, config.streamExecutionConfig))
+ Some(create(modelClassLoader, config.parallelism, config.miniClusterConfig, config.streamExecutionConfig))
} else {
None
}
@@ -21,6 +21,7 @@ object ScenarioTestingMiniClusterWrapperFactory {
private[nussknacker] def create(
modelClassLoader: ModelClassLoader,
parallelism: Int,
+ miniClusterConfig: Configuration,
streamExecutionConfig: Configuration
): AutoCloseable = {
val methodInvoker = new ReflectiveMethodInvoker[AutoCloseable](
@@ -28,7 +29,7 @@ object ScenarioTestingMiniClusterWrapperFactory {
"pl.touk.nussknacker.engine.process.scenariotesting.ScenarioTestingMiniClusterWrapper",
"create"
)
- methodInvoker.invokeStaticMethod(parallelism, streamExecutionConfig)
+ methodInvoker.invokeStaticMethod(parallelism, miniClusterConfig, streamExecutionConfig)
}
}
diff --git a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunnerSpec.scala b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunnerSpec.scala
index 7f073eb8a87..bfb1153446d 100644
--- a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunnerSpec.scala
+++ b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunnerSpec.scala
@@ -34,6 +34,7 @@ import pl.touk.nussknacker.engine.flink.test.{
import pl.touk.nussknacker.engine.graph.expression.Expression
import pl.touk.nussknacker.engine.graph.node.FragmentInputDefinition.{FragmentClazzRef, FragmentParameter}
import pl.touk.nussknacker.engine.graph.node.{Case, FragmentInputDefinition, FragmentOutputDefinition}
+import pl.touk.nussknacker.engine.management.ScenarioTestingConfig
import pl.touk.nussknacker.engine.management.scenariotesting.FlinkProcessTestRunnerSpec.{
fragmentWithValidationName,
processWithFragmentParameterValidation
@@ -76,6 +77,7 @@ class FlinkProcessTestRunnerSpec
private val scenarioTestingMiniClusterWrapper = ScenarioTestingMiniClusterWrapperFactory.create(
modelClassLoader,
parallelism = 1,
+ miniClusterConfig = ScenarioTestingConfig.defaultMiniClusterConfig,
streamExecutionConfig = new Configuration
)
diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/kafka/KafkaScenarioTestingSpec.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/kafka/KafkaScenarioTestingSpec.scala
index 78e0ce37087..7cce8d298ba 100644
--- a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/kafka/KafkaScenarioTestingSpec.scala
+++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/kafka/KafkaScenarioTestingSpec.scala
@@ -20,6 +20,7 @@ import pl.touk.nussknacker.engine.kafka.KafkaFactory.TopicParamName
import pl.touk.nussknacker.engine.kafka.source.InputMeta
import pl.touk.nussknacker.engine.kafka.source.flink.KafkaSourceFactoryProcessConfigCreator
import pl.touk.nussknacker.engine.kafka.source.flink.KafkaSourceFactoryProcessConfigCreator.ResultsHolders
+import pl.touk.nussknacker.engine.management.ScenarioTestingConfig
import pl.touk.nussknacker.engine.management.scenariotesting.{
FlinkProcessTestRunner,
ScenarioTestingMiniClusterWrapperFactory
@@ -67,6 +68,7 @@ class KafkaScenarioTestingSpec
private val scenarioTestingMiniClusterWrapper = ScenarioTestingMiniClusterWrapperFactory.create(
modelData.modelClassLoader,
parallelism = 1,
+ miniClusterConfig = ScenarioTestingConfig.defaultMiniClusterConfig,
streamExecutionConfig = new Configuration
)
diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/schemedkafka/SchemedKafkaScenarioTestingSpec.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/schemedkafka/SchemedKafkaScenarioTestingSpec.scala
index 5416256b76b..3f8744d9c6b 100644
--- a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/schemedkafka/SchemedKafkaScenarioTestingSpec.scala
+++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/schemedkafka/SchemedKafkaScenarioTestingSpec.scala
@@ -20,6 +20,7 @@ import pl.touk.nussknacker.engine.flink.util.sink.SingleValueSinkFactory.SingleV
import pl.touk.nussknacker.engine.graph.expression.Expression
import pl.touk.nussknacker.engine.kafka.UnspecializedTopicName
import pl.touk.nussknacker.engine.kafka.source.InputMeta
+import pl.touk.nussknacker.engine.management.ScenarioTestingConfig
import pl.touk.nussknacker.engine.management.scenariotesting.{
FlinkProcessTestRunner,
ScenarioTestingMiniClusterWrapperFactory
@@ -88,6 +89,7 @@ class SchemedKafkaScenarioTestingSpec
private val scenarioTestingMiniClusterWrapper = ScenarioTestingMiniClusterWrapperFactory.create(
modelData.modelClassLoader,
parallelism = 1,
+ miniClusterConfig = ScenarioTestingConfig.defaultMiniClusterConfig,
streamExecutionConfig = new Configuration
)
diff --git a/utils/test-utils/src/main/resources/logback-test.xml b/utils/test-utils/src/main/resources/logback-test.xml
index 961b23b4298..b06a86a4b74 100644
--- a/utils/test-utils/src/main/resources/logback-test.xml
+++ b/utils/test-utils/src/main/resources/logback-test.xml
@@ -18,7 +18,7 @@
-
+