Skip to content

Commit

Permalink
State verification mechanism reusing mini cluster + configuration + r…
Browse files Browse the repository at this point in the history
…efactors
  • Loading branch information
arkadius committed Jan 21, 2025
1 parent cd419c6 commit a9eca37
Show file tree
Hide file tree
Showing 25 changed files with 582 additions and 468 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.engine.api.{ProcessVersion, StreamMetaData}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment._
import pl.touk.nussknacker.engine.management.{FlinkDeploymentManager, FlinkStreamingDeploymentManagerProvider}
import pl.touk.nussknacker.engine.management.{
FlinkDeploymentManager,
FlinkStreamingDeploymentManagerProvider,
ScenarioTestingConfig
}
import pl.touk.nussknacker.engine.util.loader.ModelClassLoader
import pl.touk.nussknacker.test.config.ConfigWithScalaVersion
import pl.touk.nussknacker.test.utils.domain.TestFactory
Expand Down Expand Up @@ -55,7 +59,8 @@ class MockDeploymentManager(
SttpBackendStub.asynchronousFuture
),
shouldVerifyBeforeDeploy = false,
mainClassName = "UNUSED"
mainClassName = "UNUSED",
scenarioTestingConfig = ScenarioTestingConfig()
) {

import MockDeploymentManager._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefin
import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment._
import pl.touk.nussknacker.engine.management.FlinkStreamingPropertiesConfig
import pl.touk.nussknacker.engine.management.testsmechanism.FlinkProcessTestRunner
import pl.touk.nussknacker.engine.management.{FlinkStreamingPropertiesConfig, ScenarioTestingConfig}
import pl.touk.nussknacker.engine.management.scenariotesting.{
FlinkProcessTestRunner,
ScenarioTestingMiniClusterWrapperFactory
}

import java.net.URI
import java.util.UUID
Expand Down Expand Up @@ -50,8 +53,14 @@ class DevelopmentDeploymentManager(actorSystem: ActorSystem, modelData: BaseMode
private val memory: TrieMap[ProcessName, StatusDetails] = TrieMap[ProcessName, StatusDetails]()
private val random = new scala.util.Random()

private lazy val scenarioTestingMiniClusterWrapperOpt =
ScenarioTestingMiniClusterWrapperFactory.createIfConfigured(
modelData.asInvokableModelData.modelClassLoader,
ScenarioTestingConfig()
)

private lazy val flinkTestRunner =
new FlinkProcessTestRunner(modelData.asInvokableModelData, parallelism = 1, new Configuration())
new FlinkProcessTestRunner(modelData.asInvokableModelData, scenarioTestingMiniClusterWrapperOpt)

implicit private class ProcessStateExpandable(processState: StatusDetails) {

Expand Down Expand Up @@ -154,7 +163,9 @@ class DevelopmentDeploymentManager(actorSystem: ActorSystem, modelData: BaseMode
notImplemented
}

override def close(): Unit = {}
override def close(): Unit = {
scenarioTestingMiniClusterWrapperOpt.foreach(_.close())
}

private def changeState(name: ProcessName, stateStatus: StateStatus): Unit =
memory.get(name).foreach { processState =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import cats.data.Validated.valid
import cats.data.ValidatedNel
import com.typesafe.config.Config
import io.circe.Json
import org.apache.flink.configuration.Configuration
import pl.touk.nussknacker.development.manager.MockableDeploymentManagerProvider.MockableDeploymentManager
import pl.touk.nussknacker.engine.ModelData.BaseModelDataExt
import pl.touk.nussknacker.engine._
Expand All @@ -13,8 +12,11 @@ import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus}
import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId}
import pl.touk.nussknacker.engine.deployment.ExternalDeploymentId
import pl.touk.nussknacker.engine.management.FlinkStreamingPropertiesConfig
import pl.touk.nussknacker.engine.management.testsmechanism.FlinkProcessTestRunner
import pl.touk.nussknacker.engine.management.scenariotesting.{
FlinkProcessTestRunner,
ScenarioTestingMiniClusterWrapperFactory
}
import pl.touk.nussknacker.engine.management.{FlinkStreamingPropertiesConfig, ScenarioTestingConfig}
import pl.touk.nussknacker.engine.newdeployment.DeploymentId
import pl.touk.nussknacker.engine.testing.StubbingCommands
import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults
Expand Down Expand Up @@ -59,10 +61,17 @@ object MockableDeploymentManagerProvider {
with ManagerSpecificScenarioActivitiesStoredByManager
with StubbingCommands {

private lazy val testRunnerOpt =
modelDataOpt.map(modelData =>
new FlinkProcessTestRunner(modelData.asInvokableModelData, parallelism = 1, new Configuration())
private lazy val scenarioTestingMiniClusterWrapperOpt = modelDataOpt.flatMap { modelData =>
ScenarioTestingMiniClusterWrapperFactory.createIfConfigured(
modelData.asInvokableModelData.modelClassLoader,
ScenarioTestingConfig()
)
}

private lazy val testRunnerOpt =
modelDataOpt.map { modelData =>
new FlinkProcessTestRunner(modelData.asInvokableModelData, scenarioTestingMiniClusterWrapperOpt)
}

override def resolve(
idWithName: ProcessIdWithName,
Expand Down Expand Up @@ -128,7 +137,10 @@ object MockableDeploymentManagerProvider {
): Future[List[ScenarioActivity]] =
Future.successful(MockableDeploymentManager.managerSpecificScenarioActivities.get())

override def close(): Unit = {}
override def close(): Unit = {
scenarioTestingMiniClusterWrapperOpt.foreach(_.close())
}

}

// note: At the moment this manager cannot be used in tests which are executed in parallel. It can be obviously
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package pl.touk.nussknacker.engine.process.scenariotesting

import org.apache.flink.configuration.Configuration
import pl.touk.nussknacker.engine.api.StreamMetaData
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.util.MetaDataExtractor

// This class handles a legacy ad-hoc way to create minicluster.
// After we fully switch to single mini cluster approach, it should be removed
object AdHocMiniClusterFallbackHandler {

def handleAdHocMniClusterFallback[R](
reusableMiniClusterWrapperOpt: Option[ScenarioTestingMiniClusterWrapper],
scenario: CanonicalProcess
)(f: ScenarioTestingMiniClusterWrapper => R): R = {
val miniClusterWrapper = reusableMiniClusterWrapperOpt.getOrElse {
createAdHocMiniClusterWrapper(scenario)
}
try {
f(miniClusterWrapper)
} finally {
if (reusableMiniClusterWrapperOpt.isEmpty) {
miniClusterWrapper.close()
}
}
}

private def createAdHocMiniClusterWrapper(process: CanonicalProcess) = {
val scenarioParallelism = MetaDataExtractor
.extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData())
.parallelism
.getOrElse(1)
ScenarioTestingMiniClusterWrapper.create(scenarioParallelism, new Configuration())
}

}
Loading

0 comments on commit a9eca37

Please sign in to comment.