Skip to content

Commit

Permalink
Merge branch 'staging' into flink-test-mechanism-caching
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadius committed Jan 24, 2025
2 parents 3e1c83a + 700ae63 commit af76389
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -505,10 +505,7 @@ class PeriodicProcessService(
)
}
inputConfigDuringExecutionJsonOpt <- periodicProcessesRepository
.fetchInputConfigDuringExecutionJson(
processName,
versionId,
)
.fetchInputConfigDuringExecutionJson(deployment.periodicProcess.id)
.run
inputConfigDuringExecutionJson = inputConfigDuringExecutionJsonOpt.getOrElse {
throw new PeriodicProcessException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,9 +418,9 @@ class SlickLegacyPeriodicProcessesRepository(
): Future[Option[(CanonicalProcess, ProcessVersion)]] =
fetchingProcessRepository.getCanonicalProcessWithVersion(processName, versionId)(NussknackerInternalUser.instance)

def fetchInputConfigDuringExecutionJson(processName: ProcessName, versionId: VersionId): Action[Option[String]] =
def fetchInputConfigDuringExecutionJson(periodicProcessId: PeriodicProcessId): Action[Option[String]] =
PeriodicProcessesWithJson
.filter(p => p.processName === processName && p.processVersionId === versionId)
.filter(p => p.id === periodicProcessId)
.map(_.inputConfigDuringExecutionJson)
.result
.headOption
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,7 @@ trait PeriodicProcessesRepository {
): Action[PeriodicProcessDeployment]

def fetchInputConfigDuringExecutionJson(
processName: ProcessName,
versionId: VersionId
periodicProcessId: PeriodicProcessId,
): Action[Option[String]]

def fetchCanonicalProcessWithVersion(
Expand Down Expand Up @@ -496,9 +495,9 @@ class SlickPeriodicProcessesRepository(
update.map(_ => ())
}

def fetchInputConfigDuringExecutionJson(processName: ProcessName, versionId: VersionId): Action[Option[String]] =
def fetchInputConfigDuringExecutionJson(periodicProcessId: PeriodicProcessId): Action[Option[String]] =
PeriodicProcessesWithInputConfig
.filter(p => p.processName === processName && p.processVersionId === versionId)
.filter(p => p.id === periodicProcessId)
.map(_.inputConfigDuringExecutionJson)
.result
.headOption
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,7 @@ class InMemPeriodicProcessesRepository(processingType: String) extends PeriodicP
}

override def fetchInputConfigDuringExecutionJson(
processName: ProcessName,
versionId: VersionId
periodicProcessId: PeriodicProcessId,
): Future[Option[String]] =
Future.successful(Some("{}"))

Expand Down
12 changes: 5 additions & 7 deletions utils/test-utils/src/main/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,14 @@
<logger name="org.flywaydb.core.internal.sqlscript.DefaultSqlScriptExecutor" level="ERROR"/>
<logger name="org.apache.flink.metrics.MetricGroup" level="ERROR"/>
<logger name="org.apache.fop.apps.FOUserAgent" level="ERROR"/>
<!-- it generates a lot of logs on DEBUG during test containers usage -->
<logger name="com.github.dockerjava.zerodep.shaded.org.apache.hc.client5.http" level="INFO"/>
<logger name="org.testcontainers.shaded.com.github.dockerjava.core.exec" level="INFO"/>
<logger name="org.testcontainers.shaded.com.github.dockerjava.core.command" level="INFO"/>
<!-- skuber by default logs all requests to k8s on INFO -->
<logger name="skuber" level="${SKUBER_LOG_LEVEL:-WARN}"/>
<logger name="com.azure" level="INFO"/>
<!-- these loggers are useful to diagnose docker tests -->
<logger name="org.testcontainers" level="DEBUG"/>
<logger name="tc" level="DEBUG"/>
<logger name="org.testcontainers" level="INFO"/>
<!-- The following logger can be used for containers logs since 1.18.0 -->
<logger name="tc" level="INFO"/>
<logger name="com.github.dockerjava" level="WARN"/>
<logger name="com.github.dockerjava.zerodep.shaded.org.apache.hc.client5.http" level="OFF"/>

<logger name="scenario-activity-audit" level="INFO" additivity="false">
<appender-ref ref="STDOUT_FOR_SCENARIO_ACTIVITY_AUDIT"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@ import cats.effect.IO
import cats.effect.kernel.Resource
import cats.effect.unsafe.implicits.global
import com.dimafeng.testcontainers.{DockerComposeContainer, ServiceLogConsumer, WaitingForService}
import com.github.dockerjava.api.DockerClient
import com.github.dockerjava.api.model.Container
import com.typesafe.scalalogging.LazyLogging
import org.slf4j.Logger
import org.slf4j.MarkerFactory.getIMarkerFactory
import org.testcontainers.DockerClientFactory
import org.testcontainers.containers.output.Slf4jLogConsumer
import org.testcontainers.containers.wait.strategy.DockerHealthcheckWaitStrategy
import org.testcontainers.utility.LogUtils
import pl.touk.nussknacker.test.MiscUtils._
import pl.touk.nussknacker.test.WithTestHttpClientCreator
import pl.touk.nussknacker.test.containers.ContainerExt.toContainerExt
Expand All @@ -18,7 +23,9 @@ import ujson.Value

import java.io.{File => JFile}
import java.time.Duration
import scala.util.Try
import java.util
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

class DockerBasedInstallationExampleNuEnvironment(
nussknackerImageVersion: String,
Expand Down Expand Up @@ -46,7 +53,15 @@ class DockerBasedInstallationExampleNuEnvironment(
tailChildContainers = false
) {

start()
Try(start()) match {
case Failure(ex) =>
// There is no way currently to automatically capture logs from containers before all services from the docker
// compose started. When one of the services is not healthy, there won't be any logs captured. That's why we do
// the capture manually.
captureAllContainerLogs()
throw ex
case Success(()) =>
}

private val (dockerBasedInstallationExampleClient, closeHandler) =
DockerBasedInstallationExampleClient.create(this).allocated.unsafeRunSync()
Expand All @@ -58,6 +73,31 @@ class DockerBasedInstallationExampleNuEnvironment(
super.stop()
}

private def captureAllContainerLogs() = {
val dockerClient = DockerClientFactory.lazyClient()
getNuDockerComposeContainers(dockerClient).foreach { container =>
val logs = LogUtils.getOutput(dockerClient, container.getId)
slf4jLogger.info(getIMarkerFactory.getMarker(container.getNames.mkString(",")), logs)
}
}

private def getNuDockerComposeContainers(dockerClient: DockerClient) = {
dockerClient
.listContainersCmd()
.exec()
.asInstanceOf[util.ArrayList[Container]]
.asScala
.filter { container =>
// dummy method of how to distinguish if the container is Nu docker-compose related container
container.labels.asScala.get("com.docker.compose.project.working_dir") match {
case Some(value) =>
value.contains("nussknacker")
case None => false
}
}
.toList
}

}

object DockerBasedInstallationExampleNuEnvironment extends LazyLogging {
Expand Down Expand Up @@ -87,7 +127,7 @@ class DockerBasedInstallationExampleClient private (

def deployAndWaitForRunningState(scenarioName: String): Unit = {
bootstrapSetupService.executeBash(
s"""/app/utils/nu/deploy-scenario-and-wait-for-running-state.sh "$scenarioName" """
s"""/app/utils/nu/deploy-scenario-and-wait-for-deployed-state.sh "$scenarioName" """
)
}

Expand Down

0 comments on commit af76389

Please sign in to comment.