Skip to content

Commit

Permalink
[NU-1974] Initialize model classloaders only once (#7447)
Browse files Browse the repository at this point in the history
initialize model classloaders only once
  • Loading branch information
mslabek authored Jan 16, 2025
1 parent e0f92f7 commit 9ed7326
Show file tree
Hide file tree
Showing 15 changed files with 255 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import cats.effect.{IO, Resource}
import com.typesafe.scalalogging.LazyLogging
import io.dropwizard.metrics5.MetricRegistry
import io.dropwizard.metrics5.jmx.JmxReporter
import pl.touk.nussknacker.engine.ConfigWithUnresolvedVersion
import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap
import pl.touk.nussknacker.engine.util.loader.ScalaServiceLoader
import pl.touk.nussknacker.engine.util.{JavaClassVersionChecker, SLF4JBridgeHandlerRegistrar}
import pl.touk.nussknacker.engine.{ConfigWithUnresolvedVersion, ProcessingTypeConfig}
import pl.touk.nussknacker.ui.config.{DesignerConfig, DesignerConfigLoader}
import pl.touk.nussknacker.ui.configloader.{ProcessingTypeConfigsLoader, ProcessingTypeConfigsLoaderFactory}
import pl.touk.nussknacker.ui.db.DbRef
Expand All @@ -18,6 +19,7 @@ import pl.touk.nussknacker.ui.process.processingtype.loader.{
ProcessingTypeDataLoader,
ProcessingTypesConfigBasedProcessingTypeDataLoader
}
import pl.touk.nussknacker.ui.process.processingtype.{ModelClassLoaderDependencies, ModelClassLoaderProvider}
import pl.touk.nussknacker.ui.server.{AkkaHttpBasedRouteProvider, NussknackerHttpServer}
import pl.touk.nussknacker.ui.util.{ActorSystemBasedExecutionContextWithIORuntime, IOToFutureSttpBackendConverter}
import sttp.client3.SttpBackend
Expand All @@ -40,6 +42,9 @@ class NussknackerAppFactory(
designerConfig,
ioSttpBackend
)(executionContextWithIORuntime.ioRuntime)
modelClassLoaderProvider = createModelClassLoaderProvider(
designerConfig.processingTypeConfigs.configByProcessingType
)
processingTypeDataLoader = createProcessingTypeDataLoader(processingTypeConfigsLoader)
materializer = Materializer(system)
_ <- Resource.eval(IO(JavaClassVersionChecker.check()))
Expand All @@ -54,7 +59,8 @@ class NussknackerAppFactory(
IOToFutureSttpBackendConverter.convert(ioSttpBackend)(executionContextWithIORuntime),
processingTypeDataLoader,
feStatisticsRepository,
clock
clock,
modelClassLoaderProvider
)(
system,
materializer,
Expand Down Expand Up @@ -116,6 +122,15 @@ class NussknackerAppFactory(
)
}

private def createModelClassLoaderProvider(
processingTypeConfigs: Map[String, ProcessingTypeConfig]
): ModelClassLoaderProvider = {
val defaultWorkingDirOpt = None
ModelClassLoaderProvider(
processingTypeConfigs.mapValuesNow(c => ModelClassLoaderDependencies(c.classPath, defaultWorkingDirOpt))
)
}

}

