diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkProcessMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkProcessMain.scala index da869ea2d97..66889fe2521 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkProcessMain.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkProcessMain.scala @@ -1,18 +1,21 @@ package pl.touk.nussknacker.engine.process.runner -import java.io.File import com.typesafe.config.{Config, ConfigFactory} import com.typesafe.scalalogging.LazyLogging import org.apache.flink.api.common.ExecutionConfig -import pl.touk.nussknacker.engine.{ModelConfigs, ModelData} import pl.touk.nussknacker.engine.api.{CirceUtil, ProcessVersion} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.DeploymentData +import pl.touk.nussknacker.engine.marshall.ScenarioParser import pl.touk.nussknacker.engine.process.ExecutionConfigPreparer +import pl.touk.nussknacker.engine.{ModelConfigs, ModelData} +import java.io.File +import java.nio.charset.StandardCharsets +import scala.util.Using import scala.util.control.NonFatal -trait FlinkProcessMain[Env] extends FlinkRunner with LazyLogging { +trait FlinkProcessMain[Env] extends LazyLogging { def main(argsWithHack: Array[String]): Unit = { try { @@ -61,6 +64,15 @@ trait FlinkProcessMain[Env] extends FlinkRunner with LazyLogging { prepareExecutionConfig: ExecutionConfigPreparer ): Unit + protected def readProcessFromArg(arg: String): CanonicalProcess = { + val canonicalJson = if (arg.startsWith("@")) { + Using.resource(scala.io.Source.fromFile(arg.substring(1), StandardCharsets.UTF_8.name()))(_.mkString) + } else { + arg + } + ScenarioParser.parseUnsafe(canonicalJson) + } + private def parseProcessVersion(json: String): ProcessVersion = CirceUtil.decodeJsonUnsafe[ProcessVersion](json, 
"invalid scenario version") diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkRunner.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkRunner.scala deleted file mode 100644 index 9089c1d0807..00000000000 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkRunner.scala +++ /dev/null @@ -1,20 +0,0 @@ -package pl.touk.nussknacker.engine.process.runner - -import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.marshall.ScenarioParser - -import java.nio.charset.StandardCharsets -import scala.util.Using - -trait FlinkRunner { - - protected def readProcessFromArg(arg: String): CanonicalProcess = { - val canonicalJson = if (arg.startsWith("@")) { - Using.resource(scala.io.Source.fromFile(arg.substring(1), StandardCharsets.UTF_8.name()))(_.mkString) - } else { - arg - } - ScenarioParser.parseUnsafe(canonicalJson) - } - -} diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala index eee9b7afb2e..b6b5dd785a5 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala @@ -12,7 +12,7 @@ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.{AdditionalModelConfigs, DeploymentData} import pl.touk.nussknacker.engine.process.compiler.TestFlinkProcessCompilerDataFactory import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar -import pl.touk.nussknacker.engine.process.testmechanism.FlinkStubbedRunner +import pl.touk.nussknacker.engine.process.testmechanism.FlinkMiniClusterJobSubmitter import 
pl.touk.nussknacker.engine.process.{ExecutionConfigPreparer, FlinkJobConfig} import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults import pl.touk.nussknacker.engine.testmode.{ @@ -21,7 +21,7 @@ import pl.touk.nussknacker.engine.testmode.{ TestServiceInvocationCollector } -object FlinkTestMain extends FlinkRunner { +object FlinkTestMain { def run( miniCluster: MiniCluster, @@ -58,7 +58,7 @@ class FlinkTestMain( deploymentData: DeploymentData ) { - private val stubbedRunner = new FlinkStubbedRunner(miniCluster, env) + private val stubbedRunner = new FlinkMiniClusterJobSubmitter(miniCluster, env) def runTest: TestResults[Json] = { val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener @@ -67,7 +67,7 @@ class FlinkTestMain( val registrar = prepareRegistrar(collectingListener, scenarioTestData) registrar.register(env, process, processVersion, deploymentData, resultCollector) - stubbedRunner.execute(process.name, SavepointRestoreSettings.none(), modelData.modelClassLoader) + stubbedRunner.submitJobAndCleanEnv(process.name, SavepointRestoreSettings.none(), modelData.modelClassLoader) collectingListener.results } finally { collectingListener.clean() diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala index 86147380cda..fb38f1576ce 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala @@ -9,11 +9,11 @@ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.DeploymentData import pl.touk.nussknacker.engine.process.compiler.VerificationFlinkProcessCompilerDataFactory import 
pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar -import pl.touk.nussknacker.engine.process.testmechanism.FlinkStubbedRunner +import pl.touk.nussknacker.engine.process.testmechanism.FlinkMiniClusterJobSubmitter import pl.touk.nussknacker.engine.process.{ExecutionConfigPreparer, FlinkJobConfig} import pl.touk.nussknacker.engine.testmode.{ResultsCollectingListenerHolder, TestServiceInvocationCollector} -object FlinkVerificationMain extends FlinkRunner { +object FlinkVerificationMain { def run( miniCluster: MiniCluster, @@ -46,7 +46,7 @@ class FlinkVerificationMain( savepointPath: String ) { - private val stubbedRunner = new FlinkStubbedRunner(miniCluster, env) + private val stubbedRunner = new FlinkMiniClusterJobSubmitter(miniCluster, env) def runTest(): Unit = { val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener @@ -54,7 +54,7 @@ class FlinkVerificationMain( val registrar = prepareRegistrar() registrar.register(env, process, processVersion, deploymentData, resultCollector) - stubbedRunner.execute( + stubbedRunner.submitJobAndCleanEnv( process.name, SavepointRestoreSettings.forPath(savepointPath, true), modelData.modelClassLoader diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/testmechanism/FlinkStubbedRunner.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/testmechanism/FlinkMiniClusterJobSubmitter.scala similarity index 78% rename from engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/testmechanism/FlinkStubbedRunner.scala rename to engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/testmechanism/FlinkMiniClusterJobSubmitter.scala index 446a182879d..2357418a1e0 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/testmechanism/FlinkStubbedRunner.scala +++ 
b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/testmechanism/FlinkMiniClusterJobSubmitter.scala @@ -8,15 +8,15 @@ import pl.touk.nussknacker.engine.util.loader.ModelClassLoader import scala.jdk.CollectionConverters._ -// FIXME abr: rename -// we use own LocalFlinkMiniCluster, instead of LocalExecutionEnvironment, to be able to pass own classpath... -final class FlinkStubbedRunner(miniCluster: MiniCluster, env: StreamExecutionEnvironment) { +// We use MiniCluster directly, instead of LocalExecutionEnvironment, to be able to pass own classpath... +final class FlinkMiniClusterJobSubmitter(miniCluster: MiniCluster, env: StreamExecutionEnvironment) { - def execute( + def submitJobAndCleanEnv( scenarioName: ProcessName, savepointRestoreSettings: SavepointRestoreSettings, modelClassLoader: ModelClassLoader ): Unit = { + // This step cleans env transformations. It allows reusing the same StreamExecutionEnvironment many times val streamGraph = env.getStreamGraph streamGraph.setJobName(scenarioName.value) val jobGraph = streamGraph.getJobGraph diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/testsmechanism/TestsMechanismMiniClusterFactory.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/testsmechanism/TestsMechanismMiniClusterFactory.scala index f0a4aed9437..0fa6174d1a7 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/testsmechanism/TestsMechanismMiniClusterFactory.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/testsmechanism/TestsMechanismMiniClusterFactory.scala @@ -6,13 +6,13 @@ import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfigurati object TestsMechanismMiniClusterFactory { - def createConfiguredMiniCluster(parallelism: Int): MiniCluster = { - val miniClusterConfiguration = prepareMiniClusterConfiguration(numTaskSlots = parallelism) + def 
createConfiguredMiniCluster(numTaskSlots: Int): MiniCluster = { + val miniClusterConfiguration = prepareMiniClusterConfiguration(numTaskSlots = numTaskSlots) // it is required for proper working of HadoopFileSystem FileSystem.initialize(miniClusterConfiguration, null) - createMiniCluster(miniClusterConfiguration, numSlotsPerTaskManager = parallelism) + createMiniCluster(miniClusterConfiguration, numSlotsPerTaskManager = numTaskSlots) } private def prepareMiniClusterConfiguration(numTaskSlots: Int) = { diff --git a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunnerSpec.scala b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunnerSpec.scala index 9e64878a596..413e17fd436 100644 --- a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunnerSpec.scala +++ b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunnerSpec.scala @@ -39,7 +39,7 @@ import pl.touk.nussknacker.engine.management.testsmechanism.FlinkProcessTestRunn } import pl.touk.nussknacker.engine.process.helpers.SampleNodes._ import pl.touk.nussknacker.engine.testmode.TestProcess._ -import pl.touk.nussknacker.engine.util.{MetaDataExtractor, ThreadUtils} +import pl.touk.nussknacker.engine.util.MetaDataExtractor import pl.touk.nussknacker.engine.util.loader.ModelClassLoader import pl.touk.nussknacker.engine.{ModelConfigs, ModelData} @@ -47,7 +47,6 @@ import java.util.{Date, UUID} import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import scala.concurrent.{Await, Future} -import scala.util.Using class FlinkProcessTestRunnerSpec extends AnyWordSpec @@ -144,6 +143,44 @@ class FlinkProcessTestRunnerSpec LogService.invocationsCount.get() shouldBe 0 } + "be able to run tests multiple times on the same test runner" in { + val process = 
ScenarioBuilder + .streaming(scenarioName) + .source(sourceNodeId, "input") + .emptySink("out", "valueMonitor", "Value" -> "#input.value1".spel) + + val input = SimpleRecord("0", 11, "2", new Date(3), Some(4), 5, "6") + + def runTestAndVerify() = { + val results = testRunner.runTests( + process, + ScenarioTestData( + List( + ScenarioTestJsonRecord(sourceNodeId, Json.fromString("0|11|2|3|4|5|6")) + ) + ) + ) + + val nodeResults = results.nodeResults + + nodeResults(sourceNodeId) shouldBe List(nodeResult(0, "input" -> input)) + nodeResults("out") shouldBe List(nodeResult(0, "input" -> input)) + + val invocationResults = results.invocationResults + + invocationResults("out") shouldBe + List(ExpressionInvocationResult(s"$scenarioName-$sourceNodeId-$firstSubtaskIndex-0", "Value", variable(11))) + + results.externalInvocationResults("out") shouldBe List( + ExternalInvocationResult(s"$scenarioName-$sourceNodeId-$firstSubtaskIndex-0", "valueMonitor", variable(11)) + ) + } + + runTestAndVerify() + runTestAndVerify() + } + "collect results for split" in { val process = ScenarioBuilder