Test for multiple tests run + refactor
arkadius committed Jan 21, 2025
1 parent 6325dc4 commit cd419c6
Showing 7 changed files with 69 additions and 40 deletions.
@@ -1,18 +1,21 @@
 package pl.touk.nussknacker.engine.process.runner
 
-import java.io.File
 import com.typesafe.config.{Config, ConfigFactory}
 import com.typesafe.scalalogging.LazyLogging
 import org.apache.flink.api.common.ExecutionConfig
-import pl.touk.nussknacker.engine.{ModelConfigs, ModelData}
 import pl.touk.nussknacker.engine.api.{CirceUtil, ProcessVersion}
 import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
 import pl.touk.nussknacker.engine.deployment.DeploymentData
+import pl.touk.nussknacker.engine.marshall.ScenarioParser
 import pl.touk.nussknacker.engine.process.ExecutionConfigPreparer
+import pl.touk.nussknacker.engine.{ModelConfigs, ModelData}
 
+import java.io.File
+import java.nio.charset.StandardCharsets
+import scala.util.Using
 import scala.util.control.NonFatal
 
-trait FlinkProcessMain[Env] extends FlinkRunner with LazyLogging {
+trait FlinkProcessMain[Env] extends LazyLogging {
 
   def main(argsWithHack: Array[String]): Unit = {
     try {
@@ -61,6 +64,15 @@ trait FlinkProcessMain[Env] extends FlinkRunner with LazyLogging {
       prepareExecutionConfig: ExecutionConfigPreparer
   ): Unit
 
+  protected def readProcessFromArg(arg: String): CanonicalProcess = {
+    val canonicalJson = if (arg.startsWith("@")) {
+      Using.resource(scala.io.Source.fromFile(arg.substring(1), StandardCharsets.UTF_8.name()))(_.mkString)
+    } else {
+      arg
+    }
+    ScenarioParser.parseUnsafe(canonicalJson)
+  }
+
   private def parseProcessVersion(json: String): ProcessVersion =
     CirceUtil.decodeJsonUnsafe[ProcessVersion](json, "invalid scenario version")
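Note: the readProcessFromArg helper added above accepts either an inline JSON scenario or, when the argument starts with "@", a path to a file containing the JSON. A minimal usage sketch (the argument values below are hypothetical):

    // JSON passed directly as the program argument
    val inline: CanonicalProcess = readProcessFromArg("""{"metaData": {...}}""")
    // "@"-prefixed argument: the JSON is read from the referenced file instead
    val fromFile: CanonicalProcess = readProcessFromArg("@/opt/nussknacker/scenario.json")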

This file was deleted.

@@ -12,7 +12,7 @@ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
 import pl.touk.nussknacker.engine.deployment.{AdditionalModelConfigs, DeploymentData}
 import pl.touk.nussknacker.engine.process.compiler.TestFlinkProcessCompilerDataFactory
 import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar
-import pl.touk.nussknacker.engine.process.testmechanism.FlinkStubbedRunner
+import pl.touk.nussknacker.engine.process.testmechanism.FlinkMiniClusterJobSubmitter
 import pl.touk.nussknacker.engine.process.{ExecutionConfigPreparer, FlinkJobConfig}
 import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults
 import pl.touk.nussknacker.engine.testmode.{
@@ -21,7 +21,7 @@ import pl.touk.nussknacker.engine.testmode.{
   TestServiceInvocationCollector
 }
 
-object FlinkTestMain extends FlinkRunner {
+object FlinkTestMain {
 
   def run(
       miniCluster: MiniCluster,
@@ -58,7 +58,7 @@ class FlinkTestMain(
     deploymentData: DeploymentData
 ) {
 
-  private val stubbedRunner = new FlinkStubbedRunner(miniCluster, env)
+  private val stubbedRunner = new FlinkMiniClusterJobSubmitter(miniCluster, env)
 
   def runTest: TestResults[Json] = {
     val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener
@@ -67,7 +67,7 @@ class FlinkTestMain(
       val registrar = prepareRegistrar(collectingListener, scenarioTestData)
 
       registrar.register(env, process, processVersion, deploymentData, resultCollector)
-      stubbedRunner.execute(process.name, SavepointRestoreSettings.none(), modelData.modelClassLoader)
+      stubbedRunner.submitJobAndCleanEnv(process.name, SavepointRestoreSettings.none(), modelData.modelClassLoader)
       collectingListener.results
     } finally {
       collectingListener.clean()
@@ -9,11 +9,11 @@ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
 import pl.touk.nussknacker.engine.deployment.DeploymentData
 import pl.touk.nussknacker.engine.process.compiler.VerificationFlinkProcessCompilerDataFactory
 import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar
-import pl.touk.nussknacker.engine.process.testmechanism.FlinkStubbedRunner
+import pl.touk.nussknacker.engine.process.testmechanism.FlinkMiniClusterJobSubmitter
 import pl.touk.nussknacker.engine.process.{ExecutionConfigPreparer, FlinkJobConfig}
 import pl.touk.nussknacker.engine.testmode.{ResultsCollectingListenerHolder, TestServiceInvocationCollector}
 
-object FlinkVerificationMain extends FlinkRunner {
+object FlinkVerificationMain {
 
   def run(
       miniCluster: MiniCluster,
@@ -46,15 +46,15 @@ class FlinkVerificationMain(
     savepointPath: String
 ) {
 
-  private val stubbedRunner = new FlinkStubbedRunner(miniCluster, env)
+  private val stubbedRunner = new FlinkMiniClusterJobSubmitter(miniCluster, env)
 
   def runTest(): Unit = {
     val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener
     val resultCollector = new TestServiceInvocationCollector(collectingListener)
     val registrar = prepareRegistrar()
 
     registrar.register(env, process, processVersion, deploymentData, resultCollector)
-    stubbedRunner.execute(
+    stubbedRunner.submitJobAndCleanEnv(
       process.name,
       SavepointRestoreSettings.forPath(savepointPath, true),
       modelData.modelClassLoader
@@ -8,15 +8,15 @@ import pl.touk.nussknacker.engine.util.loader.ModelClassLoader
 
 import scala.jdk.CollectionConverters._
 
-// FIXME abr: rename
-// we use own LocalFlinkMiniCluster, instead of LocalExecutionEnvironment, to be able to pass own classpath...
-final class FlinkStubbedRunner(miniCluster: MiniCluster, env: StreamExecutionEnvironment) {
+// We use MiniCluster directly, instead of LocalExecutionEnvironment, to be able to pass our own classpath...
+final class FlinkMiniClusterJobSubmitter(miniCluster: MiniCluster, env: StreamExecutionEnvironment) {
 
-  def execute(
+  def submitJobAndCleanEnv(
       scenarioName: ProcessName,
       savepointRestoreSettings: SavepointRestoreSettings,
       modelClassLoader: ModelClassLoader
   ): Unit = {
+    // This step cleans the env transformations, which allows reusing the same StreamExecutionEnvironment many times
     val streamGraph = env.getStreamGraph
     streamGraph.setJobName(scenarioName.value)
     val jobGraph = streamGraph.getJobGraph
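Note: the rest of submitJobAndCleanEnv is collapsed in this view. A plausible continuation, assuming Flink's MiniCluster API and a ModelClassLoader that exposes its URLs (the lines below are an illustration, not the committed body):

    // hypothetical continuation of submitJobAndCleanEnv
    jobGraph.setSavepointRestoreSettings(savepointRestoreSettings)
    jobGraph.setClasspaths(modelClassLoader.urls.asJava) // asJava from the CollectionConverters import above
    miniCluster.executeJobBlocking(jobGraph)             // blocks until the submitted job finishes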
@@ -6,13 +6,13 @@ import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration}
 
 object TestsMechanismMiniClusterFactory {
 
-  def createConfiguredMiniCluster(parallelism: Int): MiniCluster = {
-    val miniClusterConfiguration = prepareMiniClusterConfiguration(numTaskSlots = parallelism)
+  def createConfiguredMiniCluster(numTaskSlots: Int): MiniCluster = {
+    val miniClusterConfiguration = prepareMiniClusterConfiguration(numTaskSlots)
 
     // this is required for HadoopFileSystem to work properly
     FileSystem.initialize(miniClusterConfiguration, null)
 
-    createMiniCluster(miniClusterConfiguration, numSlotsPerTaskManager = parallelism)
+    createMiniCluster(miniClusterConfiguration, numSlotsPerTaskManager = numTaskSlots)
   }
 
   private def prepareMiniClusterConfiguration(numTaskSlots: Int) = {
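Note: the helper bodies are collapsed above. A sketch of the typical shape of such helpers, assuming Flink's standard configuration options (illustrative only, not the committed code; it would need import org.apache.flink.configuration.{Configuration, RestOptions, TaskManagerOptions}):

    private def prepareMiniClusterConfiguration(numTaskSlots: Int) = {
      val configuration = new Configuration
      configuration.set[Integer](TaskManagerOptions.NUM_TASK_SLOTS, numTaskSlots)
      configuration.set[Integer](RestOptions.PORT, 0) // bind the REST endpoint to a random free port
      configuration
    }

    private def createMiniCluster(configuration: Configuration, numSlotsPerTaskManager: Int) = {
      val miniCluster = new MiniCluster(
        new MiniClusterConfiguration.Builder()
          .setConfiguration(configuration)
          .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
          .build()
      )
      miniCluster.start()
      miniCluster
    }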
@@ -39,15 +39,14 @@ import pl.touk.nussknacker.engine.management.testsmechanism.FlinkProcessTestRunner
 }
 import pl.touk.nussknacker.engine.process.helpers.SampleNodes._
 import pl.touk.nussknacker.engine.testmode.TestProcess._
-import pl.touk.nussknacker.engine.util.{MetaDataExtractor, ThreadUtils}
+import pl.touk.nussknacker.engine.util.MetaDataExtractor
 import pl.touk.nussknacker.engine.util.loader.ModelClassLoader
 import pl.touk.nussknacker.engine.{ModelConfigs, ModelData}
 
 import java.util.{Date, UUID}
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
 import scala.concurrent.{Await, Future}
-import scala.util.Using
 
 class FlinkProcessTestRunnerSpec
   extends AnyWordSpec
@@ -144,6 +143,44 @@ class FlinkProcessTestRunnerSpec
       LogService.invocationsCount.get() shouldBe 0
     }
 
+    "be able to run tests multiple times on the same test runner" in {
+      val process =
+        ScenarioBuilder
+          .streaming(scenarioName)
+          .source(sourceNodeId, "input")
+          .emptySink("out", "valueMonitor", "Value" -> "#input.value1".spel)
+
+      val input = SimpleRecord("0", 11, "2", new Date(3), Some(4), 5, "6")
+
+      def runTestAndVerify() = {
+        val results = testRunner.runTests(
+          process,
+          ScenarioTestData(
+            List(
+              ScenarioTestJsonRecord(sourceNodeId, Json.fromString("0|11|2|3|4|5|6"))
+            )
+          )
+        )
+
+        val nodeResults = results.nodeResults
+
+        nodeResults(sourceNodeId) shouldBe List(nodeResult(0, "input" -> input))
+        nodeResults("out") shouldBe List(nodeResult(0, "input" -> input))
+
+        val invocationResults = results.invocationResults
+
+        invocationResults("out") shouldBe
+          List(ExpressionInvocationResult(s"$scenarioName-$sourceNodeId-$firstSubtaskIndex-0", "Value", variable(11)))
+
+        results.externalInvocationResults("out") shouldBe List(
+          ExternalInvocationResult(s"$scenarioName-$sourceNodeId-$firstSubtaskIndex-0", "valueMonitor", variable(11))
+        )
+      }
+
+      runTestAndVerify()
+      runTestAndVerify()
+    }
+
     "collect results for split" in {
       val process =
         ScenarioBuilder
