From c01a5695145d6390fef4b724df22b7aa25fd9af1 Mon Sep 17 00:00:00 2001
From: Arek Burdach
Date: Fri, 24 Jan 2025 17:41:11 +0100
Subject: [PATCH] less memory used by mini clusters

---
 .../resources/batch-nu-designer.override.yml | 2 ++
 .../AdHocMiniClusterFallbackHandler.scala | 2 +-
 .../ScenarioTestingMiniClusterFactory.scala | 16 +++++-----------
 .../ScenarioTestingMiniClusterWrapper.scala | 8 ++++++--
 .../engine/management/FlinkConfig.scala | 16 +++++++++++++++-
 ...FlinkStreamingDeploymentManagerProvider.scala | 4 +++-
 ...cenarioTestingMiniClusterWrapperFactory.scala | 5 +++--
 .../FlinkProcessTestRunnerSpec.scala | 2 ++
 .../kafka/KafkaScenarioTestingSpec.scala | 2 ++
 .../SchemedKafkaScenarioTestingSpec.scala | 2 ++
 .../src/main/resources/logback-test.xml | 2 +-
 11 files changed, 42 insertions(+), 19 deletions(-)

diff --git a/e2e-tests/src/test/resources/batch-nu-designer.override.yml b/e2e-tests/src/test/resources/batch-nu-designer.override.yml
index c16f4d402a7..ff0b35fe551 100644
--- a/e2e-tests/src/test/resources/batch-nu-designer.override.yml
+++ b/e2e-tests/src/test/resources/batch-nu-designer.override.yml
@@ -4,6 +4,8 @@ services:
     environment:
       CONFIG_FILE: "/opt/nussknacker/conf/application.conf,/opt/nussknacker/conf/application-customizations.conf,/opt/nussknacker/conf/batch-customizations.conf"
       TABLES_DEFINITION_FILE: "/opt/nussknacker/conf/tables-definition.sql"
+      # Batch adds 2 new processing types, and each of them adds its own MiniCluster for scenario testing. Batch components themselves also use a MiniCluster.
+      # This increases the memory footprint significantly.
       JDK_JAVA_OPTIONS: "-Xmx400M -XX:MaxMetaspaceSize=500M -XX:MaxDirectMemorySize=300M"
     volumes:
       - ../../e2e-tests/src/test/resources/batch-data-generation/batch-customizations.conf:/opt/nussknacker/conf/batch-customizations.conf
diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/AdHocMiniClusterFallbackHandler.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/AdHocMiniClusterFallbackHandler.scala
index dfc9d4dfacd..a1f2f9a906f 100644
--- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/AdHocMiniClusterFallbackHandler.scala
+++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/AdHocMiniClusterFallbackHandler.scala
@@ -38,7 +38,7 @@ object AdHocMiniClusterFallbackHandler extends LazyLogging {
       .extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData())
       .parallelism
       .getOrElse(1)
-    ScenarioTestingMiniClusterWrapper.create(scenarioParallelism, new Configuration())
+    ScenarioTestingMiniClusterWrapper.create(scenarioParallelism, new Configuration(), new Configuration())
   }
 
 }
diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/ScenarioTestingMiniClusterFactory.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/ScenarioTestingMiniClusterFactory.scala
index fbdeee9fbfd..030e324e986 100644
--- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/ScenarioTestingMiniClusterFactory.scala
+++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/ScenarioTestingMiniClusterFactory.scala
@@ -6,23 +6,17 @@ import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfigurati
 
 object ScenarioTestingMiniClusterFactory {
 
-  def createConfiguredMiniCluster(numTaskSlots: Int): MiniCluster = {
-    val miniClusterConfiguration = prepareMiniClusterConfiguration(numTaskSlots = numTaskSlots)
+  def createConfiguredMiniCluster(numTaskSlots: Int, miniClusterConfig: Configuration): MiniCluster = {
+    adjustConfiguration(miniClusterConfig, numTaskSlots = numTaskSlots)
 
     // it is required for proper working of HadoopFileSystem
-    FileSystem.initialize(miniClusterConfiguration, null)
+    FileSystem.initialize(miniClusterConfig, null)
 
-    createMiniCluster(miniClusterConfiguration, numSlotsPerTaskManager = numTaskSlots)
+    createMiniCluster(miniClusterConfig, numSlotsPerTaskManager = numTaskSlots)
   }
 
-  private def prepareMiniClusterConfiguration(numTaskSlots: Int) = {
-    val configuration: Configuration = new Configuration
+  private def adjustConfiguration(configuration: Configuration, numTaskSlots: Int): Unit = {
     configuration.set[Integer](TaskManagerOptions.NUM_TASK_SLOTS, numTaskSlots)
-    configuration.set[Integer](RestOptions.PORT, 0)
-
-    // FIXME: reversing flink default order
-    configuration.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first")
-    configuration
   }
 
   private def createMiniCluster(configuration: Configuration, numSlotsPerTaskManager: Int) = {
diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/ScenarioTestingMiniClusterWrapper.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/ScenarioTestingMiniClusterWrapper.scala
index 96baca04f7b..8c8789a0667 100644
--- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/ScenarioTestingMiniClusterWrapper.scala
+++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/ScenarioTestingMiniClusterWrapper.scala
@@ -70,9 +70,13 @@ final class ScenarioTestingMiniClusterWrapper(
 
 object ScenarioTestingMiniClusterWrapper extends LazyLogging {
 
-  def create(parallelism: Int, streamExecutionConfig: Configuration): ScenarioTestingMiniClusterWrapper = {
+  def create(
+      parallelism: Int,
+      miniClusterConfig: Configuration,
+      streamExecutionConfig: Configuration
+  ): ScenarioTestingMiniClusterWrapper = {
     logger.debug(s"Creating MiniCluster with numTaskSlots = $parallelism")
-    val miniCluster = ScenarioTestingMiniClusterFactory.createConfiguredMiniCluster(parallelism)
+    val miniCluster = ScenarioTestingMiniClusterFactory.createConfiguredMiniCluster(parallelism, miniClusterConfig)
     logger.debug(
       s"Creating local StreamExecutionEnvironment with parallelism = $parallelism and configuration = $streamExecutionConfig"
     )
diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkConfig.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkConfig.scala
index b8ef97670d7..0de1c02fbc0 100644
--- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkConfig.scala
+++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkConfig.scala
@@ -2,7 +2,7 @@ package pl.touk.nussknacker.engine.management
 
 import net.ceedubs.ficus.Ficus
 import net.ceedubs.ficus.readers.ValueReader
-import org.apache.flink.configuration.Configuration
+import org.apache.flink.configuration.{Configuration, CoreOptions, MemorySize, RestOptions, TaskManagerOptions}
 
 import scala.concurrent.duration.{DurationInt, FiniteDuration}
 import scala.jdk.CollectionConverters._
@@ -61,6 +61,7 @@ final case class ScenarioTestingConfig(
     reuseMiniClusterForScenarioTesting: Boolean = true,
     reuseMiniClusterForScenarioStateVerification: Boolean = true,
     parallelism: Int = 1,
+    miniClusterConfig: Configuration = ScenarioTestingConfig.defaultMiniClusterConfig,
     streamExecutionConfig: Configuration = new Configuration
 )
 
@@ -71,4 +72,17 @@ object ScenarioTestingConfig {
   implicit val flinkConfigurationValueReader: ValueReader[Configuration] =
     Ficus.mapValueReader[String].map(map => Configuration.fromMap(map.asJava))
 
+  private[nussknacker] val defaultMiniClusterConfig: Configuration = {
+    val config = new Configuration
+    config.set[Integer](RestOptions.PORT, 0)
+    // FIXME: reversing flink default order
+    config.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first")
+    // In some setups we create a few Flink DMs, and each of them creates its own MiniCluster.
+    // To reduce the footprint, we decrease the sizes of the off-heap network buffers and the managed memory.
+    config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("16m"))
+    config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("16m"))
+    config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("50m"))
+    config
+  }
+
 }
diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStreamingDeploymentManagerProvider.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStreamingDeploymentManagerProvider.scala
index b6ae35d6be3..c0dce580579 100644
--- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStreamingDeploymentManagerProvider.scala
+++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStreamingDeploymentManagerProvider.scala
@@ -2,6 +2,7 @@ package pl.touk.nussknacker.engine.management
 
 import cats.data.ValidatedNel
 import com.typesafe.config.Config
+import com.typesafe.scalalogging.LazyLogging
 import pl.touk.nussknacker.engine._
 import pl.touk.nussknacker.engine.api.StreamMetaData
 import pl.touk.nussknacker.engine.api.component.ScenarioPropertyConfig
@@ -15,7 +16,7 @@ import pl.touk.nussknacker.engine.management.rest.FlinkClient
 import scala.concurrent.duration.FiniteDuration
 import scala.util.Try
 
-class FlinkStreamingDeploymentManagerProvider extends DeploymentManagerProvider {
+class FlinkStreamingDeploymentManagerProvider extends DeploymentManagerProvider with LazyLogging {
 
   import net.ceedubs.ficus.Ficus._
   import net.ceedubs.ficus.readers.ArbitraryTypeReader._
@@ -28,6 +29,7 @@ class FlinkStreamingDeploymentManagerProvider extends DeploymentManagerProvider
       deploymentConfig: Config,
       scenarioStateCacheTTL: Option[FiniteDuration]
   ): ValidatedNel[String, DeploymentManager] = {
+    logger.info("Creating FlinkStreamingDeploymentManager")
     import dependencies._
     val flinkConfig = deploymentConfig.rootAs[FlinkConfig]
     FlinkClient.create(flinkConfig, scenarioStateCacheTTL).map { client =>
diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/ScenarioTestingMiniClusterWrapperFactory.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/ScenarioTestingMiniClusterWrapperFactory.scala
index b4009e0bc4a..36a2dc8ccfe 100644
--- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/ScenarioTestingMiniClusterWrapperFactory.scala
+++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/ScenarioTestingMiniClusterWrapperFactory.scala
@@ -12,7 +12,7 @@ object ScenarioTestingMiniClusterWrapperFactory {
   // Other option would be to add flinkExecutor.jar to classpath from which Flink DM is loaded
   def createIfConfigured(modelClassLoader: ModelClassLoader, config: ScenarioTestingConfig): Option[AutoCloseable] = {
     if (config.reuseMiniClusterForScenarioTesting || config.reuseMiniClusterForScenarioStateVerification) {
-      Some(create(modelClassLoader, config.parallelism, config.streamExecutionConfig))
+      Some(create(modelClassLoader, config.parallelism, config.miniClusterConfig, config.streamExecutionConfig))
     } else {
       None
     }
@@ -21,6 +21,7 @@ object ScenarioTestingMiniClusterWrapperFactory {
   private[nussknacker] def create(
       modelClassLoader: ModelClassLoader,
       parallelism: Int,
+      miniClusterConfig: Configuration,
       streamExecutionConfig: Configuration
   ): AutoCloseable = {
     val methodInvoker = new ReflectiveMethodInvoker[AutoCloseable](
@@ -28,7 +29,7 @@ object ScenarioTestingMiniClusterWrapperFactory {
       "pl.touk.nussknacker.engine.process.scenariotesting.ScenarioTestingMiniClusterWrapper",
       "create"
     )
-    methodInvoker.invokeStaticMethod(parallelism, streamExecutionConfig)
+    methodInvoker.invokeStaticMethod(parallelism, miniClusterConfig, streamExecutionConfig)
   }
 
 }
diff --git a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunnerSpec.scala b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunnerSpec.scala
index 7f073eb8a87..bfb1153446d 100644
--- a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunnerSpec.scala
+++ b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunnerSpec.scala
@@ -34,6 +34,7 @@ import pl.touk.nussknacker.engine.flink.test.{
 import pl.touk.nussknacker.engine.graph.expression.Expression
 import pl.touk.nussknacker.engine.graph.node.FragmentInputDefinition.{FragmentClazzRef, FragmentParameter}
 import pl.touk.nussknacker.engine.graph.node.{Case, FragmentInputDefinition, FragmentOutputDefinition}
+import pl.touk.nussknacker.engine.management.ScenarioTestingConfig
 import pl.touk.nussknacker.engine.management.scenariotesting.FlinkProcessTestRunnerSpec.{
   fragmentWithValidationName,
   processWithFragmentParameterValidation
@@ -76,6 +77,7 @@ class FlinkProcessTestRunnerSpec
   private val scenarioTestingMiniClusterWrapper = ScenarioTestingMiniClusterWrapperFactory.create(
     modelClassLoader,
     parallelism = 1,
+    miniClusterConfig = ScenarioTestingConfig.defaultMiniClusterConfig,
     streamExecutionConfig = new Configuration
   )
 
diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/kafka/KafkaScenarioTestingSpec.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/kafka/KafkaScenarioTestingSpec.scala
index 78e0ce37087..7cce8d298ba 100644
--- a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/kafka/KafkaScenarioTestingSpec.scala
+++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/kafka/KafkaScenarioTestingSpec.scala
@@ -20,6 +20,7 @@ import pl.touk.nussknacker.engine.kafka.KafkaFactory.TopicParamName
 import pl.touk.nussknacker.engine.kafka.source.InputMeta
 import pl.touk.nussknacker.engine.kafka.source.flink.KafkaSourceFactoryProcessConfigCreator
 import pl.touk.nussknacker.engine.kafka.source.flink.KafkaSourceFactoryProcessConfigCreator.ResultsHolders
+import pl.touk.nussknacker.engine.management.ScenarioTestingConfig
 import pl.touk.nussknacker.engine.management.scenariotesting.{
   FlinkProcessTestRunner,
   ScenarioTestingMiniClusterWrapperFactory
@@ -67,6 +68,7 @@ class KafkaScenarioTestingSpec
   private val scenarioTestingMiniClusterWrapper = ScenarioTestingMiniClusterWrapperFactory.create(
     modelData.modelClassLoader,
     parallelism = 1,
+    miniClusterConfig = ScenarioTestingConfig.defaultMiniClusterConfig,
     streamExecutionConfig = new Configuration
   )
 
diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/schemedkafka/SchemedKafkaScenarioTestingSpec.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/schemedkafka/SchemedKafkaScenarioTestingSpec.scala
index 5416256b76b..3f8744d9c6b 100644
--- a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/schemedkafka/SchemedKafkaScenarioTestingSpec.scala
+++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/schemedkafka/SchemedKafkaScenarioTestingSpec.scala
@@ -20,6 +20,7 @@ import pl.touk.nussknacker.engine.flink.util.sink.SingleValueSinkFactory.SingleV
 import pl.touk.nussknacker.engine.graph.expression.Expression
 import pl.touk.nussknacker.engine.kafka.UnspecializedTopicName
 import pl.touk.nussknacker.engine.kafka.source.InputMeta
+import pl.touk.nussknacker.engine.management.ScenarioTestingConfig
 import pl.touk.nussknacker.engine.management.scenariotesting.{
   FlinkProcessTestRunner,
   ScenarioTestingMiniClusterWrapperFactory
@@ -88,6 +89,7 @@ class SchemedKafkaScenarioTestingSpec
   private val scenarioTestingMiniClusterWrapper = ScenarioTestingMiniClusterWrapperFactory.create(
     modelData.modelClassLoader,
     parallelism = 1,
+    miniClusterConfig = ScenarioTestingConfig.defaultMiniClusterConfig,
     streamExecutionConfig = new Configuration
   )
 
diff --git a/utils/test-utils/src/main/resources/logback-test.xml b/utils/test-utils/src/main/resources/logback-test.xml
index 961b23b4298..b06a86a4b74 100644
--- a/utils/test-utils/src/main/resources/logback-test.xml
+++ b/utils/test-utils/src/main/resources/logback-test.xml
@@ -18,7 +18,7 @@
-
+
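
For reference, a sketch of how the new miniClusterConfig introduced above could be overridden from the deployment configuration. The scenarioTesting section name and the surrounding nesting are assumptions (the enclosing FlinkConfig field is not shown in this diff); the option keys are standard Flink keys corresponding to the RestOptions/CoreOptions/TaskManagerOptions constants used in defaultMiniClusterConfig, and they are read as a plain string map by flinkConfigurationValueReader via Configuration.fromMap:

    # Hypothetical HOCON override; section names are assumed, option keys are standard Flink ones
    scenarioTesting {
      parallelism: 1
      miniClusterConfig {
        "rest.port": "0"
        "classloader.resolve-order": "parent-first"
        "taskmanager.memory.network.min": "16m"
        "taskmanager.memory.network.max": "16m"
        "taskmanager.memory.managed.size": "50m"
      }
      streamExecutionConfig {
        "pipeline.object-reuse": "true"
      }
    }

Keeping rest.port at 0 and parent-first classloading preserves the previous MiniCluster behaviour, while the reduced network and managed memory only has to accommodate the small scenario-testing jobs.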