object NussknackerAppFactory {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package pl.touk.nussknacker.ui.process.processingtype

import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap
import pl.touk.nussknacker.engine.util.loader.ModelClassLoader

import java.nio.file.Path

final case class ModelClassLoaderDependencies(classpath: List[String], workingDirectoryOpt: Option[Path]) {

def show(): String = {
val workingDirectoryReadable = workingDirectoryOpt match {
case Some(value) => value.toString
case None => "None (default)"
}
s"classpath: ${classpath.mkString(", ")}, workingDirectoryOpt: $workingDirectoryReadable"
}

}

class ModelClassLoaderProvider private (
processingTypeClassLoaders: Map[String, (ModelClassLoader, ModelClassLoaderDependencies)]
) {

def forProcessingTypeUnsafe(processingTypeName: String): ModelClassLoader = {
processingTypeClassLoaders
.getOrElse(
processingTypeName,
throw new IllegalArgumentException(
s"Unknown ProcessingType: $processingTypeName, known ProcessingTypes are: ${processingTypeName.mkString(", ")}"
)
)
._1
}

def validateReloadConsistency(
dependenciesFromReload: Map[String, ModelClassLoaderDependencies]
): Unit = {
if (dependenciesFromReload.keySet != processingTypeClassLoaders.keySet) {
throw new IllegalStateException(
s"""Processing types cannot be added, removed, or renamed during processing type reload.
|Reloaded processing types: [${dependenciesFromReload.keySet.toList.sorted.mkString(", ")}]
|Current processing types: [${processingTypeClassLoaders.keySet.toList.sorted.mkString(", ")}]
|If you need to modify this, please restart the application with desired config.""".stripMargin
)
}
dependenciesFromReload.foreach { case (processingType, reloadedConfig) =>
val currentConfig = processingTypeClassLoaders.mapValuesNow(_._2)(processingType)
if (reloadedConfig != currentConfig) {
throw new IllegalStateException(
s"Error during processing types reload. Model ClassLoader dependencies such as classpath cannot be modified during reload. " +
s"For processing type [$processingType], reloaded ClassLoader dependencies: [${reloadedConfig.show()}] " +
s"do not match current dependencies: [${currentConfig.show()}]"
)
}
}
}

}

object ModelClassLoaderProvider {

def apply(processingTypeConfig: Map[String, ModelClassLoaderDependencies]): ModelClassLoaderProvider = {
val processingTypesClassloaders = processingTypeConfig.map { case (name, deps) =>
name -> (ModelClassLoader(deps.classpath, deps.workingDirectoryOpt) -> deps)
}
new ModelClassLoaderProvider(processingTypesClassloaders)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ import pl.touk.nussknacker.engine.api.process.ProcessingType
import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap
import pl.touk.nussknacker.ui.process.processingtype.loader.ProcessingTypeDataLoader.toValueWithRestriction
import pl.touk.nussknacker.ui.process.processingtype.provider.ProcessingTypeDataState
import pl.touk.nussknacker.ui.process.processingtype.{CombinedProcessingTypeData, ProcessingTypeData}
import pl.touk.nussknacker.ui.process.processingtype.{
CombinedProcessingTypeData,
ModelClassLoaderProvider,
ProcessingTypeData
}

class LocalProcessingTypeDataLoader(
modelData: Map[ProcessingType, (String, ModelData)],
Expand All @@ -16,7 +20,8 @@ class LocalProcessingTypeDataLoader(

override def loadProcessingTypeData(
getModelDependencies: ProcessingType => ModelDependencies,
getDeploymentManagerDependencies: ProcessingType => DeploymentManagerDependencies
getDeploymentManagerDependencies: ProcessingType => DeploymentManagerDependencies,
modelClassLoaderProvider: ModelClassLoaderProvider
): IO[ProcessingTypeDataState[ProcessingTypeData, CombinedProcessingTypeData]] = IO {
val processingTypes = modelData.map { case (processingType, (category, model)) =>
val deploymentManagerDependencies = getDeploymentManagerDependencies(processingType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import pl.touk.nussknacker.engine.{DeploymentManagerDependencies, ModelDependenc
import pl.touk.nussknacker.ui.process.processingtype.provider.ProcessingTypeDataState
import pl.touk.nussknacker.ui.process.processingtype.{
CombinedProcessingTypeData,
ModelClassLoaderProvider,
ProcessingTypeData,
ValueWithRestriction
}
Expand All @@ -15,6 +16,7 @@ trait ProcessingTypeDataLoader {
def loadProcessingTypeData(
getModelDependencies: ProcessingType => ModelDependencies,
getDeploymentManagerDependencies: ProcessingType => DeploymentManagerDependencies,
modelClassLoaderProvider: ModelClassLoaderProvider
): IO[ProcessingTypeDataState[ProcessingTypeData, CombinedProcessingTypeData]]

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,20 @@ class ProcessingTypesConfigBasedProcessingTypeDataLoader(processingTypeConfigsLo
override def loadProcessingTypeData(
getModelDependencies: ProcessingType => ModelDependencies,
getDeploymentManagerDependencies: ProcessingType => DeploymentManagerDependencies,
modelClassLoaderProvider: ModelClassLoaderProvider
): IO[ProcessingTypeDataState[ProcessingTypeData, CombinedProcessingTypeData]] = {
processingTypeConfigsLoader
.loadProcessingTypeConfigs()
.map(createProcessingTypeData(_, getModelDependencies, getDeploymentManagerDependencies))
.map(
createProcessingTypeData(_, getModelDependencies, getDeploymentManagerDependencies, modelClassLoaderProvider)
)
}

private def createProcessingTypeData(
processingTypesConfig: ProcessingTypeConfigs,
getModelDependencies: ProcessingType => ModelDependencies,
getDeploymentManagerDependencies: ProcessingType => DeploymentManagerDependencies
getDeploymentManagerDependencies: ProcessingType => DeploymentManagerDependencies,
modelClassLoaderProvider: ModelClassLoaderProvider
): ProcessingTypeDataState[ProcessingTypeData, CombinedProcessingTypeData] = {
// This step with splitting DeploymentManagerProvider loading for all processing types
// and after that creating ProcessingTypeData is done because of the deduplication of deployments
Expand All @@ -41,15 +45,23 @@ class ProcessingTypesConfigBasedProcessingTypeDataLoader(processingTypeConfigsLo
)
(processingTypeConfig, provider, nameInputData)
}
modelClassLoaderProvider.validateReloadConsistency(providerWithNameInputData.map { case (processingType, data) =>
processingType -> ModelClassLoaderDependencies(
classpath = data._1.classPath,
workingDirectoryOpt = getModelDependencies(processingType).workingDirectoryOpt
)
})

val engineSetupNames =
ScenarioParametersDeterminer.determineEngineSetupNames(providerWithNameInputData.mapValuesNow(_._3))
val processingTypesData = providerWithNameInputData
.map { case (processingType, (processingTypeConfig, deploymentManagerProvider, _)) =>
logger.debug(s"Creating Processing Type: $processingType with config: $processingTypeConfig")
val modelDependencies = getModelDependencies(processingType)
val modelClassLoader = modelClassLoaderProvider.forProcessingTypeUnsafe(processingType)
val processingTypeData = ProcessingTypeData.createProcessingTypeData(
processingType,
ModelData(processingTypeConfig, modelDependencies),
ModelData(processingTypeConfig, modelDependencies, modelClassLoader),
deploymentManagerProvider,
getDeploymentManagerDependencies(processingType),
engineSetupNames(processingType),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ import pl.touk.nussknacker.ui.process.newdeployment.synchronize.{
DeploymentsStatusesSynchronizer
}
import pl.touk.nussknacker.ui.process.newdeployment.{DeploymentRepository, DeploymentService}
import pl.touk.nussknacker.ui.process.processingtype.ProcessingTypeData
import pl.touk.nussknacker.ui.process.processingtype.{ModelClassLoaderProvider, ProcessingTypeData}
import pl.touk.nussknacker.ui.process.processingtype.loader.ProcessingTypeDataLoader
import pl.touk.nussknacker.ui.process.processingtype.provider.ReloadableProcessingTypeDataProvider
import pl.touk.nussknacker.ui.process.repository._
Expand Down Expand Up @@ -109,7 +109,8 @@ class AkkaHttpBasedRouteProvider(
sttpBackend: SttpBackend[Future, Any],
processingTypeDataLoader: ProcessingTypeDataLoader,
feStatisticsRepository: FEStatisticsRepository[Future],
designerClock: Clock
designerClock: Clock,
modelClassLoaderProvider: ModelClassLoaderProvider
)(
implicit system: ActorSystem,
materializer: Materializer,
Expand Down Expand Up @@ -140,7 +141,8 @@ class AkkaHttpBasedRouteProvider(
dbioRunner,
sttpBackend,
featureTogglesConfig,
globalNotificationRepository
globalNotificationRepository,
modelClassLoaderProvider
)

deploymentsStatusesSynchronizer = new DeploymentsStatusesSynchronizer(
Expand Down Expand Up @@ -716,7 +718,8 @@ class AkkaHttpBasedRouteProvider(
dbioActionRunner: DBIOActionRunner,
sttpBackend: SttpBackend[Future, Any],
featureTogglesConfig: FeatureTogglesConfig,
globalNotificationRepository: InMemoryTimeseriesRepository[Notification]
globalNotificationRepository: InMemoryTimeseriesRepository[Notification],
modelClassLoaderProvider: ModelClassLoaderProvider
): Resource[IO, ReloadableProcessingTypeDataProvider] = {
Resource
.make(
Expand All @@ -735,6 +738,7 @@ class AkkaHttpBasedRouteProvider(
sttpBackend,
_
),
modelClassLoaderProvider
)
val loadAndNotifyIO = laodProcessingTypeDataIO
.map { state =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,18 @@ trait NuResourcesTest
protected val processingTypeConfig: ProcessingTypeConfig =
ProcessingTypeConfig.read(ConfigWithScalaVersion.StreamingProcessTypeConfig)

protected val deploymentManagerProvider: DeploymentManagerProvider =
new MockManagerProvider(deploymentManager)
protected val deploymentManagerProvider: DeploymentManagerProvider = new MockManagerProvider(deploymentManager)

private val modelData = ModelData(processingTypeConfig, modelDependencies)
private val modelClassLoaderProvider = ModelClassLoaderProvider(
Map(Streaming.stringify -> ModelClassLoaderDependencies(processingTypeConfig.classPath, None))
)

private val modelData =
ModelData(
processingTypeConfig,
modelDependencies,
modelClassLoaderProvider.forProcessingTypeUnsafe(Streaming.stringify)
)

protected val testProcessingTypeDataProvider: ProcessingTypeDataProvider[ProcessingTypeData, _] =
mapProcessingTypeDataProvider(
Expand All @@ -151,7 +159,8 @@ trait NuResourcesTest
new ProcessingTypesConfigBasedProcessingTypeDataLoader(() => IO.pure(designerConfig.processingTypeConfigs))
.loadProcessingTypeData(
_ => modelDependencies,
_ => deploymentManagerDependencies
_ => deploymentManagerDependencies,
modelClassLoaderProvider
)
.unsafeRunSync()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,14 @@ import cats.data.ValidatedNel
import com.google.common.collect.LinkedHashMultimap
import com.typesafe.config.Config
import pl.touk.nussknacker.engine._
import pl.touk.nussknacker.engine.api.definition.{
NotBlankParameterValidator,
NotNullParameterValidator,
StringParameterEditor
}
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus
import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.engine.api.{ProcessVersion, StreamMetaData}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment._
import pl.touk.nussknacker.engine.management.{FlinkDeploymentManager, FlinkStreamingDeploymentManagerProvider}
import pl.touk.nussknacker.engine.util.loader.ModelClassLoader
import pl.touk.nussknacker.test.config.ConfigWithScalaVersion
import pl.touk.nussknacker.test.utils.domain.TestFactory
import shapeless.syntax.typeable.typeableOps
Expand Down Expand Up @@ -47,7 +43,8 @@ class MockDeploymentManager(
) extends FlinkDeploymentManager(
ModelData(
ProcessingTypeConfig.read(ConfigWithScalaVersion.StreamingProcessTypeConfig),
TestFactory.modelDependencies
TestFactory.modelDependencies,
ModelClassLoader(ProcessingTypeConfig.read(ConfigWithScalaVersion.StreamingProcessTypeConfig).classPath, None)
),
DeploymentManagerDependencies(
deployedScenariosProvider,
Expand Down
Loading

0 comments on commit 9ed7326

Please sign in to comment.