Skip to content

Commit

Permalink
PeriodicDM as decorator for any DM with data stored in the main Nu DB (
Browse files Browse the repository at this point in the history
  • Loading branch information
mgoworko authored Jan 16, 2025
1 parent 9ed7326 commit fd82abe
Show file tree
Hide file tree
Showing 127 changed files with 3,065 additions and 1,537 deletions.
48 changes: 8 additions & 40 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -510,8 +510,7 @@ lazy val distribution: Project = sbt
},
devArtifacts := {
modelArtifacts.value ++ List(
(flinkDevModel / assembly).value -> "model/devModel.jar",
(flinkPeriodicDeploymentManager / assembly).value -> "managers/nussknacker-flink-periodic-manager.jar",
(flinkDevModel / assembly).value -> "model/devModel.jar",
)
},
Universal / packageName := ("nussknacker" + "-" + version.value),
Expand Down Expand Up @@ -611,8 +610,8 @@ lazy val flinkDeploymentManager = (project in flink("management"))
libraryDependencies ++= {
Seq(
"org.typelevel" %% "cats-core" % catsV % Provided,
"org.apache.flink" % "flink-streaming-java" % flinkV % flinkScope
excludeAll (
("org.apache.flink" % "flink-streaming-java" % flinkV % flinkScope)
.excludeAll(
ExclusionRule("log4j", "log4j"),
ExclusionRule("org.slf4j", "slf4j-log4j12"),
ExclusionRule("com.esotericsoftware", "kryo-shaded"),
Expand All @@ -636,37 +635,6 @@ lazy val flinkDeploymentManager = (project in flink("management"))
kafkaTestUtils % "it,test"
)

lazy val flinkPeriodicDeploymentManager = (project in flink("management/periodic"))
.settings(commonSettings)
.settings(assemblyNoScala("nussknacker-flink-periodic-manager.jar"): _*)
.settings(publishAssemblySettings: _*)
.settings(
name := "nussknacker-flink-periodic-manager",
libraryDependencies ++= {
Seq(
"org.typelevel" %% "cats-core" % catsV % Provided,
"com.typesafe.slick" %% "slick" % slickV % Provided,
"com.typesafe.slick" %% "slick-hikaricp" % slickV % "provided, test",
"com.github.tminglei" %% "slick-pg" % slickPgV,
"org.hsqldb" % "hsqldb" % hsqldbV % Test,
"org.flywaydb" % "flyway-core" % flywayV % Provided,
"com.cronutils" % "cron-utils" % cronParserV,
"com.typesafe.akka" %% "akka-actor" % akkaV,
"com.typesafe.akka" %% "akka-testkit" % akkaV % Test,
"com.dimafeng" %% "testcontainers-scala-scalatest" % testContainersScalaV % Test,
"com.dimafeng" %% "testcontainers-scala-postgresql" % testContainersScalaV % Test,
)
}
)
.dependsOn(
flinkDeploymentManager,
deploymentManagerApi % Provided,
scenarioCompiler % Provided,
componentsApi % Provided,
httpUtils % Provided,
testUtils % Test
)

lazy val flinkMetricsDeferredReporter = (project in flink("metrics-deferred-reporter"))
.settings(commonSettings)
.settings(
Expand Down Expand Up @@ -1811,10 +1779,10 @@ lazy val flinkBaseUnboundedComponents = (project in flink("components/base-unbou
.settings(
name := "nussknacker-flink-base-unbounded-components",
libraryDependencies ++= Seq(
"org.apache.flink" % "flink-streaming-java" % flinkV % Provided,
"com.clearspring.analytics" % "stream" % "2.9.8"
"org.apache.flink" % "flink-streaming-java" % flinkV % Provided,
// It is used only in QDigest which we don't use, while it's >20MB in size...
exclude ("it.unimi.dsi", "fastutil")
("com.clearspring.analytics" % "stream" % "2.9.8")
.exclude("it.unimi.dsi", "fastutil")
)
)
.dependsOn(
Expand Down Expand Up @@ -2004,6 +1972,7 @@ lazy val designer = (project in file("designer/server"))
"com.typesafe.akka" %% "akka-testkit" % akkaV % Test,
"de.heikoseeberger" %% "akka-http-circe" % akkaHttpCirceV,
"com.softwaremill.sttp.client3" %% "async-http-client-backend-cats" % sttpV,
"com.cronutils" % "cron-utils" % cronParserV,
"ch.qos.logback" % "logback-core" % logbackV,
"ch.qos.logback" % "logback-classic" % logbackV,
"ch.qos.logback.contrib" % "logback-json-classic" % logbackJsonV,
Expand All @@ -2026,6 +1995,7 @@ lazy val designer = (project in file("designer/server"))
"com.beachape" %% "enumeratum-circe" % enumeratumV,
"tf.tofu" %% "derevo-circe" % "0.13.0",
"com.softwaremill.sttp.apispec" %% "openapi-circe-yaml" % openapiCirceYamlV,
"com.github.tminglei" %% "slick-pg" % slickPgV,
"com.softwaremill.sttp.tapir" %% "tapir-akka-http-server" % tapirV,
"com.softwaremill.sttp.tapir" %% "tapir-core" % tapirV,
"com.softwaremill.sttp.tapir" %% "tapir-derevo" % tapirV,
Expand Down Expand Up @@ -2080,7 +2050,6 @@ lazy val designer = (project in file("designer/server"))
liteEmbeddedDeploymentManager % Provided,
liteK8sDeploymentManager % Provided,
developmentTestsDeploymentManager % Provided,
flinkPeriodicDeploymentManager % Provided,
schemedKafkaComponentsUtils % Provided,
)

Expand Down Expand Up @@ -2170,7 +2139,6 @@ lazy val modules = List[ProjectReference](
requestResponseRuntime,
liteEngineRuntimeApp,
flinkDeploymentManager,
flinkPeriodicDeploymentManager,
flinkDevModel,
flinkDevModelJava,
flinkTableApiComponents,
Expand Down
2 changes: 1 addition & 1 deletion designer/client/src/types/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { ProcessAdditionalFields, ReturnedType } from "./scenarioGraph";
import { FragmentInputParameter } from "../components/graph/node-modal/fragment-input-definition/item";
import { StickyNoteType } from "./stickyNote";

type Type = "FragmentInput" | typeof StickyNoteType | string;
type Type = "FragmentInput" | typeof StickyNoteType | string;

export type LayoutData = { x: number; y: number };

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package pl.touk.nussknacker.engine.api.deployment

import com.typesafe.config.Config
import pl.touk.nussknacker.engine.api.deployment.inconsistency.InconsistentStateDetector
import pl.touk.nussknacker.engine.api.deployment.scheduler.services._
import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId}
import pl.touk.nussknacker.engine.newdeployment
import pl.touk.nussknacker.engine.util.WithDataFreshnessStatusUtils.WithDataFreshnessStatusOps
import pl.touk.nussknacker.engine.{BaseModelData, DeploymentManagerDependencies, newdeployment}

import java.time.Instant
import scala.concurrent.ExecutionContext.Implicits._
Expand Down Expand Up @@ -48,6 +50,8 @@ trait DeploymentManager extends AutoCloseable {

def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport

def schedulingSupport: SchedulingSupport

def processCommand[Result](command: DMScenarioCommand[Result]): Future[Result]

final def getProcessState(
Expand Down Expand Up @@ -132,3 +136,25 @@ trait DeploymentSynchronisationSupported extends DeploymentSynchronisationSuppor
}

case object NoDeploymentSynchronisationSupport extends DeploymentSynchronisationSupport

sealed trait SchedulingSupport

trait SchedulingSupported extends SchedulingSupport {

def createScheduledExecutionPerformer(
modelData: BaseModelData,
dependencies: DeploymentManagerDependencies,
deploymentConfig: Config,
): ScheduledExecutionPerformer

def customSchedulePropertyExtractorFactory: Option[SchedulePropertyExtractorFactory]

def customProcessConfigEnricherFactory: Option[ProcessConfigEnricherFactory]

def customScheduledProcessListenerFactory: Option[ScheduledProcessListenerFactory]

def customAdditionalDeploymentDataProvider: Option[AdditionalDeploymentDataProvider]

}

case object NoSchedulingSupport extends SchedulingSupport
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class CachingProcessStateDeploymentManager(
cacheTTL: FiniteDuration,
override val deploymentSynchronisationSupport: DeploymentSynchronisationSupport,
override val stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport,
override val schedulingSupport: SchedulingSupport,
) extends DeploymentManager {

private val cache: AsyncCache[ProcessName, List[StatusDetails]] = Caffeine
Expand Down Expand Up @@ -83,7 +84,8 @@ object CachingProcessStateDeploymentManager extends LazyLogging {
delegate,
cacheTTL,
delegate.deploymentSynchronisationSupport,
delegate.stateQueryForAllScenariosSupport
delegate.stateQueryForAllScenariosSupport,
delegate.schedulingSupport,
)
}
.getOrElse {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package pl.touk.nussknacker.engine.api.deployment.scheduler.model

import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId}

final case class DeploymentWithRuntimeParams(
processId: Option[ProcessId],
processName: ProcessName,
versionId: VersionId,
runtimeParams: RuntimeParams,
)

final case class RuntimeParams(params: Map[String, String])
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package pl.touk.nussknacker.engine.api.deployment.scheduler.model

sealed trait ScheduleProperty

object ScheduleProperty {
sealed trait SingleScheduleProperty extends ScheduleProperty

final case class MultipleScheduleProperty(
schedules: Map[String, SingleScheduleProperty]
) extends ScheduleProperty

final case class CronScheduleProperty(
labelOrCronExpr: String
) extends SingleScheduleProperty

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package pl.touk.nussknacker.engine.api.deployment.scheduler.model

import pl.touk.nussknacker.engine.api.process.{ProcessName, VersionId}

import java.time.LocalDateTime

case class ScheduledDeploymentDetails(
id: Long,
processName: ProcessName,
versionId: VersionId,
scheduleName: Option[String],
createdAt: LocalDateTime,
runAt: LocalDateTime,
deployedAt: Option[LocalDateTime],
completedAt: Option[LocalDateTime],
status: ScheduledDeploymentStatus,
)

sealed trait ScheduledDeploymentStatus

object ScheduledDeploymentStatus {
case object Scheduled extends ScheduledDeploymentStatus
case object Deployed extends ScheduledDeploymentStatus
case object Finished extends ScheduledDeploymentStatus
case object Failed extends ScheduledDeploymentStatus
case object RetryingDeploy extends ScheduledDeploymentStatus
case object FailedOnDeploy extends ScheduledDeploymentStatus
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package pl.touk.nussknacker.engine.api.deployment.scheduler.model

import pl.touk.nussknacker.engine.api.{MetaData, ProcessVersion}

case class ScheduledProcessDetails(
processVersion: ProcessVersion,
processMetaData: MetaData,
inputConfigDuringExecutionJson: String,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package pl.touk.nussknacker.engine.api.deployment.scheduler.services

import pl.touk.nussknacker.engine.api.deployment.scheduler.model.ScheduledDeploymentDetails

trait AdditionalDeploymentDataProvider {

def prepareAdditionalData(runDetails: ScheduledDeploymentDetails): Map[String, String]

}
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
package pl.touk.nussknacker.engine.management.periodic.service
package pl.touk.nussknacker.engine.api.deployment.scheduler.services

import com.typesafe.config.{Config, ConfigFactory}
import pl.touk.nussknacker.engine.api.ProcessVersion
import pl.touk.nussknacker.engine.api.deployment.scheduler.model.{ScheduledDeploymentDetails, ScheduledProcessDetails}
import pl.touk.nussknacker.engine.api.deployment.scheduler.services.ProcessConfigEnricher.{DeployData, EnrichedProcessConfig, InitialScheduleData}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.management.periodic.model.DeploymentWithJarData.WithCanonicalProcess
import pl.touk.nussknacker.engine.management.periodic.model.PeriodicProcessDeployment
import pl.touk.nussknacker.engine.management.periodic.service.ProcessConfigEnricher.{
DeployData,
EnrichedProcessConfig,
InitialScheduleData
}
import pl.touk.nussknacker.engine.modelconfig.InputConfigDuringExecution
import sttp.client3.SttpBackend

Expand All @@ -32,7 +28,6 @@ trait ProcessConfigEnricher {
object ProcessConfigEnricher {

trait ProcessConfigEnricherInputData {
def canonicalProcess: CanonicalProcess
def inputConfigDuringExecutionJson: String

def inputConfigDuringExecution: Config = {
Expand All @@ -41,13 +36,16 @@ object ProcessConfigEnricher {

}

case class InitialScheduleData(canonicalProcess: CanonicalProcess, inputConfigDuringExecutionJson: String)
extends ProcessConfigEnricherInputData
case class InitialScheduleData(
canonicalProcess: CanonicalProcess,
inputConfigDuringExecutionJson: String
) extends ProcessConfigEnricherInputData

case class DeployData(
canonicalProcess: CanonicalProcess,
inputConfigDuringExecutionJson: String,
deployment: PeriodicProcessDeployment[WithCanonicalProcess]
canonicalProcess: CanonicalProcess,
processVersion: ProcessVersion,
inputConfigDuringExecutionJson: String,
deploymentDetails: ScheduledDeploymentDetails,
) extends ProcessConfigEnricherInputData

case class EnrichedProcessConfig(inputConfigDuringExecutionJson: String)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package pl.touk.nussknacker.engine.api.deployment.scheduler.services

import com.typesafe.config.Config
import pl.touk.nussknacker.engine.api.deployment.scheduler.model.ScheduleProperty
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess

trait SchedulePropertyExtractorFactory {
def apply(config: Config): SchedulePropertyExtractor
}

trait SchedulePropertyExtractor {
def apply(canonicalProcess: CanonicalProcess): Either[String, ScheduleProperty]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package pl.touk.nussknacker.engine.api.deployment.scheduler.services

import pl.touk.nussknacker.engine.api.ProcessVersion
import pl.touk.nussknacker.engine.api.deployment.scheduler.model.{DeploymentWithRuntimeParams, RuntimeParams}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment.{DeploymentData, ExternalDeploymentId}
import pl.touk.nussknacker.engine.modelconfig.InputConfigDuringExecution

import scala.concurrent.Future

trait ScheduledExecutionPerformer {

def provideInputConfigDuringExecutionJson(): Future[InputConfigDuringExecution]

def prepareDeploymentWithRuntimeParams(
processVersion: ProcessVersion,
): Future[DeploymentWithRuntimeParams]

def deployWithRuntimeParams(
deploymentWithJarData: DeploymentWithRuntimeParams,
inputConfigDuringExecutionJson: String,
deploymentData: DeploymentData,
canonicalProcess: CanonicalProcess,
processVersion: ProcessVersion,
): Future[Option[ExternalDeploymentId]]

def cleanAfterDeployment(
runtimeParams: RuntimeParams
): Future[Unit]

}
Loading

0 comments on commit fd82abe

Please sign in to comment.