Skip to content

Commit

Permalink
less memory used by mini clusters
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadius committed Jan 24, 2025
1 parent bbb2eba commit c01a569
Show file tree
Hide file tree
Showing 11 changed files with 42 additions and 19 deletions.
2 changes: 2 additions & 0 deletions e2e-tests/src/test/resources/batch-nu-designer.override.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ services:
environment:
CONFIG_FILE: "/opt/nussknacker/conf/application.conf,/opt/nussknacker/conf/application-customizations.conf,/opt/nussknacker/conf/batch-customizations.conf"
TABLES_DEFINITION_FILE: "/opt/nussknacker/conf/tables-definition.sql"
# Batch adds 2 new processing types, each of which adds a MiniCluster for scenario-testing purposes. The batch components themselves also use a MiniCluster.
# This increases the memory footprint significantly.
JDK_JAVA_OPTIONS: "-Xmx400M -XX:MaxMetaspaceSize=500M -XX:MaxDirectMemorySize=300M"
volumes:
- ../../e2e-tests/src/test/resources/batch-data-generation/batch-customizations.conf:/opt/nussknacker/conf/batch-customizations.conf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ object AdHocMiniClusterFallbackHandler extends LazyLogging {
.extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData())
.parallelism
.getOrElse(1)
ScenarioTestingMiniClusterWrapper.create(scenarioParallelism, new Configuration())
ScenarioTestingMiniClusterWrapper.create(scenarioParallelism, new Configuration(), new Configuration())
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,17 @@ import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfigurati

object ScenarioTestingMiniClusterFactory {

def createConfiguredMiniCluster(numTaskSlots: Int): MiniCluster = {
val miniClusterConfiguration = prepareMiniClusterConfiguration(numTaskSlots = numTaskSlots)
def createConfiguredMiniCluster(numTaskSlots: Int, miniClusterConfig: Configuration): MiniCluster = {
adjustConfiguration(miniClusterConfig, numTaskSlots = numTaskSlots)

// required for HadoopFileSystem to work properly
FileSystem.initialize(miniClusterConfiguration, null)
FileSystem.initialize(miniClusterConfig, null)

createMiniCluster(miniClusterConfiguration, numSlotsPerTaskManager = numTaskSlots)
createMiniCluster(miniClusterConfig, numSlotsPerTaskManager = numTaskSlots)
}

private def prepareMiniClusterConfiguration(numTaskSlots: Int) = {
val configuration: Configuration = new Configuration
private def adjustConfiguration(configuration: Configuration, numTaskSlots: Int): Unit = {
configuration.set[Integer](TaskManagerOptions.NUM_TASK_SLOTS, numTaskSlots)
configuration.set[Integer](RestOptions.PORT, 0)

// FIXME: this reverses Flink's default classloader resolution order (child-first -> parent-first)
configuration.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first")
configuration
}

private def createMiniCluster(configuration: Configuration, numSlotsPerTaskManager: Int) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,13 @@ final class ScenarioTestingMiniClusterWrapper(

object ScenarioTestingMiniClusterWrapper extends LazyLogging {

def create(parallelism: Int, streamExecutionConfig: Configuration): ScenarioTestingMiniClusterWrapper = {
def create(
parallelism: Int,
miniClusterConfig: Configuration,
streamExecutionConfig: Configuration
): ScenarioTestingMiniClusterWrapper = {
logger.debug(s"Creating MiniCluster with numTaskSlots = $parallelism")
val miniCluster = ScenarioTestingMiniClusterFactory.createConfiguredMiniCluster(parallelism)
val miniCluster = ScenarioTestingMiniClusterFactory.createConfiguredMiniCluster(parallelism, miniClusterConfig)
logger.debug(
s"Creating local StreamExecutionEnvironment with parallelism = $parallelism and configuration = $streamExecutionConfig"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package pl.touk.nussknacker.engine.management

import net.ceedubs.ficus.Ficus
import net.ceedubs.ficus.readers.ValueReader
import org.apache.flink.configuration.Configuration
import org.apache.flink.configuration.{Configuration, CoreOptions, MemorySize, RestOptions, TaskManagerOptions}

import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -61,6 +61,7 @@ final case class ScenarioTestingConfig(
reuseMiniClusterForScenarioTesting: Boolean = true,
reuseMiniClusterForScenarioStateVerification: Boolean = true,
parallelism: Int = 1,
miniClusterConfig: Configuration = ScenarioTestingConfig.defaultMiniClusterConfig,
streamExecutionConfig: Configuration = new Configuration
)

Expand All @@ -71,4 +72,17 @@ object ScenarioTestingConfig {
implicit val flinkConfigurationValueReader: ValueReader[Configuration] =
Ficus.mapValueReader[String].map(map => Configuration.fromMap(map.asJava))

private[nussknacker] val defaultMiniClusterConfig: Configuration = {
val config = new Configuration
config.set[Integer](RestOptions.PORT, 0)
// FIXME: reversing flink default order
config.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first")
// In some setups we create several Flink DeploymentManagers, and each of them creates its own MiniCluster.
// To reduce the memory footprint, we decrease the off-heap network buffer sizes and the managed memory size.
config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("16m"))
config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("16m"))
config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("50m"))
config
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pl.touk.nussknacker.engine.management

import cats.data.ValidatedNel
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import pl.touk.nussknacker.engine._
import pl.touk.nussknacker.engine.api.StreamMetaData
import pl.touk.nussknacker.engine.api.component.ScenarioPropertyConfig
Expand All @@ -15,7 +16,7 @@ import pl.touk.nussknacker.engine.management.rest.FlinkClient
import scala.concurrent.duration.FiniteDuration
import scala.util.Try

class FlinkStreamingDeploymentManagerProvider extends DeploymentManagerProvider {
class FlinkStreamingDeploymentManagerProvider extends DeploymentManagerProvider with LazyLogging {

import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ArbitraryTypeReader._
Expand All @@ -28,6 +29,7 @@ class FlinkStreamingDeploymentManagerProvider extends DeploymentManagerProvider
deploymentConfig: Config,
scenarioStateCacheTTL: Option[FiniteDuration]
): ValidatedNel[String, DeploymentManager] = {
logger.info("Creating FlinkStreamingDeploymentManager")
import dependencies._
val flinkConfig = deploymentConfig.rootAs[FlinkConfig]
FlinkClient.create(flinkConfig, scenarioStateCacheTTL).map { client =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ object ScenarioTestingMiniClusterWrapperFactory {
// Other option would be to add flinkExecutor.jar to classpath from which Flink DM is loaded
def createIfConfigured(modelClassLoader: ModelClassLoader, config: ScenarioTestingConfig): Option[AutoCloseable] = {
if (config.reuseMiniClusterForScenarioTesting || config.reuseMiniClusterForScenarioStateVerification) {
Some(create(modelClassLoader, config.parallelism, config.streamExecutionConfig))
Some(create(modelClassLoader, config.parallelism, config.miniClusterConfig, config.streamExecutionConfig))
} else {
None
}
Expand All @@ -21,14 +21,15 @@ object ScenarioTestingMiniClusterWrapperFactory {
private[nussknacker] def create(
modelClassLoader: ModelClassLoader,
parallelism: Int,
miniClusterConfig: Configuration,
streamExecutionConfig: Configuration
): AutoCloseable = {
val methodInvoker = new ReflectiveMethodInvoker[AutoCloseable](
modelClassLoader,
"pl.touk.nussknacker.engine.process.scenariotesting.ScenarioTestingMiniClusterWrapper",
"create"
)
methodInvoker.invokeStaticMethod(parallelism, streamExecutionConfig)
methodInvoker.invokeStaticMethod(parallelism, miniClusterConfig, streamExecutionConfig)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import pl.touk.nussknacker.engine.flink.test.{
import pl.touk.nussknacker.engine.graph.expression.Expression
import pl.touk.nussknacker.engine.graph.node.FragmentInputDefinition.{FragmentClazzRef, FragmentParameter}
import pl.touk.nussknacker.engine.graph.node.{Case, FragmentInputDefinition, FragmentOutputDefinition}
import pl.touk.nussknacker.engine.management.ScenarioTestingConfig
import pl.touk.nussknacker.engine.management.scenariotesting.FlinkProcessTestRunnerSpec.{
fragmentWithValidationName,
processWithFragmentParameterValidation
Expand Down Expand Up @@ -76,6 +77,7 @@ class FlinkProcessTestRunnerSpec
private val scenarioTestingMiniClusterWrapper = ScenarioTestingMiniClusterWrapperFactory.create(
modelClassLoader,
parallelism = 1,
miniClusterConfig = ScenarioTestingConfig.defaultMiniClusterConfig,
streamExecutionConfig = new Configuration
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import pl.touk.nussknacker.engine.kafka.KafkaFactory.TopicParamName
import pl.touk.nussknacker.engine.kafka.source.InputMeta
import pl.touk.nussknacker.engine.kafka.source.flink.KafkaSourceFactoryProcessConfigCreator
import pl.touk.nussknacker.engine.kafka.source.flink.KafkaSourceFactoryProcessConfigCreator.ResultsHolders
import pl.touk.nussknacker.engine.management.ScenarioTestingConfig
import pl.touk.nussknacker.engine.management.scenariotesting.{
FlinkProcessTestRunner,
ScenarioTestingMiniClusterWrapperFactory
Expand Down Expand Up @@ -67,6 +68,7 @@ class KafkaScenarioTestingSpec
private val scenarioTestingMiniClusterWrapper = ScenarioTestingMiniClusterWrapperFactory.create(
modelData.modelClassLoader,
parallelism = 1,
miniClusterConfig = ScenarioTestingConfig.defaultMiniClusterConfig,
streamExecutionConfig = new Configuration
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import pl.touk.nussknacker.engine.flink.util.sink.SingleValueSinkFactory.SingleV
import pl.touk.nussknacker.engine.graph.expression.Expression
import pl.touk.nussknacker.engine.kafka.UnspecializedTopicName
import pl.touk.nussknacker.engine.kafka.source.InputMeta
import pl.touk.nussknacker.engine.management.ScenarioTestingConfig
import pl.touk.nussknacker.engine.management.scenariotesting.{
FlinkProcessTestRunner,
ScenarioTestingMiniClusterWrapperFactory
Expand Down Expand Up @@ -88,6 +89,7 @@ class SchemedKafkaScenarioTestingSpec
private val scenarioTestingMiniClusterWrapper = ScenarioTestingMiniClusterWrapperFactory.create(
modelData.modelClassLoader,
parallelism = 1,
miniClusterConfig = ScenarioTestingConfig.defaultMiniClusterConfig,
streamExecutionConfig = new Configuration
)

Expand Down
2 changes: 1 addition & 1 deletion utils/test-utils/src/main/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
</root>

<!-- let's keep here INFO - if it will produce too many logs, we should rather think if our INFO logs are too much verbose... -->
<logger name="pl.touk.nussknacker" level="${NUSSKNACKER_LOG_LEVEL:-INFO}"/>
<logger name="pl.touk.nussknacker" level="${NUSSKNACKER_LOG_LEVEL:-DEBUG}"/>
<logger name="slick.jdbc.JdbcBackend.statement" level="${SLICK_SQL_LOG_LEVEL:-INFO}"/>
<logger name="slick.jdbc.JdbcBackend.parameter" level="${SLICK_SQL_LOG_LEVEL:-INFO}"/>
<logger name="slick.jdbc.StatementInvoker.result" level="${SLICK_SQL_LOG_LEVEL:-INFO}"/>
Expand Down

0 comments on commit c01a569

Please sign in to comment.