diff --git a/CromIAM/src/main/scala/cromiam/webservice/SwaggerUiHttpService.scala b/CromIAM/src/main/scala/cromiam/webservice/SwaggerUiHttpService.scala index 99a5d0f3c74..88c3ef1bee5 100644 --- a/CromIAM/src/main/scala/cromiam/webservice/SwaggerUiHttpService.scala +++ b/CromIAM/src/main/scala/cromiam/webservice/SwaggerUiHttpService.scala @@ -6,9 +6,7 @@ import akka.http.scaladsl.server._ import akka.stream.Materializer import akka.stream.scaladsl.Sink import akka.util.ByteString -import com.typesafe.config.Config import cromiam.server.config.SwaggerOauthConfig -import net.ceedubs.ficus.Ficus._ import scala.concurrent.{ExecutionContext, Future} @@ -127,24 +125,6 @@ trait SwaggerUiHttpService extends Directives { protected def rewriteSwaggerIndex(data: String): String = data } -/** - * Extends the SwaggerUiHttpService to gets UI configuration values from a provided Typesafe Config. - */ -trait SwaggerUiConfigHttpService extends SwaggerUiHttpService { - /** - * @return The swagger UI config. - */ - def swaggerUiConfig: Config - - override def swaggerUiVersion = swaggerUiConfig.getString("uiVersion") - - abstract override def swaggerUiBaseUrl = swaggerUiConfig.as[Option[String]]("baseUrl").getOrElse(super.swaggerUiBaseUrl) - - abstract override def swaggerUiPath = swaggerUiConfig.as[Option[String]]("uiPath").getOrElse(super.swaggerUiPath) - - abstract override def swaggerUiDocsPath = swaggerUiConfig.as[Option[String]]("docsPath").getOrElse(super.swaggerUiDocsPath) -} - /** * An extension of HttpService to serve up a resource containing the swagger api as yaml or json. The resource * directory and path on the classpath must match the path for route. 
The resource can be any file type supported by the diff --git a/CromIAM/src/test/scala/cromiam/webservice/SwaggerUiHttpServiceSpec.scala b/CromIAM/src/test/scala/cromiam/webservice/SwaggerUiHttpServiceSpec.scala index d17c3d8abcd..e95b2fda630 100644 --- a/CromIAM/src/test/scala/cromiam/webservice/SwaggerUiHttpServiceSpec.scala +++ b/CromIAM/src/test/scala/cromiam/webservice/SwaggerUiHttpServiceSpec.scala @@ -4,7 +4,6 @@ import akka.http.scaladsl.model.headers.Location import akka.http.scaladsl.model.{ContentTypes, StatusCodes, Uri} import akka.http.scaladsl.server.Route import akka.http.scaladsl.testkit.ScalatestRouteTest -import com.typesafe.config.ConfigFactory import common.assertion.CromwellTimeoutSpec import cromiam.server.config.SwaggerOauthConfig import cromiam.webservice.SwaggerUiHttpServiceSpec._ @@ -114,56 +113,6 @@ class NoRedirectRootSwaggerUiHttpServiceSpec extends SwaggerUiHttpServiceSpec { } } -class DefaultSwaggerUiConfigHttpServiceSpec extends SwaggerUiHttpServiceSpec with SwaggerUiConfigHttpService { - override def swaggerUiConfig = ConfigFactory.parseString(s"uiVersion = $TestSwaggerUiVersion") - - behavior of "SwaggerUiConfigHttpService" - - it should "redirect /swagger to the index.html" in { - Get("/swagger") ~> swaggerUiRoute ~> check { - status should be(StatusCodes.TemporaryRedirect) - header("Location") should be(Option(Location(Uri("/swagger/index.html?url=/api-docs")))) - contentType should be(ContentTypes.`text/html(UTF-8)`) - } - } - - it should "return index.html from the swagger-ui jar" in { - Get("/swagger/index.html") ~> swaggerUiRoute ~> check { - status should be(StatusCodes.OK) - responseAs[String].take(SwaggerIndexPreamble.length) should be(SwaggerIndexPreamble) - contentType should be(ContentTypes.`text/html(UTF-8)`) - } - } -} - -class OverriddenSwaggerUiConfigHttpServiceSpec extends SwaggerUiHttpServiceSpec with SwaggerUiConfigHttpService { - override def swaggerUiConfig = ConfigFactory.parseString( - s""" - |baseUrl = /base 
- |docsPath = swagger/cromiam.yaml - |uiPath = ui/path - |uiVersion = $TestSwaggerUiVersion - """.stripMargin) - - behavior of "SwaggerUiConfigHttpService" - - it should "redirect /ui/path to the index.html under /base" in { - Get("/ui/path") ~> swaggerUiRoute ~> check { - status should be(StatusCodes.TemporaryRedirect) - header("Location") should be(Option(Location(Uri("/base/ui/path/index.html?url=/base/swagger/cromiam.yaml")))) - contentType should be(ContentTypes.`text/html(UTF-8)`) - } - } - - it should "return index.html from the swagger-ui jar" in { - Get("/ui/path/index.html") ~> swaggerUiRoute ~> check { - status should be(StatusCodes.OK) - responseAs[String].take(SwaggerIndexPreamble.length) should be(SwaggerIndexPreamble) - contentType should be(ContentTypes.`text/html(UTF-8)`) - } - } -} - class YamlSwaggerResourceHttpServiceSpec extends SwaggerResourceHttpServiceSpec { override def swaggerServiceName = "testservice" diff --git a/centaur/src/main/resources/standardTestCases/directory_type_output_papi.test b/centaur/src/main/resources/standardTestCases/directory_type_output_papi.test new file mode 100644 index 00000000000..f66dfd656b3 --- /dev/null +++ b/centaur/src/main/resources/standardTestCases/directory_type_output_papi.test @@ -0,0 +1,12 @@ +name: directory_type_output_papi +testFormat: workflowsuccess +tags: ["wdl_biscayne"] +backends: [Papi] + +files { + workflow: wdl_biscayne/directory_type_output/directory_type_output.wdl +} + +metadata { + workflowName: main +} diff --git a/centaur/src/main/resources/standardTestCases/wdl_biscayne/directory_type_output/directory_type_output.wdl b/centaur/src/main/resources/standardTestCases/wdl_biscayne/directory_type_output/directory_type_output.wdl new file mode 100644 index 00000000000..d3668dd7242 --- /dev/null +++ b/centaur/src/main/resources/standardTestCases/wdl_biscayne/directory_type_output/directory_type_output.wdl @@ -0,0 +1,54 @@ +version development + +# CROM-6875 repro WDL to exercise Directory 
outputs. Since the Directory type does not exist in WDL versions 1.0 or +# draft-2, the bug this is checking for cannot and does not exist in those WDL versions. +workflow main { + call main { input: s1 = "x", s2 = "y" } + scatter (f in main.f) { + call checker { input: f = f } + } + output { Array[File] f = main.f } +} + +task main { + input { + String s1 + String s2 + } + + command <<< + set -euo pipefail + mkdir d + touch "d/~{s1}" + touch "d/~{s2}" + echo -e "d/~{s1}\nd/~{s2}" + >>> + + output { + Directory d = "d" + Array[File] f = read_lines(stdout()) + } + + runtime { + docker: "debian:stable-slim" + } +} + +task checker { + # Check files were actually created as expected above + input { + File f + } + + command <<< + set -euo pipefail + [ -f ~{f} ] + >>> + + output { + } + + runtime { + docker: "debian:stable-slim" + } +} diff --git a/database/sql/src/main/scala/cromwell/database/slick/WorkflowStoreSlickDatabase.scala b/database/sql/src/main/scala/cromwell/database/slick/WorkflowStoreSlickDatabase.scala index 5d88d0dbec8..1ded96f6066 100644 --- a/database/sql/src/main/scala/cromwell/database/slick/WorkflowStoreSlickDatabase.scala +++ b/database/sql/src/main/scala/cromwell/database/slick/WorkflowStoreSlickDatabase.scala @@ -59,7 +59,8 @@ trait WorkflowStoreSlickDatabase extends WorkflowStoreSqlDatabase { heartbeatTimestampTo: Timestamp, workflowStateFrom: String, workflowStateTo: String, - workflowStateExcluded: String) + workflowStateExcluded: String, + excludedGroups: Set[String]) (implicit ec: ExecutionContext): Future[Seq[WorkflowStoreEntry]] = { def updateForFetched(cromwellId: String, @@ -89,7 +90,7 @@ trait WorkflowStoreSlickDatabase extends WorkflowStoreSqlDatabase { val action = for { workflowStoreEntries <- dataAccess.fetchStartableWorkflows( - (limit.toLong, heartbeatTimestampTimedOut, workflowStateExcluded) + limit.toLong, heartbeatTimestampTimedOut, workflowStateExcluded, excludedGroups ).result _ <- DBIO.sequence( workflowStoreEntries map 
updateForFetched(cromwellId, heartbeatTimestampTo, workflowStateFrom, workflowStateTo) diff --git a/database/sql/src/main/scala/cromwell/database/slick/tables/WorkflowStoreEntryComponent.scala b/database/sql/src/main/scala/cromwell/database/slick/tables/WorkflowStoreEntryComponent.scala index 3fb6ec48cdd..0b0bf2c510a 100644 --- a/database/sql/src/main/scala/cromwell/database/slick/tables/WorkflowStoreEntryComponent.scala +++ b/database/sql/src/main/scala/cromwell/database/slick/tables/WorkflowStoreEntryComponent.scala @@ -75,11 +75,11 @@ trait WorkflowStoreEntryComponent { /** * Returns up to "limit" startable workflows, sorted by submission time. */ - val fetchStartableWorkflows = Compiled( - (limit: ConstColumn[Long], - heartbeatTimestampTimedOut: ConstColumn[Timestamp], - excludeWorkflowState: Rep[String] - ) => { + def fetchStartableWorkflows(limit: Long, + heartbeatTimestampTimedOut: Timestamp, + excludeWorkflowState: String, + excludedGroups: Set[String] + ): Query[WorkflowStoreEntries, WorkflowStoreEntry, Seq] = { val query = for { row <- workflowStoreEntries /* @@ -93,11 +93,11 @@ trait WorkflowStoreEntryComponent { transaction that we know will impact those readers. */ if (row.heartbeatTimestamp.isEmpty || row.heartbeatTimestamp < heartbeatTimestampTimedOut) && - (row.workflowState =!= excludeWorkflowState) + (row.workflowState =!= excludeWorkflowState) && + !(row.hogGroup inSet excludedGroups) } yield row query.forUpdate.sortBy(_.submissionTime.asc).take(limit) } - ) /** * Useful for counting workflows in a given state. 
diff --git a/database/sql/src/main/scala/cromwell/database/sql/WorkflowStoreSqlDatabase.scala b/database/sql/src/main/scala/cromwell/database/sql/WorkflowStoreSqlDatabase.scala index a8312f6d685..82dc51c875b 100644 --- a/database/sql/src/main/scala/cromwell/database/sql/WorkflowStoreSqlDatabase.scala +++ b/database/sql/src/main/scala/cromwell/database/sql/WorkflowStoreSqlDatabase.scala @@ -74,7 +74,8 @@ ____ __ ____ ______ .______ __ ___ _______ __ ______ heartbeatTimestampTo: Timestamp, workflowStateFrom: String, workflowStateTo: String, - workflowStateExcluded: String) + workflowStateExcluded: String, + excludedGroups: Set[String]) (implicit ec: ExecutionContext): Future[Seq[WorkflowStoreEntry]] def writeWorkflowHeartbeats(workflowExecutionUuids: Seq[String], diff --git a/engine/src/main/scala/cromwell/engine/workflow/WorkflowManagerActor.scala b/engine/src/main/scala/cromwell/engine/workflow/WorkflowManagerActor.scala index b2bd7c53f1c..d4b552f15ef 100644 --- a/engine/src/main/scala/cromwell/engine/workflow/WorkflowManagerActor.scala +++ b/engine/src/main/scala/cromwell/engine/workflow/WorkflowManagerActor.scala @@ -173,7 +173,7 @@ class WorkflowManagerActor(params: WorkflowManagerActorParams) Determine the number of available workflow slots and request the smaller of that number and maxWorkflowsToLaunch. */ val maxNewWorkflows = maxWorkflowsToLaunch min (maxWorkflowsRunning - stateData.workflows.size - stateData.subWorkflows.size) - params.workflowStore ! WorkflowStoreActor.FetchRunnableWorkflows(maxNewWorkflows) + params.workflowStore ! 
WorkflowStoreActor.FetchRunnableWorkflows(maxNewWorkflows, excludedGroups = Set.empty) stay() case Event(WorkflowStoreEngineActor.NoNewWorkflowsToStart, _) => log.debug("WorkflowStore provided no new workflows to start") diff --git a/engine/src/main/scala/cromwell/engine/workflow/workflowstore/InMemoryWorkflowStore.scala b/engine/src/main/scala/cromwell/engine/workflow/workflowstore/InMemoryWorkflowStore.scala index 121371b162c..b6913f6172c 100644 --- a/engine/src/main/scala/cromwell/engine/workflow/workflowstore/InMemoryWorkflowStore.scala +++ b/engine/src/main/scala/cromwell/engine/workflow/workflowstore/InMemoryWorkflowStore.scala @@ -1,7 +1,6 @@ package cromwell.engine.workflow.workflowstore import java.time.OffsetDateTime - import cats.data.NonEmptyList import cromwell.core.{HogGroup, WorkflowId, WorkflowSourceFilesCollection} import cromwell.engine.workflow.workflowstore.SqlWorkflowStore.WorkflowStoreAbortResponse.WorkflowStoreAbortResponse @@ -32,7 +31,10 @@ class InMemoryWorkflowStore extends WorkflowStore { * Retrieves up to n workflows which have not already been pulled into the engine and sets their pickedUp * flag to true */ - override def fetchStartableWorkflows(n: Int, cromwellId: String, heartbeatTtl: FiniteDuration)(implicit ec: ExecutionContext): Future[List[WorkflowToStart]] = { + override def fetchStartableWorkflows(n: Int, cromwellId: String, heartbeatTtl: FiniteDuration, excludedGroups: Set[String])(implicit ec: ExecutionContext): Future[List[WorkflowToStart]] = { + if (excludedGroups.nonEmpty) + throw new UnsupportedOperationException("Programmer Error: group filtering not supported for single-tenant/in-memory workflow store") + val startableWorkflows = workflowStore filter { _._2 == WorkflowStoreState.Submitted } take n val updatedWorkflows = startableWorkflows map { _._1 -> WorkflowStoreState.Running } workflowStore = workflowStore ++ updatedWorkflows diff --git 
a/engine/src/main/scala/cromwell/engine/workflow/workflowstore/SqlWorkflowStore.scala b/engine/src/main/scala/cromwell/engine/workflow/workflowstore/SqlWorkflowStore.scala index 3513bd6b423..389a18a009c 100644 --- a/engine/src/main/scala/cromwell/engine/workflow/workflowstore/SqlWorkflowStore.scala +++ b/engine/src/main/scala/cromwell/engine/workflow/workflowstore/SqlWorkflowStore.scala @@ -96,7 +96,7 @@ case class SqlWorkflowStore(sqlDatabase: WorkflowStoreSqlDatabase, metadataSqlDa * Retrieves up to n workflows which have not already been pulled into the engine and sets their pickedUp * flag to true */ - override def fetchStartableWorkflows(n: Int, cromwellId: String, heartbeatTtl: FiniteDuration)(implicit ec: ExecutionContext): Future[List[WorkflowToStart]] = { + override def fetchStartableWorkflows(n: Int, cromwellId: String, heartbeatTtl: FiniteDuration, excludedGroups: Set[String])(implicit ec: ExecutionContext): Future[List[WorkflowToStart]] = { import cats.syntax.traverse._ import common.validation.Validation._ sqlDatabase.fetchWorkflowsInState( @@ -106,7 +106,8 @@ case class SqlWorkflowStore(sqlDatabase: WorkflowStoreSqlDatabase, metadataSqlDa OffsetDateTime.now.toSystemTimestamp, WorkflowStoreState.Submitted.toString, WorkflowStoreState.Running.toString, - WorkflowStoreState.OnHold.toString + WorkflowStoreState.OnHold.toString, + excludedGroups ) map { // .get on purpose here to fail the future if something went wrong _.toList.traverse(fromWorkflowStoreEntry).toTry.get diff --git a/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStore.scala b/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStore.scala index abc6cc71f75..375a24db8b3 100644 --- a/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStore.scala +++ b/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStore.scala @@ -38,7 +38,7 @@ trait WorkflowStore { * Retrieves up to n workflows which have not already
been pulled into the engine and sets their pickedUp * flag to true */ - def fetchStartableWorkflows(n: Int, cromwellId: String, heartbeatTtl: FiniteDuration)(implicit ec: ExecutionContext): Future[List[WorkflowToStart]] + def fetchStartableWorkflows(n: Int, cromwellId: String, heartbeatTtl: FiniteDuration, excludedGroups: Set[String])(implicit ec: ExecutionContext): Future[List[WorkflowToStart]] def writeWorkflowHeartbeats(workflowIds: Set[(WorkflowId, OffsetDateTime)], heartbeatDateTime: OffsetDateTime) diff --git a/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreAccess.scala b/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreAccess.scala index e44fef47e78..2086cd34256 100644 --- a/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreAccess.scala +++ b/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreAccess.scala @@ -27,7 +27,7 @@ sealed trait WorkflowStoreAccess { heartbeatDateTime: OffsetDateTime) (implicit actorSystem: ActorSystem, ec: ExecutionContext): Future[Int] - def fetchStartableWorkflows(maxWorkflows: Int, cromwellId: String, heartbeatTtl: FiniteDuration) + def fetchStartableWorkflows(maxWorkflows: Int, cromwellId: String, heartbeatTtl: FiniteDuration, excludedGroups: Set[String]) (implicit actorSystem: ActorSystem, ec: ExecutionContext): Future[List[WorkflowToStart]] def abort(workflowId: WorkflowId) @@ -50,9 +50,9 @@ case class UncoordinatedWorkflowStoreAccess(store: WorkflowStore) extends Workfl store.writeWorkflowHeartbeats(workflowIds.toVector.toSet, heartbeatDateTime) } - override def fetchStartableWorkflows(maxWorkflows: Int, cromwellId: String, heartbeatTtl: FiniteDuration) + override def fetchStartableWorkflows(maxWorkflows: Int, cromwellId: String, heartbeatTtl: FiniteDuration, excludedGroups: Set[String]) (implicit actorSystem: ActorSystem, ec: ExecutionContext): Future[List[WorkflowToStart]] = { - store.fetchStartableWorkflows(maxWorkflows, 
cromwellId, heartbeatTtl) + store.fetchStartableWorkflows(maxWorkflows, cromwellId, heartbeatTtl, excludedGroups) } override def deleteFromStore(workflowId: WorkflowId)(implicit actorSystem: ActorSystem, ec: ExecutionContext): Future[Int] = { @@ -79,9 +79,9 @@ case class CoordinatedWorkflowStoreAccess(coordinatedWorkflowStoreAccessActor: A ) } - override def fetchStartableWorkflows(maxWorkflows: Int, cromwellId: String, heartbeatTtl: FiniteDuration) + override def fetchStartableWorkflows(maxWorkflows: Int, cromwellId: String, heartbeatTtl: FiniteDuration, excludedGroups: Set[String]) (implicit actorSystem: ActorSystem, ec: ExecutionContext): Future[List[WorkflowToStart]] = { - val message = WorkflowStoreCoordinatedAccessActor.FetchStartableWorkflows(maxWorkflows, cromwellId, heartbeatTtl) + val message = WorkflowStoreCoordinatedAccessActor.FetchStartableWorkflows(maxWorkflows, cromwellId, heartbeatTtl, excludedGroups) withRetryForTransactionRollback( () => coordinatedWorkflowStoreAccessActor.ask(message).mapTo[List[WorkflowToStart]] ) diff --git a/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreActor.scala b/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreActor.scala index 9310cb36ad0..fe5a4d991da 100644 --- a/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreActor.scala +++ b/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreActor.scala @@ -61,7 +61,7 @@ final case class WorkflowStoreActor private( object WorkflowStoreActor { sealed trait WorkflowStoreActorEngineCommand - final case class FetchRunnableWorkflows(n: Int) extends WorkflowStoreActorEngineCommand + final case class FetchRunnableWorkflows(n: Int, excludedGroups: Set[String]) extends WorkflowStoreActorEngineCommand final case class AbortWorkflowCommand(id: WorkflowId) extends WorkflowStoreActorEngineCommand final case class WorkflowOnHoldToSubmittedCommand(id: WorkflowId) extends 
WorkflowStoreActorEngineCommand case object InitializerCommand extends WorkflowStoreActorEngineCommand diff --git a/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreCoordinatedAccessActor.scala b/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreCoordinatedAccessActor.scala index c814cd09f5f..3d36a5a8062 100644 --- a/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreCoordinatedAccessActor.scala +++ b/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreCoordinatedAccessActor.scala @@ -33,8 +33,8 @@ class WorkflowStoreCoordinatedAccessActor(workflowStore: WorkflowStore) extends override def receive: Receive = { case WriteHeartbeats(ids, heartbeatDateTime) => workflowStore.writeWorkflowHeartbeats(ids.toVector.toSet, heartbeatDateTime) |> run - case FetchStartableWorkflows(count, cromwellId, heartbeatTtl) => - workflowStore.fetchStartableWorkflows(count, cromwellId, heartbeatTtl) |> run + case FetchStartableWorkflows(count, cromwellId, heartbeatTtl, excludedGroups) => + workflowStore.fetchStartableWorkflows(count, cromwellId, heartbeatTtl, excludedGroups) |> run case DeleteFromStore(workflowId) => workflowStore.deleteFromStore(workflowId) |> run case Abort(workflowId) => @@ -45,7 +45,7 @@ class WorkflowStoreCoordinatedAccessActor(workflowStore: WorkflowStore) extends object WorkflowStoreCoordinatedAccessActor { final case class WriteHeartbeats(workflowIds: NonEmptyVector[(WorkflowId, OffsetDateTime)], heartbeatDateTime: OffsetDateTime) - final case class FetchStartableWorkflows(count: Int, cromwellId: String, heartbeatTtl: FiniteDuration) + final case class FetchStartableWorkflows(count: Int, cromwellId: String, heartbeatTtl: FiniteDuration, excludedGroups: Set[String]) final case class DeleteFromStore(workflowId: WorkflowId) final case class Abort(workflowId: WorkflowId) diff --git a/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreEngineActor.scala 
b/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreEngineActor.scala index ee699b78022..5f066feeb76 100644 --- a/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreEngineActor.scala +++ b/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreEngineActor.scala @@ -100,8 +100,8 @@ final case class WorkflowStoreEngineActor private(store: WorkflowStore, private def startNewWork(command: WorkflowStoreActorEngineCommand, sndr: ActorRef, nextData: WorkflowStoreActorData) = { val work: Future[Any] = command match { - case FetchRunnableWorkflows(count) => - newWorkflowMessage(count) map { response => + case FetchRunnableWorkflows(count, excludedGroups) => + newWorkflowMessage(count, excludedGroups) map { response => response match { case NewWorkflowsToStart(workflows) => val workflowsIds = workflows.map(_.id).toList @@ -185,10 +185,10 @@ final case class WorkflowStoreEngineActor private(store: WorkflowStore, /** * Fetches at most n workflows, and builds the correct response message based on if there were any workflows or not */ - private def newWorkflowMessage(maxWorkflows: Int): Future[WorkflowStoreEngineActorResponse] = { + private def newWorkflowMessage(maxWorkflows: Int, excludedGroups: Set[String]): Future[WorkflowStoreEngineActorResponse] = { def fetchStartableWorkflowsIfNeeded = { if (maxWorkflows > 0) { - workflowStoreAccess.fetchStartableWorkflows(maxWorkflows, workflowHeartbeatConfig.cromwellId, workflowHeartbeatConfig.ttl) + workflowStoreAccess.fetchStartableWorkflows(maxWorkflows, workflowHeartbeatConfig.cromwellId, workflowHeartbeatConfig.ttl, excludedGroups) } else { Future.successful(List.empty[WorkflowToStart]) } diff --git a/engine/src/main/scala/cromwell/webservice/SwaggerUiHttpService.scala b/engine/src/main/scala/cromwell/webservice/SwaggerUiHttpService.scala index 6c9bf54d7bf..f366808e7be 100644 --- a/engine/src/main/scala/cromwell/webservice/SwaggerUiHttpService.scala +++ 
b/engine/src/main/scala/cromwell/webservice/SwaggerUiHttpService.scala @@ -1,10 +1,8 @@ package cromwell.webservice import akka.http.scaladsl.model.{HttpResponse, StatusCodes} -import akka.http.scaladsl.server.Route -import com.typesafe.config.Config -import net.ceedubs.ficus.Ficus._ import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.server.Route import akka.stream.scaladsl.Flow import akka.util.ByteString @@ -72,24 +70,6 @@ trait SwaggerUiHttpService { } -/** - * Extends the SwaggerUiHttpService to gets UI configuration values from a provided Typesafe Config. - */ -trait SwaggerUiConfigHttpService extends SwaggerUiHttpService { - /** - * @return The swagger UI config. - */ - def swaggerUiConfig: Config - - override def swaggerUiVersion = swaggerUiConfig.getString("uiVersion") - - abstract override def swaggerUiBaseUrl = swaggerUiConfig.as[Option[String]]("baseUrl").getOrElse(super.swaggerUiBaseUrl) - - abstract override def swaggerUiPath = swaggerUiConfig.as[Option[String]]("uiPath").getOrElse(super.swaggerUiPath) - - abstract override def swaggerUiDocsPath = swaggerUiConfig.as[Option[String]]("docsPath").getOrElse(super.swaggerUiDocsPath) -} - /** * An extension of HttpService to serve up a resource containing the swagger api as yaml or json. The resource * directory and path on the classpath must match the path for route. 
The resource can be any file type supported by the diff --git a/engine/src/test/scala/cromwell/engine/workflow/workflowstore/SqlWorkflowStoreSpec.scala b/engine/src/test/scala/cromwell/engine/workflow/workflowstore/SqlWorkflowStoreSpec.scala index 147f9a347cf..af49ef5149f 100644 --- a/engine/src/test/scala/cromwell/engine/workflow/workflowstore/SqlWorkflowStoreSpec.scala +++ b/engine/src/test/scala/cromwell/engine/workflow/workflowstore/SqlWorkflowStoreSpec.scala @@ -16,6 +16,7 @@ import org.scalatest.matchers.should.Matchers import org.scalatest.enablers.Emptiness._ import org.scalatest.time.{Millis, Seconds, Span} import org.specs2.mock.Mockito +import spray.json.{JsObject, JsString} import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ @@ -23,19 +24,91 @@ import scala.concurrent.duration._ class SqlWorkflowStoreSpec extends AnyFlatSpec with CromwellTimeoutSpec with Matchers with ScalaFutures with BeforeAndAfterAll with Mockito { implicit val ec = ExecutionContext.global implicit val defaultPatience = PatienceConfig(scaled(Span(10, Seconds)), scaled(Span(100, Millis))) - val sourceFilesCollection = NonEmptyList.of(WorkflowSourceFilesCollection( - Option("sample"), - None, - None, - None, - None, - "input", - WorkflowOptions.empty, - "string", - None, - workflowOnHold = true, - Seq.empty, - requestedWorkflowId = None)) + + val onHoldSourceFilesCollection = NonEmptyList.of( + WorkflowSourceFilesCollection( + Option("sample"), + None, + None, + None, + None, + "input", + WorkflowOptions.empty, + "string", + None, + workflowOnHold = true, + Seq.empty, + None + ) + ) + + val excludedGroupSourceFilesCollection = NonEmptyList.of( + WorkflowSourceFilesCollection( + Option("sample"), + None, + None, + None, + None, + "input", + WorkflowOptions(JsObject(Map("hogGroup" -> JsString("Zardoz")))), + "string", + None, + workflowOnHold = false, + Seq.empty, + None + ) + ) + + val includedGroupSourceFilesCollection1 = NonEmptyList.of( + 
WorkflowSourceFilesCollection( + Option("sample"), + None, + None, + None, + None, + "input", + WorkflowOptions(JsObject(Map("hogGroup" -> JsString("Goldfinger")))), + "string", + None, + workflowOnHold = false, + Seq.empty, + None + ) + ) + + val includedGroupSourceFilesCollection2 = NonEmptyList.of( + WorkflowSourceFilesCollection( + Option("sample"), + None, + None, + None, + None, + "input", + WorkflowOptions(JsObject(Map("hogGroup" -> JsString("Highlander")))), + "string", + None, + workflowOnHold = false, + Seq.empty, + None + ) + ) + + val includedGroupSourceFilesCollection3 = NonEmptyList.of( + WorkflowSourceFilesCollection( + Option("sample"), + None, + None, + None, + None, + "input", + WorkflowOptions(JsObject(Map("hogGroup" -> JsString("Finding Forrester")))), + "string", + None, + workflowOnHold = false, + Seq.empty, + None + ) + ) DatabaseSystem.All foreach { databaseSystem => @@ -49,16 +122,18 @@ class SqlWorkflowStoreSpec extends AnyFlatSpec with CromwellTimeoutSpec with Mat lazy val workflowStore = SqlWorkflowStore(dataAccess, metadataDataAccess) it should "start container if required" taggedAs DbmsTest in { - containerOpt.foreach { _.start } + containerOpt.foreach { + _.start + } } it should "honor the onHold flag" taggedAs DbmsTest in { (for { - submissionResponses <- workflowStore.add(sourceFilesCollection) - startableWorkflows <- workflowStore.fetchStartableWorkflows(10, "A00", 1.second) + submissionResponses <- workflowStore.add(onHoldSourceFilesCollection) + startableWorkflows <- workflowStore.fetchStartableWorkflows(10, "A00", 1.second, Set.empty) _ = startableWorkflows.map(_.id).intersect(submissionResponses.map(_.id).toList) should be(empty) _ <- workflowStore.switchOnHoldToSubmitted(submissionResponses.head.id) - startableWorkflows2 <- workflowStore.fetchStartableWorkflows(10, "A00", 1.second) + startableWorkflows2 <- workflowStore.fetchStartableWorkflows(10, "A00", 1.second, Set.empty) _ = 
startableWorkflows2.map(_.id).intersect(submissionResponses.map(_.id).toList).size should be(1) _ <- workflowStore.deleteFromStore(startableWorkflows2.head.id) // Tidy up } yield ()).futureValue @@ -66,8 +141,8 @@ class SqlWorkflowStoreSpec extends AnyFlatSpec with CromwellTimeoutSpec with Mat it should "abort an onHold workflow" taggedAs DbmsTest in { (for { - submissionResponses <- workflowStore.add(sourceFilesCollection) - startableWorkflows <- workflowStore.fetchStartableWorkflows(10, "A01", 1.second) + submissionResponses <- workflowStore.add(onHoldSourceFilesCollection) + startableWorkflows <- workflowStore.fetchStartableWorkflows(10, "A01", 1.second, Set.empty) _ = startableWorkflows.map(_.id).intersect(submissionResponses.map(_.id).toList) should be(empty) abortWorkflowId = submissionResponses.head.id workflowStoreAbortResponse <- workflowStore.abort(abortWorkflowId) @@ -78,8 +153,8 @@ class SqlWorkflowStoreSpec extends AnyFlatSpec with CromwellTimeoutSpec with Mat it should "abort an onHold then submitted workflow without a heartbeat" taggedAs DbmsTest in { (for { - submissionResponses <- workflowStore.add(sourceFilesCollection) - startableWorkflows <- workflowStore.fetchStartableWorkflows(10, "A02", 1.second) + submissionResponses <- workflowStore.add(onHoldSourceFilesCollection) + startableWorkflows <- workflowStore.fetchStartableWorkflows(10, "A02", 1.second, Set.empty) _ = startableWorkflows.map(_.id).intersect(submissionResponses.map(_.id).toList) should be(empty) abortWorkflowId = submissionResponses.head.id _ <- workflowStore.switchOnHoldToSubmitted(abortWorkflowId) @@ -91,8 +166,8 @@ class SqlWorkflowStoreSpec extends AnyFlatSpec with CromwellTimeoutSpec with Mat it should "abort an onHold then submitted workflow with a heartbeat" taggedAs DbmsTest in { (for { - submissionResponses <- workflowStore.add(sourceFilesCollection) - startableWorkflows <- workflowStore.fetchStartableWorkflows(10, "A03", 1.second) + submissionResponses <- 
workflowStore.add(onHoldSourceFilesCollection) + startableWorkflows <- workflowStore.fetchStartableWorkflows(10, "A03", 1.second, Set.empty) _ = startableWorkflows.map(_.id).intersect(submissionResponses.map(_.id).toList) should be(empty) abortWorkflowId = submissionResponses.head.id _ <- workflowStore.switchOnHoldToSubmitted(abortWorkflowId) @@ -105,8 +180,8 @@ class SqlWorkflowStoreSpec extends AnyFlatSpec with CromwellTimeoutSpec with Mat it should "abort an onHold then running workflow without a heartbeat" taggedAs DbmsTest in { (for { - submissionResponses <- workflowStore.add(sourceFilesCollection) - startableWorkflows <- workflowStore.fetchStartableWorkflows(10, "A04", 1.second) + submissionResponses <- workflowStore.add(onHoldSourceFilesCollection) + startableWorkflows <- workflowStore.fetchStartableWorkflows(10, "A04", 1.second, Set.empty) _ = startableWorkflows.map(_.id).intersect(submissionResponses.map(_.id).toList) should be(empty) abortWorkflowId = submissionResponses.head.id _ <- workflowStore.switchOnHoldToSubmitted(abortWorkflowId) @@ -124,8 +199,8 @@ class SqlWorkflowStoreSpec extends AnyFlatSpec with CromwellTimeoutSpec with Mat it should "abort an onHold then running workflow with a heartbeat" taggedAs DbmsTest in { (for { - submissionResponses <- workflowStore.add(sourceFilesCollection) - startableWorkflows <- workflowStore.fetchStartableWorkflows(10, "A05", 1.second) + submissionResponses <- workflowStore.add(onHoldSourceFilesCollection) + startableWorkflows <- workflowStore.fetchStartableWorkflows(10, "A05", 1.second, Set.empty) _ = startableWorkflows.map(_.id).intersect(submissionResponses.map(_.id).toList) should be(empty) abortWorkflowId = submissionResponses.head.id _ <- workflowStore.switchOnHoldToSubmitted(abortWorkflowId) @@ -151,17 +226,54 @@ class SqlWorkflowStoreSpec extends AnyFlatSpec with CromwellTimeoutSpec with Mat } yield ()).futureValue } + it should "find a grouped workflow normally when not excluding" taggedAs DbmsTest in { 
+ (for { + submissionResponses <- workflowStore.add(excludedGroupSourceFilesCollection) + startableWorkflows <- workflowStore.fetchStartableWorkflows(10, "A06", 1.second, excludedGroups = Set.empty) + _ = startableWorkflows.map(_.id).intersect(submissionResponses.map(_.id).toList).size should be(1) + _ <- workflowStore.deleteFromStore(startableWorkflows.head.id) // Tidy up + } yield ()).futureValue + } + + it should "honor the excludedGroups parameter for a target group" taggedAs DbmsTest in { + (for { + submissionResponses <- workflowStore.add(excludedGroupSourceFilesCollection) + startableWorkflows <- workflowStore.fetchStartableWorkflows(10, "A07", 1.second, excludedGroups = Set("Zardoz")) + _ = startableWorkflows.map(_.id).intersect(submissionResponses.map(_.id).toList) should be(empty) + _ <- workflowStore.deleteFromStore(submissionResponses.head.id) // Tidy up + } yield ()).futureValue + } + + it should "select appropriately with the excludedGroups parameter" taggedAs DbmsTest in { + (for { + submissionResponsesExcluded <- workflowStore.add(excludedGroupSourceFilesCollection) + submissionResponsesIncluded1 <- workflowStore.add(includedGroupSourceFilesCollection1) + submissionResponsesIncluded2 <- workflowStore.add(includedGroupSourceFilesCollection2) + submissionResponsesIncluded3 <- workflowStore.add(includedGroupSourceFilesCollection3) + startableWorkflows <- workflowStore.fetchStartableWorkflows(3, "A08", 1.second, excludedGroups = Set("Zardoz")) + _ = startableWorkflows.map(_.id).intersect(submissionResponsesExcluded.map(_.id).toList).size should be(0) + _ = startableWorkflows.map(_.id).intersect(submissionResponsesIncluded1.map(_.id).toList).size should be(1) + _ = startableWorkflows.map(_.id).intersect(submissionResponsesIncluded2.map(_.id).toList).size should be(1) + _ = startableWorkflows.map(_.id).intersect(submissionResponsesIncluded3.map(_.id).toList).size should be(1) + _ = startableWorkflows.map(_.id).size should be(3) + _ <- 
workflowStore.deleteFromStore(submissionResponsesExcluded.head.id) // Tidy up + _ <- workflowStore.deleteFromStore(submissionResponsesIncluded1.head.id) // Tidy up + _ <- workflowStore.deleteFromStore(submissionResponsesIncluded2.head.id) // Tidy up + _ <- workflowStore.deleteFromStore(submissionResponsesIncluded3.head.id) // Tidy up + } yield ()).futureValue + } + it should "accept and honor a requested workflow ID" taggedAs DbmsTest in { val requestedId = WorkflowId.randomId() - val sourcesToSubmit = sourceFilesCollection.map(c => c.asInstanceOf[WorkflowSourceFilesWithoutImports].copy( + val sourcesToSubmit = onHoldSourceFilesCollection.map(c => c.asInstanceOf[WorkflowSourceFilesWithoutImports].copy( requestedWorkflowId = Option(requestedId), workflowOnHold = false )) (for { submissionResponses <- workflowStore.add(sourcesToSubmit) - startableWorkflows <- workflowStore.fetchStartableWorkflows(10, "A00", 1.second) + startableWorkflows <- workflowStore.fetchStartableWorkflows(10, "A00", 1.second, Set.empty) _ = startableWorkflows.map(_.id).intersect(submissionResponses.map(_.id).toList).size should be(1) _ = startableWorkflows.map(_.id).intersect(submissionResponses.map(_.id).toList).head should be(requestedId) _ <- workflowStore.deleteFromStore(requestedId) // tidy up @@ -171,7 +283,7 @@ class SqlWorkflowStoreSpec extends AnyFlatSpec with CromwellTimeoutSpec with Mat it should "not accept a duplicate workflow ID" taggedAs DbmsTest in { val requestedId = WorkflowId.randomId() - val workflowSourceFilesTemplate = sourceFilesCollection.head.asInstanceOf[WorkflowSourceFilesWithoutImports].copy( + val workflowSourceFilesTemplate = onHoldSourceFilesCollection.head.asInstanceOf[WorkflowSourceFilesWithoutImports].copy( requestedWorkflowId = Option(requestedId) ) @@ -199,7 +311,7 @@ class SqlWorkflowStoreSpec extends AnyFlatSpec with CromwellTimeoutSpec with Mat val requestedId2 = WorkflowId.randomId() val requestedId3 = WorkflowId.randomId() - val 
workflowSourceFilesTemplate = sourceFilesCollection.head.asInstanceOf[WorkflowSourceFilesWithoutImports].copy( + val workflowSourceFilesTemplate = onHoldSourceFilesCollection.head.asInstanceOf[WorkflowSourceFilesWithoutImports].copy( requestedWorkflowId = Option(requestedId1) ) @@ -233,7 +345,7 @@ class SqlWorkflowStoreSpec extends AnyFlatSpec with CromwellTimeoutSpec with Mat val requestedId2 = WorkflowId.randomId() val requestedId3 = WorkflowId.randomId() - val workflowSourceFilesTemplate = sourceFilesCollection.head.asInstanceOf[WorkflowSourceFilesWithoutImports] + val workflowSourceFilesTemplate = onHoldSourceFilesCollection.head.asInstanceOf[WorkflowSourceFilesWithoutImports] val sourcesToSubmit = NonEmptyList.of( workflowSourceFilesTemplate.copy(requestedWorkflowId = Option(requestedId1), workflowOnHold = false), @@ -258,7 +370,9 @@ class SqlWorkflowStoreSpec extends AnyFlatSpec with CromwellTimeoutSpec with Mat } it should "stop container if required" taggedAs DbmsTest in { - containerOpt.foreach { _.stop } + containerOpt.foreach { + _.stop + } } } } diff --git a/engine/src/test/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreCoordinatedAccessActorSpec.scala b/engine/src/test/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreCoordinatedAccessActorSpec.scala index 6a12b978b53..b68a7f9e64b 100644 --- a/engine/src/test/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreCoordinatedAccessActorSpec.scala +++ b/engine/src/test/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreCoordinatedAccessActorSpec.scala @@ -66,13 +66,13 @@ class WorkflowStoreCoordinatedAccessActorSpec extends TestKitSuite val now = OffsetDateTime.now() val expected: List[WorkflowToStart] = List(WorkflowToStart(WorkflowId.randomId(), now, collection, Submitted, HogGroup("foo"))) val workflowStore = new InMemoryWorkflowStore { - override def fetchStartableWorkflows(n: Int, cromwellId: String, heartbeatTtl: FiniteDuration) + override def fetchStartableWorkflows(n: 
Int, cromwellId: String, heartbeatTtl: FiniteDuration, excludedGroups: Set[String]) (implicit ec: ExecutionContext): Future[List[WorkflowToStart]] = { Future.successful(expected) } } val actor = TestActorRef(new WorkflowStoreCoordinatedAccessActor(workflowStore)) - val request = FetchStartableWorkflows(1, "test fetchStartableWorkflows success", 1.second) + val request = FetchStartableWorkflows(1, "test fetchStartableWorkflows success", 1.second, Set.empty) implicit val timeout: Timeout = Timeout(2.seconds.dilated) actor.ask(request).mapTo[List[WorkflowToStart]] map { actual => actual should be(expected) @@ -97,13 +97,13 @@ class WorkflowStoreCoordinatedAccessActorSpec extends TestKitSuite val now = OffsetDateTime.now() val expected: List[WorkflowToStart] = List(WorkflowToStart(WorkflowId.randomId(), now, collection, Submitted, HogGroup("foo"))) val workflowStore = new InMemoryWorkflowStore { - override def fetchStartableWorkflows(n: Int, cromwellId: String, heartbeatTtl: FiniteDuration) + override def fetchStartableWorkflows(n: Int, cromwellId: String, heartbeatTtl: FiniteDuration, excludedGroups: Set[String]) (implicit ec: ExecutionContext): Future[List[WorkflowToStart]] = { Future.successful(expected) } } val actor = TestActorRef(new WorkflowStoreCoordinatedAccessActor(workflowStore)) - val request = FetchStartableWorkflows(1, "test fetchStartableWorkflows with workflow url success", 1.second) + val request = FetchStartableWorkflows(1, "test fetchStartableWorkflows with workflow url success", 1.second, Set.empty) implicit val timeout: Timeout = Timeout(2.seconds.dilated) actor.ask(request).mapTo[List[WorkflowToStart]] map { actual => actual should be(expected) @@ -166,14 +166,14 @@ class WorkflowStoreCoordinatedAccessActorSpec extends TestKitSuite it should s"fail to fetchStartableWorkflows due to $description" in { val workflowStore = new InMemoryWorkflowStore { - override def fetchStartableWorkflows(n: Int, cromwellId: String, heartbeatTtl: FiniteDuration) + 
override def fetchStartableWorkflows(n: Int, cromwellId: String, heartbeatTtl: FiniteDuration, excludedGroups: Set[String]) (implicit ec: ExecutionContext): Future[Nothing] = { result() } } val actor = TestActorRef(new WorkflowStoreCoordinatedAccessActor(workflowStore)) val heartbeatTtlNotReallyUsed = 1.second - val request = FetchStartableWorkflows(1, s"test $description fetchStartableWorkflows", heartbeatTtlNotReallyUsed) + val request = FetchStartableWorkflows(1, s"test $description fetchStartableWorkflows", heartbeatTtlNotReallyUsed, Set.empty) implicit val timeout: Timeout = Timeout(2.seconds.dilated) actor.ask(request).failed map { actual => actual.getMessage should startWith(expectedMessagePrefix) diff --git a/engine/src/test/scala/cromwell/webservice/SwaggerUiHttpServiceSpec.scala b/engine/src/test/scala/cromwell/webservice/SwaggerUiHttpServiceSpec.scala index dcea4c0d2bc..5e53b5677e4 100644 --- a/engine/src/test/scala/cromwell/webservice/SwaggerUiHttpServiceSpec.scala +++ b/engine/src/test/scala/cromwell/webservice/SwaggerUiHttpServiceSpec.scala @@ -4,13 +4,12 @@ import akka.http.scaladsl.model.headers.Location import akka.http.scaladsl.model.{StatusCodes, Uri} import akka.http.scaladsl.server.Route import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest} -import com.typesafe.config.ConfigFactory import common.assertion.CromwellTimeoutSpec -import cromwell.webservice.SwaggerUiHttpServiceSpec._ import cromwell.webservice.routes.CromwellApiService import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import org.scalatest.prop.TableDrivenPropertyChecks + import scala.concurrent.duration._ trait SwaggerUiHttpServiceSpec extends AnyFlatSpec with CromwellTimeoutSpec with Matchers with ScalatestRouteTest with SwaggerUiHttpService { @@ -121,51 +120,6 @@ class NoRedirectRootSwaggerUiHttpServiceSpec extends SwaggerUiHttpServiceSpec { } } -class DefaultSwaggerUiConfigHttpServiceSpec extends SwaggerUiHttpServiceSpec 
with SwaggerUiConfigHttpService { - override def swaggerUiConfig = ConfigFactory.parseString(s"uiVersion = ${CromwellApiService.swaggerUiVersion}") - - behavior of "SwaggerUiConfigHttpService" - - it should "redirect /swagger to the index.html" in { - Get("/swagger") ~> swaggerUiRoute ~> check { - status should be(StatusCodes.TemporaryRedirect) - header("Location") should be(Option(Location(Uri("/swagger/index.html?url=/api-docs")))) - } - } - - it should "return index.html from the swagger-ui jar" in { - Get("/swagger/index.html") ~> swaggerUiRoute ~> check { - status should be(StatusCodes.OK) - responseAs[String].splitAt(51)._2.take(SwaggerIndexPreamble.length) should be(SwaggerIndexPreamble) - } - } -} - -class OverriddenSwaggerUiConfigHttpServiceSpec extends SwaggerUiHttpServiceSpec with SwaggerUiConfigHttpService { - override def swaggerUiConfig = ConfigFactory.parseString( - s"""|baseUrl = /base - |docsPath = swagger/common.yaml - |uiPath = ui/path - |uiVersion = ${CromwellApiService.swaggerUiVersion} - |""".stripMargin) - - behavior of "SwaggerUiConfigHttpService" - - it should "redirect /ui/path to the index.html under /base" in { - Get("/ui/path") ~> swaggerUiRoute ~> check { - status should be(StatusCodes.TemporaryRedirect) - header("Location") should be(Option(Location(Uri("/base/ui/path/index.html?url=/base/swagger/common.yaml")))) - } - } - - it should "return index.html from the swagger-ui jar" in { - Get("/ui/path/index.html") ~> swaggerUiRoute ~> check { - status should be(StatusCodes.OK) - responseAs[String].splitAt(51)._2.take(SwaggerIndexPreamble.length) should be(SwaggerIndexPreamble) - } - } -} - class YamlSwaggerResourceHttpServiceSpec extends SwaggerResourceHttpServiceSpec { override def swaggerServiceName = "testservice" diff --git a/project/Version.scala b/project/Version.scala index 14dd94c59b2..8a71abbd83d 100644 --- a/project/Version.scala +++ b/project/Version.scala @@ -5,7 +5,7 @@ import sbt._ object Version { // Upcoming release, or 
current if we're on a master / hotfix branch - val cromwellVersion = "75" + val cromwellVersion = "76" /** * Returns true if this project should be considered a snapshot. diff --git a/scripts/backpressure_report/README.md b/scripts/backpressure_report/README.md new file mode 100644 index 00000000000..d4f465fa94f --- /dev/null +++ b/scripts/backpressure_report/README.md @@ -0,0 +1,68 @@ +# Backpressure Report + +This `backpressure_report` Python project allows for measuring the amount of time Cromwell runner instances spend in a +high I/O state which triggers internal backpressure. While in this high I/O state the Cromwell runners will not hand out +job execution or restart check tokens, so job starts and restarts will be slowed until I/O returns to normal levels. + +## Running + +Installation: + +Probably best done inside a [virtual environment](https://docs.python.org/3/library/venv.html) + +```shell +pip install . +``` + +Usage: + +```shell +python -m backpressure_report.main +``` + +The program parses JSON-formatted Google Logs Explorer logs JSON looking for backpressure messages. +The Logs Explorer query should look like the following: + +``` + resource.labels.container_name="cromwell1-runner-app" + (jsonPayload.message=~"IoActor backpressure off" OR jsonPayload.message=~"Beginning IoActor backpressure") +``` + +Multiple input files may be required to capture logging output from an entire interval of interest since Google imposes +limits on the number of log entries that can be exported from a single query. 
+ +Output is a CSV file like: + +``` +Interval (1 hour),All pods,Pod 47z68,Pod 4hgd4,Pod 7svrs,Pod 9l2ld,Pod 9p9j4,Pod bj4vh,Pod d85vc,Pod gdp8x,Pod gth4r,Pod jkpbj,Pod jrgsx,Pod ltmvs,Pod mkdjt,Pod qt2bq,Pod th2p8,Pod thwz9,Pod xvcrk,Pod z7jfk +2022-01-01 05:00:00+00:00,62,20,0,0,0,42,0,0,0,0,0,0,0,0,0,0,0,0,0 +2022-01-01 06:00:00+00:00,40,0,0,0,0,0,0,0,40,0,0,0,0,0,0,0,0,0,0 +2022-01-01 07:00:00+00:00,20,20,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0 +2022-01-01 08:00:00+00:00,40,20,0,0,0,20,0,0,0,0,0,0,0,0,0,0,0,0,0 +2022-01-01 09:00:00+00:00,110,0,0,0,0,70,0,0,40,0,0,0,0,0,0,0,0,0,0 +... +``` + +The first column is the timestamp for the interval start, the second column is the sum of all backpressure durations from all runner +pods during that interval in seconds, and all subsequent columns are the backpressure durations for individual pods during the interval in seconds. + +### Questions + +- Q: Why not run the scripts directly, eg `python main.py`? + - A: Running Python from this outer directory allows it to discover the `backpressure_report` + project, and thus allows imports across and between scripts. + +## Unit tests + +To run the Python unit tests from the top-level `backpressure_report` directory +(ie the one containing this README.MD file), run: +```sh +python -m unittest discover -v +``` + +This will: + - Find the `backpressure_report` project in that subdirectory. + - And make it importable to other scripts. + - Run the Python built-in unittest script, which will: + - Discover the tests project in the `test` directory + - Run them, verbosely. 
\ No newline at end of file diff --git a/scripts/backpressure_report/backpressure_report/lib/__init__.py b/scripts/backpressure_report/backpressure_report/lib/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/scripts/backpressure_report/backpressure_report/lib/backpressure_event.py b/scripts/backpressure_report/backpressure_report/lib/backpressure_event.py new file mode 100644 index 00000000000..777bff8489e --- /dev/null +++ b/scripts/backpressure_report/backpressure_report/lib/backpressure_event.py @@ -0,0 +1,76 @@ +from backpressure_report.lib import log_utils +from datetime import datetime +from dateutil import parser +import itertools +from typing import AnyStr + + +class BackpressureEvent: + """ + Represents a span during which a single pod was backpressuring / in high I/O and not dispensing job tokens. + """ + def __init__(self, pod: str, start: datetime, end: datetime): + self.pod = pod + self.start = start + self.end = end + + def duration(self): + return (self.end - self.start).seconds + + def __str__(self) -> AnyStr: + return f"BackpressureEvent(pod = {self.pod},start={str(self.start)},duration={(self.duration())}s)" + + +def build_backpressure_events_from_log_jsons(logs: list): + """ + Build a list of BackpressureEvents from the specified logs, using matched "start" and "end" events for a particular + pod to delimit the duration of the BackpressureEvent. + + :param logs: a list of JSON log files, each of which is a list of JSON objects each representing a log entry. + :return: a list of BackpressureEvents. + """ + + # Complete BackpressureEvent objects corresponding to a matched pair of backpressure start and stop log entries for + # a pod. + complete = [] + # Already-processed log entry ids to ignore duplicates in overlapping log file ranges. 
+ seen_insert_ids = set(()) + # pod names for which we have seen a "backpressure start" log messages and for which we are now awaiting a matching + # "backpressure stop" log message for the same pod name. + in_progress_starts_by_pod = {} + + # Merge the logs so the sorting covers all log entries. + merged_logs = itertools.chain(*logs) + for entry in log_utils.filter_and_sort_log_entries(merged_logs): + insert_id = entry['insertId'] + # skip duplicates + if insert_id in seen_insert_ids: + continue + + seen_insert_ids.add(insert_id) + # Most of the pod name is the same across all pods, only the bit after the last '-' is unique. + pod = entry['resource']['labels']['pod_name'].split('-')[-1] + + if log_utils.is_end_event(entry): + if pod in in_progress_starts_by_pod: + # Make a backpressure event object + start = parser.isoparse(in_progress_starts_by_pod[pod]) + end = parser.isoparse(entry['timestamp']) + event = BackpressureEvent(pod=pod, start=start, end=end) + + # Add this object to complete + complete.append(event) + + # Remove the wip object from in_progress_starts_by_pod + del in_progress_starts_by_pod[pod] + + elif log_utils.is_start_event(entry): + # There are actually two timestamps in the JSON log entries which appear to represent different concepts: + # time emitted ('jsonPayload.localTimestamp') versus time added to the log ('timestamp'). Time emitted would + # seem to be preferable but that value is not specified with a timezone and is ambiguously interpreted by + # the parsing code as being EST when it's actually UTC. This can make reading the report a bit confusing or + # misleading. In practice the timestamps only seem to differ by small amounts, so no big deal to use + # 'timestamp' with its explicit UTC timezone. 
+ in_progress_starts_by_pod[pod] = entry['timestamp'] + + return complete diff --git a/scripts/backpressure_report/backpressure_report/lib/backpressure_window.py b/scripts/backpressure_report/backpressure_report/lib/backpressure_window.py new file mode 100644 index 00000000000..1d4f5e0402e --- /dev/null +++ b/scripts/backpressure_report/backpressure_report/lib/backpressure_window.py @@ -0,0 +1,69 @@ +from collections import defaultdict +from datetime import timedelta +import itertools +from typing import AnyStr + + +class BackpressureWindow: + def __init__(self, timestamp): + self.timestamp = timestamp + self.pod_events = defaultdict(list) + + def add_event(self, event): + self.pod_events[event.pod].append(event) + + def durations_by_pod(self) -> dict: + # Return 0 by default for pods that aren't in this window at all. + ret = defaultdict(lambda: 0) + for pod, events in self.pod_events.items(): + ret[pod] = sum([e.duration() for e in events]) + return ret + + def report_line(self, all_pods) -> AnyStr: + durations = self.durations_by_pod() + sum_durations = sum(durations.values()) + cells = itertools.chain([str(self.timestamp), str(sum_durations)], [str(durations[pod]) for pod in all_pods]) + return ','.join(cells) + + +def build_windows_and_pods_from_events(backpressure_events, window_width_in_hours=1) -> (list, list): + """ + Generate barchart-friendly time windows with counts of backpressuring durations within each window. + + :param backpressure_events: a list of BackpressureEvents to be broken up into time windows + :param window_width_in_hours: how wide each time window should be in hours + :return: a dictionary with timestamp keys to list of BackpressureEvent values + """ + + # The logic below is highly dependent on events being sorted by start timestamp oldest to newest. 
+ sorted_events = backpressure_events.copy() + sorted_events.sort(key=lambda e: e.start) + + interval = sorted_events[0].start.replace(minute=0, second=0, microsecond=0) + next_interval = interval + timedelta(hours=window_width_in_hours) + + all_pods = set(()) + windows = [BackpressureWindow(interval)] + + for event in sorted_events: + all_pods.add(event.pod) + while event.start >= next_interval: + interval = next_interval + windows.append(BackpressureWindow(interval)) + next_interval = next_interval + timedelta(hours=window_width_in_hours) + windows[-1].add_event(event) + all_pods_list = list(all_pods) + all_pods_list.sort() + return windows, all_pods_list + + +def print_windows(windows, all_pods, window_width_in_hours) -> None: + """ + CSV format output generation for the specified backpressure windows. + """ + header_cells = itertools.chain([f"Interval ({window_width_in_hours} hour)", "All pods"], + [f"Pod {pod}" for pod in all_pods]) + print(",".join(header_cells)) + + for window in windows: + print(window.report_line(all_pods)) diff --git a/scripts/backpressure_report/backpressure_report/lib/log_utils.py b/scripts/backpressure_report/backpressure_report/lib/log_utils.py new file mode 100644 index 00000000000..88e4a0eb9ec --- /dev/null +++ b/scripts/backpressure_report/backpressure_report/lib/log_utils.py @@ -0,0 +1,29 @@ +import json + + +def build_log_jsons_from_input_files(input_files: list) -> list: + def load(path): + # `with` to auto-close files + with open(path, 'r') as f: + return json.load(f) + + return [load(f) for f in input_files] + + +def is_start_event(entry) -> bool: + return entry['jsonPayload']['message'].startswith('Beginning IoActor backpressure') + + +def is_end_event(entry) -> bool: + return entry['jsonPayload']['message'] == 'IoActor backpressure off' + + +def filter_and_sort_log_entries(log) -> list: + # Only start or end events are interesting + filtered = [ + entry for entry in log if (is_start_event(entry) or is_end_event(entry)) + ] + 
+ # Oldest first + filtered.sort(key=(lambda e: e['timestamp'])) + return filtered diff --git a/scripts/backpressure_report/backpressure_report/main.py b/scripts/backpressure_report/backpressure_report/main.py new file mode 100644 index 00000000000..77abdc272f2 --- /dev/null +++ b/scripts/backpressure_report/backpressure_report/main.py @@ -0,0 +1,10 @@ +from backpressure_report.lib import backpressure_event, backpressure_window, log_utils +import sys + + +if __name__ == '__main__': + log_jsons = log_utils.build_log_jsons_from_input_files(sys.argv[1:]) + events = backpressure_event.build_backpressure_events_from_log_jsons(log_jsons) + window_width_in_hours = 1 + windows, all_pods = backpressure_window.build_windows_and_pods_from_events(events, window_width_in_hours) + backpressure_window.print_windows(windows, all_pods, window_width_in_hours) diff --git a/scripts/backpressure_report/setup.py b/scripts/backpressure_report/setup.py new file mode 100644 index 00000000000..ca97b9a561b --- /dev/null +++ b/scripts/backpressure_report/setup.py @@ -0,0 +1,14 @@ +from setuptools import find_packages, setup + +export_packages = find_packages(exclude=["tests"]) + +setup( + name="backpressure_report", + version="0.1", + author="Batch Tasks", + packages=export_packages, + python_requires=">=3.8", + install_requires=[ + "python-dateutil", + ], +) diff --git a/scripts/backpressure_report/test/__init__.py b/scripts/backpressure_report/test/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/scripts/backpressure_report/test/lib/__init__.py b/scripts/backpressure_report/test/lib/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/scripts/backpressure_report/test/lib/log_helper.py b/scripts/backpressure_report/test/lib/log_helper.py new file mode 100644 index 00000000000..75f132e9714 --- /dev/null +++ b/scripts/backpressure_report/test/lib/log_helper.py @@ -0,0 +1,5 @@ +from backpressure_report.lib import log_utils +import pathlib + 
+__log_path = pathlib.Path(__file__).parent.parent / 'resources' / 'alpha_logs.json' +LOG = log_utils.build_log_jsons_from_input_files([__log_path])[0] diff --git a/scripts/backpressure_report/test/resources/__init__.py b/scripts/backpressure_report/test/resources/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/scripts/backpressure_report/test/resources/alpha_logs.json b/scripts/backpressure_report/test/resources/alpha_logs.json new file mode 100644 index 00000000000..3dc1b0685a9 --- /dev/null +++ b/scripts/backpressure_report/test/resources/alpha_logs.json @@ -0,0 +1,1158 @@ +[ + { + "insertId": "5syrl7s3faqbgnd1", + "jsonPayload": { + "localTimestamp": "2022-01-14 23:17:11,327", + "message": "IoActor backpressure off", + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-643" + }, + "resource": { + "type": "k8s_container", + "labels": { + "location": "us-central1-a", + "namespace_name": "terra-alpha", + "cluster_name": "terra-alpha", + "pod_name": "cromwell1-runner-775c9f895f-jhpvf", + "project_id": "broad-dsde-alpha", + "container_name": "cromwell1-runner-app" + } + }, + "timestamp": "2022-01-14T23:17:11.328118670Z", + "severity": "INFO", + "labels": { + "k8s-pod/app_kubernetes_io/component": "cromwell", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/app_kubernetes_io/part-of": "terra", + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/deployment": "cromwell1-runner", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-5j7g", + "k8s-pod/app_kubernetes_io/name": "cromwell", + "k8s-pod/pod-template-hash": "775c9f895f" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-14T23:17:13.280918051Z" + }, + { + "insertId": "6hqtu5dl4w9ajk0k", + "jsonPayload": { + "localTimestamp": "2022-01-14 23:16:42,079", + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-44", + "message": 
"Beginning IoActor backpressure for 29.22 seconds" + }, + "resource": { + "type": "k8s_container", + "labels": { + "project_id": "broad-dsde-alpha", + "pod_name": "cromwell1-runner-775c9f895f-jhpvf", + "container_name": "cromwell1-runner-app", + "cluster_name": "terra-alpha", + "location": "us-central1-a", + "namespace_name": "terra-alpha" + } + }, + "timestamp": "2022-01-14T23:16:42.079692927Z", + "severity": "INFO", + "labels": { + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/app_kubernetes_io/name": "cromwell", + "k8s-pod/app_kubernetes_io/part-of": "terra", + "k8s-pod/deployment": "cromwell1-runner", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-5j7g", + "k8s-pod/pod-template-hash": "775c9f895f", + "k8s-pod/app_kubernetes_io/component": "cromwell", + "k8s-pod/app_kubernetes_io/instance": "cromwell" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-14T23:16:43.284332343Z" + }, + { + "insertId": "0k4nevmfm69uyv3n", + "jsonPayload": { + "localTimestamp": "2022-01-14 09:28:38,486", + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-489", + "message": "IoActor backpressure off" + }, + "resource": { + "type": "k8s_container", + "labels": { + "namespace_name": "terra-alpha", + "pod_name": "cromwell1-runner-775c9f895f-5s2fh", + "project_id": "broad-dsde-alpha", + "cluster_name": "terra-alpha", + "container_name": "cromwell1-runner-app", + "location": "us-central1-a" + } + }, + "timestamp": "2022-01-14T09:28:38.486509890Z", + "severity": "INFO", + "labels": { + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/app_kubernetes_io/component": "cromwell", + "k8s-pod/app_kubernetes_io/part-of": "terra", + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/deployment": "cromwell1-runner", + "compute.googleapis.com/resource_name": 
"gke-terra-alpha-cromwell-v3-5142b1df-6hpn", + "k8s-pod/app_kubernetes_io/name": "cromwell", + "k8s-pod/pod-template-hash": "775c9f895f" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-14T09:28:43.431563178Z" + }, + { + "insertId": "q1ybvz8ule7lr1w3", + "jsonPayload": { + "localTimestamp": "2022-01-14 09:27:37,915", + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-671", + "message": "Beginning IoActor backpressure for 59.50 seconds" + }, + "resource": { + "type": "k8s_container", + "labels": { + "pod_name": "cromwell1-runner-775c9f895f-5s2fh", + "location": "us-central1-a", + "project_id": "broad-dsde-alpha", + "namespace_name": "terra-alpha", + "cluster_name": "terra-alpha", + "container_name": "cromwell1-runner-app" + } + }, + "timestamp": "2022-01-14T09:27:37.915274643Z", + "severity": "INFO", + "labels": { + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/app_kubernetes_io/name": "cromwell", + "k8s-pod/app_kubernetes_io/part-of": "terra", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "k8s-pod/deployment": "cromwell1-runner", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-6hpn", + "k8s-pod/pod-template-hash": "775c9f895f", + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/app_kubernetes_io/component": "cromwell" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-14T09:27:38.448546928Z" + }, + { + "insertId": "6o0roivwhmyn9xic", + "jsonPayload": { + "localTimestamp": "2022-01-14 06:56:49,754", + "message": "IoActor backpressure off", + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-726" + }, + "resource": { + "type": "k8s_container", + "labels": { + "cluster_name": "terra-alpha", + "pod_name": "cromwell1-runner-775c9f895f-jx9b9", + "container_name": "cromwell1-runner-app", + "project_id": "broad-dsde-alpha", + "namespace_name": "terra-alpha", + "location": "us-central1-a" + } + }, + 
"timestamp": "2022-01-14T06:56:49.755052202Z", + "severity": "INFO", + "labels": { + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/pod-template-hash": "775c9f895f", + "k8s-pod/app_kubernetes_io/part-of": "terra", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-khw2", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "k8s-pod/app_kubernetes_io/component": "cromwell", + "k8s-pod/deployment": "cromwell1-runner", + "k8s-pod/app_kubernetes_io/name": "cromwell" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-14T06:56:50.044767637Z" + }, + { + "insertId": "ssq5lqyz0epefhvq", + "jsonPayload": { + "localTimestamp": "2022-01-14 06:56:28,045", + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-60", + "message": "Beginning IoActor backpressure for 21.69 seconds" + }, + "resource": { + "type": "k8s_container", + "labels": { + "container_name": "cromwell1-runner-app", + "pod_name": "cromwell1-runner-775c9f895f-jx9b9", + "namespace_name": "terra-alpha", + "project_id": "broad-dsde-alpha", + "cluster_name": "terra-alpha", + "location": "us-central1-a" + } + }, + "timestamp": "2022-01-14T06:56:28.045929738Z", + "severity": "INFO", + "labels": { + "k8s-pod/deployment": "cromwell1-runner", + "k8s-pod/pod-template-hash": "775c9f895f", + "k8s-pod/app_kubernetes_io/name": "cromwell", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-khw2", + "k8s-pod/app_kubernetes_io/component": "cromwell", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/app_kubernetes_io/part-of": "terra" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-14T06:56:30.038201413Z" + }, + { + "insertId": "e6qnz5dvylluhzuy", + "jsonPayload": { + "message": "IoActor backpressure off", + 
"sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-638", + "localTimestamp": "2022-01-14 06:42:12,530" + }, + "resource": { + "type": "k8s_container", + "labels": { + "project_id": "broad-dsde-alpha", + "container_name": "cromwell1-runner-app", + "namespace_name": "terra-alpha", + "pod_name": "cromwell1-runner-775c9f895f-qtn7p", + "location": "us-central1-a", + "cluster_name": "terra-alpha" + } + }, + "timestamp": "2022-01-14T06:42:12.531174853Z", + "severity": "INFO", + "labels": { + "k8s-pod/pod-template-hash": "775c9f895f", + "k8s-pod/app_kubernetes_io/name": "cromwell", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-8qdm", + "k8s-pod/deployment": "cromwell1-runner", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/app_kubernetes_io/component": "cromwell", + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/app_kubernetes_io/part-of": "terra" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-14T06:42:12.646518634Z" + }, + { + "insertId": "euccfz5nkjy0g3te", + "jsonPayload": { + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-640", + "localTimestamp": "2022-01-14 06:41:39,926", + "message": "Beginning IoActor backpressure for 32.58 seconds" + }, + "resource": { + "type": "k8s_container", + "labels": { + "container_name": "cromwell1-runner-app", + "location": "us-central1-a", + "project_id": "broad-dsde-alpha", + "pod_name": "cromwell1-runner-775c9f895f-qtn7p", + "namespace_name": "terra-alpha", + "cluster_name": "terra-alpha" + } + }, + "timestamp": "2022-01-14T06:41:39.926659814Z", + "severity": "INFO", + "labels": { + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/deployment": "cromwell1-runner", + "k8s-pod/pod-template-hash": "775c9f895f", + "k8s-pod/app_kubernetes_io/part-of": "terra", + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/helm_sh/chart": 
"cromwell-0.26.0", + "k8s-pod/app_kubernetes_io/component": "cromwell", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-8qdm", + "k8s-pod/app_kubernetes_io/name": "cromwell" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-14T06:41:42.644333594Z" + }, + { + "insertId": "f53f1suuzmq3st84", + "jsonPayload": { + "localTimestamp": "2022-01-14 06:29:27,196", + "message": "IoActor backpressure off", + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-627" + }, + "resource": { + "type": "k8s_container", + "labels": { + "project_id": "broad-dsde-alpha", + "container_name": "cromwell1-runner-app", + "pod_name": "cromwell1-runner-775c9f895f-9hlbc", + "namespace_name": "terra-alpha", + "location": "us-central1-a", + "cluster_name": "terra-alpha" + } + }, + "timestamp": "2022-01-14T06:29:27.197107340Z", + "severity": "INFO", + "labels": { + "k8s-pod/app_kubernetes_io/part-of": "terra", + "k8s-pod/app_kubernetes_io/component": "cromwell", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/app_kubernetes_io/name": "cromwell", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-jnsf", + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/deployment": "cromwell1-runner", + "k8s-pod/pod-template-hash": "775c9f895f" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-14T06:29:27.549363799Z" + }, + { + "insertId": "63tcpb6gv3vhws4k", + "jsonPayload": { + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-439", + "localTimestamp": "2022-01-14 06:28:54,368", + "message": "Beginning IoActor backpressure for 32.80 seconds" + }, + "resource": { + "type": "k8s_container", + "labels": { + "namespace_name": "terra-alpha", + "location": "us-central1-a", + "cluster_name": "terra-alpha", + "container_name": "cromwell1-runner-app", + "project_id": "broad-dsde-alpha", + 
"pod_name": "cromwell1-runner-775c9f895f-9hlbc" + } + }, + "timestamp": "2022-01-14T06:28:54.368850421Z", + "severity": "INFO", + "labels": { + "k8s-pod/app_kubernetes_io/component": "cromwell", + "k8s-pod/deployment": "cromwell1-runner", + "k8s-pod/pod-template-hash": "775c9f895f", + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/app_kubernetes_io/name": "cromwell", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "k8s-pod/app_kubernetes_io/part-of": "terra", + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-jnsf" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-14T06:28:57.555227959Z" + }, + { + "insertId": "quv8gs3rhyl2e14b", + "jsonPayload": { + "message": "IoActor backpressure off", + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-646", + "localTimestamp": "2022-01-14 06:08:53,517" + }, + "resource": { + "type": "k8s_container", + "labels": { + "pod_name": "cromwell1-runner-775c9f895f-9hlbc", + "location": "us-central1-a", + "project_id": "broad-dsde-alpha", + "namespace_name": "terra-alpha", + "container_name": "cromwell1-runner-app", + "cluster_name": "terra-alpha" + } + }, + "timestamp": "2022-01-14T06:08:53.517615126Z", + "severity": "INFO", + "labels": { + "k8s-pod/pod-template-hash": "775c9f895f", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "k8s-pod/app_kubernetes_io/component": "cromwell", + "k8s-pod/app_kubernetes_io/part-of": "terra", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-jnsf", + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/deployment": "cromwell1-runner", + "k8s-pod/app_kubernetes_io/name": "cromwell" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-14T06:08:57.551013074Z" + }, + { + "insertId": "4bxvbztmmj31sqqa", + "jsonPayload": { + "sourceThread": 
"cromwell-system-akka.dispatchers.io-dispatcher-21", + "message": "IoActor backpressure off", + "localTimestamp": "2022-01-14 06:08:47,805" + }, + "resource": { + "type": "k8s_container", + "labels": { + "container_name": "cromwell1-runner-app", + "location": "us-central1-a", + "project_id": "broad-dsde-alpha", + "pod_name": "cromwell1-runner-775c9f895f-jx9b9", + "namespace_name": "terra-alpha", + "cluster_name": "terra-alpha" + } + }, + "timestamp": "2022-01-14T06:08:47.805498754Z", + "severity": "INFO", + "labels": { + "k8s-pod/deployment": "cromwell1-runner", + "k8s-pod/app_kubernetes_io/part-of": "terra", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/pod-template-hash": "775c9f895f", + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/app_kubernetes_io/name": "cromwell", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-khw2", + "k8s-pod/app_kubernetes_io/component": "cromwell" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-14T06:08:50.046215519Z" + }, + { + "insertId": "dpo5vck1ejsgqvbk", + "jsonPayload": { + "message": "Beginning IoActor backpressure for 29.98 seconds", + "localTimestamp": "2022-01-14 06:08:23,511", + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-24" + }, + "resource": { + "type": "k8s_container", + "labels": { + "namespace_name": "terra-alpha", + "container_name": "cromwell1-runner-app", + "location": "us-central1-a", + "project_id": "broad-dsde-alpha", + "pod_name": "cromwell1-runner-775c9f895f-9hlbc", + "cluster_name": "terra-alpha" + } + }, + "timestamp": "2022-01-14T06:08:23.512094438Z", + "severity": "INFO", + "labels": { + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/app_kubernetes_io/component": "cromwell", + "k8s-pod/pod-template-hash": "775c9f895f", + 
"k8s-pod/app_kubernetes_io/name": "cromwell", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-jnsf", + "k8s-pod/deployment": "cromwell1-runner", + "k8s-pod/app_kubernetes_io/part-of": "terra" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-14T06:08:27.547919533Z" + }, + { + "insertId": "ssmhy0t7rsshd1rr", + "jsonPayload": { + "message": "Beginning IoActor backpressure for 27.91 seconds", + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-52", + "localTimestamp": "2022-01-14 06:08:19,875" + }, + "resource": { + "type": "k8s_container", + "labels": { + "location": "us-central1-a", + "project_id": "broad-dsde-alpha", + "cluster_name": "terra-alpha", + "namespace_name": "terra-alpha", + "pod_name": "cromwell1-runner-775c9f895f-jx9b9", + "container_name": "cromwell1-runner-app" + } + }, + "timestamp": "2022-01-14T06:08:19.875844661Z", + "severity": "INFO", + "labels": { + "k8s-pod/app_kubernetes_io/component": "cromwell", + "k8s-pod/pod-template-hash": "775c9f895f", + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "k8s-pod/app_kubernetes_io/name": "cromwell", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-khw2", + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/deployment": "cromwell1-runner", + "k8s-pod/app_kubernetes_io/part-of": "terra" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-14T06:08:20.043104696Z" + }, + { + "insertId": "jm56jxa07lfvwpcx", + "jsonPayload": { + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-672", + "localTimestamp": "2022-01-13 06:47:20,797", + "message": "IoActor backpressure off" + }, + "resource": { + "type": "k8s_container", + "labels": { + "namespace_name": "terra-alpha", + "cluster_name": "terra-alpha", + "pod_name": "cromwell1-runner-775c9f895f-jhpvf", + "location": "us-central1-a", + 
"project_id": "broad-dsde-alpha", + "container_name": "cromwell1-runner-app" + } + }, + "timestamp": "2022-01-13T06:47:20.797671436Z", + "severity": "INFO", + "labels": { + "k8s-pod/deployment": "cromwell1-runner", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-5j7g", + "k8s-pod/app_kubernetes_io/component": "cromwell", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/app_kubernetes_io/name": "cromwell", + "k8s-pod/pod-template-hash": "775c9f895f", + "k8s-pod/app_kubernetes_io/part-of": "terra" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-13T06:47:23.275716933Z" + }, + { + "insertId": "bi8pwo7odduzrxwm", + "jsonPayload": { + "localTimestamp": "2022-01-13 06:46:55,340", + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-667", + "message": "Beginning IoActor backpressure for 25.44 seconds" + }, + "resource": { + "type": "k8s_container", + "labels": { + "project_id": "broad-dsde-alpha", + "namespace_name": "terra-alpha", + "cluster_name": "terra-alpha", + "location": "us-central1-a", + "pod_name": "cromwell1-runner-775c9f895f-jhpvf", + "container_name": "cromwell1-runner-app" + } + }, + "timestamp": "2022-01-13T06:46:55.341037081Z", + "severity": "INFO", + "labels": { + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/app_kubernetes_io/part-of": "terra", + "k8s-pod/app_kubernetes_io/name": "cromwell", + "k8s-pod/app_kubernetes_io/component": "cromwell", + "k8s-pod/pod-template-hash": "775c9f895f", + "k8s-pod/deployment": "cromwell1-runner", + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-5j7g", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-13T06:46:58.288976005Z" + }, + { + "insertId": 
"bdwpv7fn8qpa0hdn", + "jsonPayload": { + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-723", + "message": "IoActor backpressure off", + "localTimestamp": "2022-01-13 06:12:19,784" + }, + "resource": { + "type": "k8s_container", + "labels": { + "project_id": "broad-dsde-alpha", + "pod_name": "cromwell1-runner-775c9f895f-jx9b9", + "namespace_name": "terra-alpha", + "container_name": "cromwell1-runner-app", + "location": "us-central1-a", + "cluster_name": "terra-alpha" + } + }, + "timestamp": "2022-01-13T06:12:19.785011197Z", + "severity": "INFO", + "labels": { + "k8s-pod/app_kubernetes_io/name": "cromwell", + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/app_kubernetes_io/component": "cromwell", + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/deployment": "cromwell1-runner", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "k8s-pod/app_kubernetes_io/part-of": "terra", + "k8s-pod/pod-template-hash": "775c9f895f", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-khw2" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-13T06:12:20.043389797Z" + }, + { + "insertId": "flg1ljypvzfq0lyl", + "jsonPayload": { + "localTimestamp": "2022-01-13 06:11:58,044", + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-733", + "message": "Beginning IoActor backpressure for 21.72 seconds" + }, + "resource": { + "type": "k8s_container", + "labels": { + "location": "us-central1-a", + "pod_name": "cromwell1-runner-775c9f895f-jx9b9", + "cluster_name": "terra-alpha", + "container_name": "cromwell1-runner-app", + "namespace_name": "terra-alpha", + "project_id": "broad-dsde-alpha" + } + }, + "timestamp": "2022-01-13T06:11:58.044285482Z", + "severity": "INFO", + "labels": { + "k8s-pod/pod-template-hash": "775c9f895f", + "k8s-pod/app_kubernetes_io/part-of": "terra", + "k8s-pod/deployment": "cromwell1-runner", + "compute.googleapis.com/resource_name": 
"gke-terra-alpha-cromwell-v3-5142b1df-khw2", + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "k8s-pod/app_kubernetes_io/component": "cromwell", + "k8s-pod/app_kubernetes_io/name": "cromwell", + "k8s-pod/app_kubernetes_io/managed-by": "Helm" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-13T06:12:00.045266839Z" + }, + { + "insertId": "1tz3zroz7aue4jvq", + "jsonPayload": { + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-574", + "message": "IoActor backpressure off", + "localTimestamp": "2022-01-11 21:16:56,726" + }, + "resource": { + "type": "k8s_container", + "labels": { + "cluster_name": "terra-alpha", + "container_name": "cromwell1-runner-app", + "pod_name": "cromwell1-runner-775c9f895f-5s2fh", + "namespace_name": "terra-alpha", + "project_id": "broad-dsde-alpha", + "location": "us-central1-a" + } + }, + "timestamp": "2022-01-11T21:16:56.727212494Z", + "severity": "INFO", + "labels": { + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "k8s-pod/app_kubernetes_io/component": "cromwell", + "k8s-pod/app_kubernetes_io/part-of": "terra", + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/deployment": "cromwell1-runner", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-6hpn", + "k8s-pod/app_kubernetes_io/name": "cromwell", + "k8s-pod/pod-template-hash": "775c9f895f" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-11T21:16:58.450848427Z" + }, + { + "insertId": "t4wvehhu7frda1lk", + "jsonPayload": { + "message": "Beginning IoActor backpressure for 31.99 seconds", + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-492", + "localTimestamp": "2022-01-11 21:16:24,652" + }, + "resource": { + "type": "k8s_container", + "labels": { + "location": "us-central1-a", + "cluster_name": "terra-alpha", + "project_id": 
"broad-dsde-alpha", + "namespace_name": "terra-alpha", + "pod_name": "cromwell1-runner-775c9f895f-5s2fh", + "container_name": "cromwell1-runner-app" + } + }, + "timestamp": "2022-01-11T21:16:24.652383265Z", + "severity": "INFO", + "labels": { + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/app_kubernetes_io/name": "cromwell", + "k8s-pod/app_kubernetes_io/part-of": "terra", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "k8s-pod/deployment": "cromwell1-runner", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-6hpn", + "k8s-pod/pod-template-hash": "775c9f895f", + "k8s-pod/app_kubernetes_io/component": "cromwell", + "k8s-pod/app_kubernetes_io/instance": "cromwell" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-11T21:16:28.466397760Z" + }, + { + "insertId": "nzaz0g6tzjb0vhno", + "jsonPayload": { + "localTimestamp": "2022-01-11 18:33:34,805", + "message": "IoActor backpressure off", + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-460" + }, + "resource": { + "type": "k8s_container", + "labels": { + "project_id": "broad-dsde-alpha", + "location": "us-central1-a", + "container_name": "cromwell1-runner-app", + "cluster_name": "terra-alpha", + "pod_name": "cromwell1-runner-775c9f895f-9x9hd", + "namespace_name": "terra-alpha" + } + }, + "timestamp": "2022-01-11T18:33:34.805904100Z", + "severity": "INFO", + "labels": { + "k8s-pod/app_kubernetes_io/component": "cromwell", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/app_kubernetes_io/part-of": "terra", + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/deployment": "cromwell1-runner", + "k8s-pod/app_kubernetes_io/name": "cromwell", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-1n0h", + "k8s-pod/pod-template-hash": "775c9f895f" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": 
"2022-01-11T18:33:36.049821033Z" + }, + { + "insertId": "x2jyrdor48jk83yg", + "jsonPayload": { + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-13", + "message": "Beginning IoActor backpressure for 32.96 seconds", + "localTimestamp": "2022-01-11 18:33:01,824" + }, + "resource": { + "type": "k8s_container", + "labels": { + "cluster_name": "terra-alpha", + "project_id": "broad-dsde-alpha", + "pod_name": "cromwell1-runner-775c9f895f-9x9hd", + "namespace_name": "terra-alpha", + "container_name": "cromwell1-runner-app", + "location": "us-central1-a" + } + }, + "timestamp": "2022-01-11T18:33:01.824427921Z", + "severity": "INFO", + "labels": { + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/app_kubernetes_io/name": "cromwell", + "k8s-pod/app_kubernetes_io/part-of": "terra", + "k8s-pod/deployment": "cromwell1-runner", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-1n0h", + "k8s-pod/pod-template-hash": "775c9f895f", + "k8s-pod/app_kubernetes_io/component": "cromwell", + "k8s-pod/app_kubernetes_io/instance": "cromwell" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-11T18:33:06.051167514Z" + }, + { + "insertId": "tb64rp6qqcbp08n1", + "jsonPayload": { + "message": "IoActor backpressure off", + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-475", + "localTimestamp": "2022-01-11 18:12:37,325" + }, + "resource": { + "type": "k8s_container", + "labels": { + "namespace_name": "terra-alpha", + "pod_name": "cromwell1-runner-775c9f895f-9x9hd", + "container_name": "cromwell1-runner-app", + "cluster_name": "terra-alpha", + "project_id": "broad-dsde-alpha", + "location": "us-central1-a" + } + }, + "timestamp": "2022-01-11T18:12:37.326152125Z", + "severity": "INFO", + "labels": { + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "k8s-pod/app_kubernetes_io/component": 
"cromwell", + "k8s-pod/app_kubernetes_io/part-of": "terra", + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/deployment": "cromwell1-runner", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-1n0h", + "k8s-pod/app_kubernetes_io/name": "cromwell", + "k8s-pod/pod-template-hash": "775c9f895f" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-11T18:12:41.045981753Z" + }, + { + "insertId": "wpip1wp8bkitg12i", + "jsonPayload": { + "localTimestamp": "2022-01-11 18:12:32,200", + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-449", + "message": "IoActor backpressure off" + }, + "resource": { + "type": "k8s_container", + "labels": { + "project_id": "broad-dsde-alpha", + "container_name": "cromwell1-runner-app", + "cluster_name": "terra-alpha", + "namespace_name": "terra-alpha", + "location": "us-central1-a", + "pod_name": "cromwell1-runner-775c9f895f-qtn7p" + } + }, + "timestamp": "2022-01-11T18:12:32.201304059Z", + "severity": "INFO", + "labels": { + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/app_kubernetes_io/name": "cromwell", + "k8s-pod/app_kubernetes_io/part-of": "terra", + "k8s-pod/deployment": "cromwell1-runner", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-8qdm", + "k8s-pod/pod-template-hash": "775c9f895f", + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/app_kubernetes_io/component": "cromwell" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-11T18:12:32.662625803Z" + }, + { + "insertId": "9pmgxi1psnygk5s7", + "jsonPayload": { + "message": "IoActor backpressure off", + "localTimestamp": "2022-01-11 18:12:31,406", + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-574" + }, + "resource": { + "type": "k8s_container", + "labels": { + "container_name": "cromwell1-runner-app", + "pod_name": 
"cromwell1-runner-775c9f895f-5s2fh", + "location": "us-central1-a", + "namespace_name": "terra-alpha", + "project_id": "broad-dsde-alpha", + "cluster_name": "terra-alpha" + } + }, + "timestamp": "2022-01-11T18:12:31.406835144Z", + "severity": "INFO", + "labels": { + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/pod-template-hash": "775c9f895f", + "k8s-pod/app_kubernetes_io/part-of": "terra", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-6hpn", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "k8s-pod/app_kubernetes_io/component": "cromwell", + "k8s-pod/deployment": "cromwell1-runner", + "k8s-pod/app_kubernetes_io/name": "cromwell" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-11T18:12:33.442645480Z" + }, + { + "insertId": "c9eefxhj24gxs1ee", + "jsonPayload": { + "message": "Beginning IoActor backpressure for 24.63 seconds", + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-634", + "localTimestamp": "2022-01-11 18:12:07,552" + }, + "resource": { + "type": "k8s_container", + "labels": { + "location": "us-central1-a", + "namespace_name": "terra-alpha", + "container_name": "cromwell1-runner-app", + "pod_name": "cromwell1-runner-775c9f895f-qtn7p", + "cluster_name": "terra-alpha", + "project_id": "broad-dsde-alpha" + } + }, + "timestamp": "2022-01-11T18:12:07.552573970Z", + "severity": "INFO", + "labels": { + "k8s-pod/deployment": "cromwell1-runner", + "k8s-pod/pod-template-hash": "775c9f895f", + "k8s-pod/app_kubernetes_io/name": "cromwell", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-8qdm", + "k8s-pod/app_kubernetes_io/component": "cromwell", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/app_kubernetes_io/part-of": "terra" + }, + "logName": 
"projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-11T18:12:07.648514975Z" + }, + { + "insertId": "5xr71n5apqsdc7xv", + "jsonPayload": { + "message": "Beginning IoActor backpressure for 31.05 seconds", + "localTimestamp": "2022-01-11 18:12:06,254", + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-470" + }, + "resource": { + "type": "k8s_container", + "labels": { + "project_id": "broad-dsde-alpha", + "pod_name": "cromwell1-runner-775c9f895f-9x9hd", + "cluster_name": "terra-alpha", + "location": "us-central1-a", + "namespace_name": "terra-alpha", + "container_name": "cromwell1-runner-app" + } + }, + "timestamp": "2022-01-11T18:12:06.254799065Z", + "severity": "INFO", + "labels": { + "k8s-pod/pod-template-hash": "775c9f895f", + "k8s-pod/app_kubernetes_io/name": "cromwell", + "k8s-pod/deployment": "cromwell1-runner", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-1n0h", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "k8s-pod/app_kubernetes_io/component": "cromwell", + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/app_kubernetes_io/part-of": "terra" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-11T18:12:11.048291145Z" + }, + { + "insertId": "ujvm8qx4pm4mtbsh", + "jsonPayload": { + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-488", + "message": "Beginning IoActor backpressure for 23.63 seconds", + "localTimestamp": "2022-01-11 18:11:46,640" + }, + "resource": { + "type": "k8s_container", + "labels": { + "namespace_name": "terra-alpha", + "container_name": "cromwell1-runner-app", + "project_id": "broad-dsde-alpha", + "cluster_name": "terra-alpha", + "pod_name": "cromwell1-runner-775c9f895f-5s2fh", + "location": "us-central1-a" + } + }, + "timestamp": "2022-01-11T18:11:46.640221128Z", + "severity": "INFO", + "labels": { + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + 
"k8s-pod/deployment": "cromwell1-runner", + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/app_kubernetes_io/part-of": "terra", + "k8s-pod/pod-template-hash": "775c9f895f", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "k8s-pod/app_kubernetes_io/component": "cromwell", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-6hpn", + "k8s-pod/app_kubernetes_io/name": "cromwell" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-11T18:11:48.431579557Z" + }, + { + "insertId": "0w4sawy9n19nyld1", + "jsonPayload": { + "localTimestamp": "2022-01-11 17:42:35,915", + "message": "IoActor backpressure off", + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-648" + }, + "resource": { + "type": "k8s_container", + "labels": { + "pod_name": "cromwell1-runner-775c9f895f-9x9hd", + "cluster_name": "terra-alpha", + "container_name": "cromwell1-runner-app", + "namespace_name": "terra-alpha", + "project_id": "broad-dsde-alpha", + "location": "us-central1-a" + } + }, + "timestamp": "2022-01-11T17:42:35.915634947Z", + "severity": "INFO", + "labels": { + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "k8s-pod/app_kubernetes_io/component": "cromwell", + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/app_kubernetes_io/part-of": "terra", + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/deployment": "cromwell1-runner", + "k8s-pod/app_kubernetes_io/name": "cromwell", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-1n0h", + "k8s-pod/pod-template-hash": "775c9f895f" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-11T17:42:36.054016355Z" + }, + { + "insertId": "cbmumk56tes95ilj", + "jsonPayload": { + "localTimestamp": "2022-01-11 17:42:09,783", + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-13", + "message": "Beginning IoActor backpressure for 26.12 seconds" + }, + "resource": { + 
"type": "k8s_container", + "labels": { + "location": "us-central1-a", + "pod_name": "cromwell1-runner-775c9f895f-9x9hd", + "project_id": "broad-dsde-alpha", + "namespace_name": "terra-alpha", + "cluster_name": "terra-alpha", + "container_name": "cromwell1-runner-app" + } + }, + "timestamp": "2022-01-11T17:42:09.783329241Z", + "severity": "INFO", + "labels": { + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/app_kubernetes_io/name": "cromwell", + "k8s-pod/app_kubernetes_io/part-of": "terra", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "k8s-pod/deployment": "cromwell1-runner", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-1n0h", + "k8s-pod/pod-template-hash": "775c9f895f", + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/app_kubernetes_io/component": "cromwell" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-11T17:42:11.047348877Z" + }, + { + "insertId": "50u6xj4go9cvjcjo", + "jsonPayload": { + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-234", + "message": "IoActor backpressure off", + "localTimestamp": "2022-01-11 02:43:40,598" + }, + "resource": { + "type": "k8s_container", + "labels": { + "pod_name": "cromwell1-runner-5c4f9c49cd-prs67", + "project_id": "broad-dsde-alpha", + "location": "us-central1-a", + "container_name": "cromwell1-runner-app", + "cluster_name": "terra-alpha", + "namespace_name": "terra-alpha" + } + }, + "timestamp": "2022-01-11T02:43:40.598882561Z", + "severity": "INFO", + "labels": { + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/pod-template-hash": "5c4f9c49cd", + "k8s-pod/app_kubernetes_io/part-of": "terra", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-khw2", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "k8s-pod/app_kubernetes_io/component": "cromwell", + "k8s-pod/deployment": "cromwell1-runner", + 
"k8s-pod/app_kubernetes_io/name": "cromwell" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-11T02:43:45.041569324Z" + }, + { + "insertId": "jqzhlphajtqjz554", + "jsonPayload": { + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-233", + "localTimestamp": "2022-01-11 02:43:08,012", + "message": "Beginning IoActor backpressure for 32.56 seconds" + }, + "resource": { + "type": "k8s_container", + "labels": { + "cluster_name": "terra-alpha", + "location": "us-central1-a", + "pod_name": "cromwell1-runner-5c4f9c49cd-prs67", + "namespace_name": "terra-alpha", + "container_name": "cromwell1-runner-app", + "project_id": "broad-dsde-alpha" + } + }, + "timestamp": "2022-01-11T02:43:08.013133481Z", + "severity": "INFO", + "labels": { + "k8s-pod/deployment": "cromwell1-runner", + "k8s-pod/pod-template-hash": "5c4f9c49cd", + "k8s-pod/app_kubernetes_io/name": "cromwell", + "k8s-pod/app_kubernetes_io/component": "cromwell", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-khw2", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/app_kubernetes_io/part-of": "terra" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-11T02:43:10.045131471Z" + }, + { + "insertId": "h4eyi9ut12tr1zoj", + "jsonPayload": { + "message": "IoActor backpressure off", + "localTimestamp": "2022-01-10 08:01:16,388", + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-16" + }, + "resource": { + "type": "k8s_container", + "labels": { + "pod_name": "cromwell1-runner-5c4f9c49cd-tcfrr", + "namespace_name": "terra-alpha", + "container_name": "cromwell1-runner-app", + "project_id": "broad-dsde-alpha", + "cluster_name": "terra-alpha", + "location": "us-central1-a" + } + }, + "timestamp": "2022-01-10T08:01:16.389082718Z", + "severity": "INFO", + "labels": { + 
"k8s-pod/pod-template-hash": "5c4f9c49cd", + "k8s-pod/app_kubernetes_io/name": "cromwell", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-8vgj", + "k8s-pod/deployment": "cromwell1-runner", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/app_kubernetes_io/component": "cromwell", + "k8s-pod/app_kubernetes_io/part-of": "terra" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-10T08:01:20.572532753Z" + }, + { + "insertId": "xo9wueok4wnstx6q", + "jsonPayload": { + "localTimestamp": "2022-01-10 08:00:39,672", + "sourceThread": "cromwell-system-akka.dispatchers.io-dispatcher-211", + "message": "Beginning IoActor backpressure for 36.70 seconds" + }, + "resource": { + "type": "k8s_container", + "labels": { + "pod_name": "cromwell1-runner-5c4f9c49cd-tcfrr", + "project_id": "broad-dsde-alpha", + "namespace_name": "terra-alpha", + "cluster_name": "terra-alpha", + "location": "us-central1-a", + "container_name": "cromwell1-runner-app" + } + }, + "timestamp": "2022-01-10T08:00:39.672924004Z", + "severity": "INFO", + "labels": { + "k8s-pod/app_kubernetes_io/managed-by": "Helm", + "k8s-pod/deployment": "cromwell1-runner", + "k8s-pod/app_kubernetes_io/part-of": "terra", + "k8s-pod/app_kubernetes_io/instance": "cromwell", + "k8s-pod/pod-template-hash": "5c4f9c49cd", + "k8s-pod/helm_sh/chart": "cromwell-0.26.0", + "k8s-pod/app_kubernetes_io/component": "cromwell", + "k8s-pod/app_kubernetes_io/name": "cromwell", + "compute.googleapis.com/resource_name": "gke-terra-alpha-cromwell-v3-5142b1df-8vgj" + }, + "logName": "projects/broad-dsde-alpha/logs/stdout", + "receiveTimestamp": "2022-01-10T08:00:40.585240843Z" + } +] \ No newline at end of file diff --git a/scripts/backpressure_report/test/test_backpressure_event.py b/scripts/backpressure_report/test/test_backpressure_event.py new file mode 100644 index 
00000000000..a05c1820eb2 --- /dev/null +++ b/scripts/backpressure_report/test/test_backpressure_event.py @@ -0,0 +1,13 @@ +import unittest + +from backpressure_report.lib import backpressure_event +from test.lib.log_helper import LOG + + +class BackpressureEventTestMethods(unittest.TestCase): + + def test_build_backpressure_events_from_log_jsons(self): + events = backpressure_event.build_backpressure_events_from_log_jsons([LOG]) + self.assertEqual(17, len(events)) + # Minimum backpressure duration is 20 seconds, maximum is 60 seconds. + self.assertTrue(all([(d.duration() >= 20) and (d.duration() <= 60) for d in events])) diff --git a/scripts/backpressure_report/test/test_backpressure_window.py b/scripts/backpressure_report/test/test_backpressure_window.py new file mode 100644 index 00000000000..0c46bf2b652 --- /dev/null +++ b/scripts/backpressure_report/test/test_backpressure_window.py @@ -0,0 +1,31 @@ +import unittest + +from backpressure_report.lib import backpressure_event, backpressure_window +from test.lib.log_helper import LOG + + +class BackpressureWindowTestMethods(unittest.TestCase): + + def setUp(self) -> None: + self.events = backpressure_event.build_backpressure_events_from_log_jsons([LOG]) + self.expected_durations = [36, 0, 32, 0, 189, 0, 0, 0, 46, 0, 0, 142, 60, 29] + + def __run_assert(self) -> None: + windows, all_pods = backpressure_window.build_windows_and_pods_from_events(self.events, window_width_in_hours=8) + actual_durations = [ + sum(w.durations_by_pod().values()) for w in windows + ] + self.assertEqual(self.expected_durations, actual_durations) + # One pod backpressuring + self.assertEqual("2022-01-11 00:00:00+00:00,32,0,0,0,0,0,32,0,0", windows[2].report_line(all_pods)) + # No pods backpressuring + self.assertEqual("2022-01-11 08:00:00+00:00,0,0,0,0,0,0,0,0,0", windows[3].report_line(all_pods)) + # Multiple pods backpressuring + self.assertEqual("2022-01-11 16:00:00+00:00,189,76,0,89,0,0,0,24,0", windows[4].report_line(all_pods)) + + def 
test_forward(self): + self.__run_assert() + + def test_reversed(self): + self.events.reverse() + self.__run_assert() diff --git a/scripts/backpressure_report/test/test_log_utils.py b/scripts/backpressure_report/test/test_log_utils.py new file mode 100644 index 00000000000..73cdf0c9efb --- /dev/null +++ b/scripts/backpressure_report/test/test_log_utils.py @@ -0,0 +1,24 @@ +import unittest +from backpressure_report.lib import log_utils +from test.lib.log_helper import LOG + + +class LogUtilsTestMethods(unittest.TestCase): + + def test_build_log_jsons_from_input_files(self): + self.assertEqual(34, len(LOG)) + + def test_is_start_event(self): + starts = [e for e in LOG if log_utils.is_start_event(e)] + self.assertEqual(17, len(starts)) + + def test_is_end_event(self): + ends = [e for e in LOG if log_utils.is_end_event(e)] + self.assertEqual(17, len(ends)) + + def test_filter_and_sort_log_entries(self): + filtered = log_utils.filter_and_sort_log_entries(LOG) + self.assertEqual(34, len(filtered)) + oldest = filtered[0]['timestamp'] + self.assertTrue(all([t['timestamp'] >= oldest for t in filtered])) + self.assertTrue(all(log_utils.is_start_event(e) or log_utils.is_end_event(e) for e in filtered)) diff --git a/server/src/test/scala/cromwell/engine/WorkflowStoreActorSpec.scala b/server/src/test/scala/cromwell/engine/WorkflowStoreActorSpec.scala index 034d1429ab9..da3021942ac 100644 --- a/server/src/test/scala/cromwell/engine/WorkflowStoreActorSpec.scala +++ b/server/src/test/scala/cromwell/engine/WorkflowStoreActorSpec.scala @@ -129,7 +129,7 @@ class WorkflowStoreActorSpec extends CromwellTestKitWordSpec with CoordinatedWor storeActor ! BatchSubmitWorkflows(NonEmptyList.of(helloWorldSourceFiles, helloWorldSourceFiles, helloCwlWorldSourceFiles)) val insertedIds = expectMsgType[WorkflowsBatchSubmittedToStore](10 seconds).workflowIds.toList - storeActor ! FetchRunnableWorkflows(2) + storeActor ! 
FetchRunnableWorkflows(2, Set.empty) expectMsgPF(10 seconds) { case NewWorkflowsToStart(workflowNel) => workflowNel.toList.size shouldBe 2 @@ -142,7 +142,7 @@ class WorkflowStoreActorSpec extends CromwellTestKitWordSpec with CoordinatedWor } } - storeActor ! FetchRunnableWorkflows(1) + storeActor ! FetchRunnableWorkflows(1, Set.empty) expectMsgPF(10 seconds) { case NewWorkflowsToStart(workflowNel) => workflowNel.toList.size shouldBe 1 @@ -182,7 +182,7 @@ class WorkflowStoreActorSpec extends CromwellTestKitWordSpec with CoordinatedWor storeActor ! BatchSubmitWorkflows(NonEmptyList.of(optionedSourceFiles)) val insertedIds = expectMsgType[WorkflowsBatchSubmittedToStore](10 seconds).workflowIds.toList - storeActor ! FetchRunnableWorkflows(1) + storeActor ! FetchRunnableWorkflows(1, Set.empty) expectMsgPF(10 seconds) { case NewWorkflowsToStart(workflowNel) => workflowNel.toList.size should be(1) @@ -232,7 +232,7 @@ class WorkflowStoreActorSpec extends CromwellTestKitWordSpec with CoordinatedWor storeActor ! BatchSubmitWorkflows(NonEmptyList.of(helloWorldSourceFiles, helloWorldSourceFiles, helloWorldSourceFiles)) val insertedIds = expectMsgType[WorkflowsBatchSubmittedToStore](10 seconds).workflowIds.toList - storeActor ! FetchRunnableWorkflows(100) + storeActor ! FetchRunnableWorkflows(100, Set.empty) expectMsgPF(10 seconds) { case NewWorkflowsToStart(workflowNel) => workflowNel.toList.size shouldBe 3 @@ -260,7 +260,7 @@ class WorkflowStoreActorSpec extends CromwellTestKitWordSpec with CoordinatedWor "WorkflowStoreActor-RemainResponsiveForUnknown" ) - storeActor ! FetchRunnableWorkflows(100) + storeActor ! 
FetchRunnableWorkflows(100, Set.empty) expectMsgPF(10 seconds) { case NoNewWorkflowsToStart => // Great case x => fail(s"Unexpected response from supposedly empty WorkflowStore: $x") diff --git a/services/src/test/scala/cromwell/services/database/LobSpec.scala b/services/src/test/scala/cromwell/services/database/LobSpec.scala index a92cf84aed9..74a3cc619d5 100644 --- a/services/src/test/scala/cromwell/services/database/LobSpec.scala +++ b/services/src/test/scala/cromwell/services/database/LobSpec.scala @@ -175,7 +175,8 @@ class LobSpec extends AnyFlatSpec with CromwellTimeoutSpec with Matchers with Sc heartbeatTimestampTo = OffsetDateTime.now.toSystemTimestamp, workflowStateFrom = "Submitted", workflowStateTo = "Running", - workflowStateExcluded = "On Hold" + workflowStateExcluded = "On Hold", + excludedGroups = Set.empty ) _ = { diff --git a/supportedBackends/google/pipelines/v2alpha1/src/main/scala/cromwell/backend/google/pipelines/v2alpha1/PipelinesApiAsyncBackendJobExecutionActor.scala b/supportedBackends/google/pipelines/v2alpha1/src/main/scala/cromwell/backend/google/pipelines/v2alpha1/PipelinesApiAsyncBackendJobExecutionActor.scala index 319ac966889..a0f899a26ff 100644 --- a/supportedBackends/google/pipelines/v2alpha1/src/main/scala/cromwell/backend/google/pipelines/v2alpha1/PipelinesApiAsyncBackendJobExecutionActor.scala +++ b/supportedBackends/google/pipelines/v2alpha1/src/main/scala/cromwell/backend/google/pipelines/v2alpha1/PipelinesApiAsyncBackendJobExecutionActor.scala @@ -292,7 +292,7 @@ class PipelinesApiAsyncBackendJobExecutionActor(standardParams: StandardAsyncExe jesOutputs collectFirst { case jesOutput if jesOutput.name == makeSafeReferenceName(path) => val pathAsString = jesOutput.cloudPath.pathAsString - if (!jesOutput.cloudPath.exists) { + if (jesOutput.isFileParameter && !jesOutput.cloudPath.exists) { // This is not an error if the path represents a `File?` optional output (the PAPI delocalization script // should have failed if this file 
output was not optional but missing). Throw to produce the correct "empty // optional" value for a missing optional file output. diff --git a/supportedBackends/google/pipelines/v2beta/src/main/scala/cromwell/backend/google/pipelines/v2beta/PipelinesApiAsyncBackendJobExecutionActor.scala b/supportedBackends/google/pipelines/v2beta/src/main/scala/cromwell/backend/google/pipelines/v2beta/PipelinesApiAsyncBackendJobExecutionActor.scala index 1188b427b3d..77370cabc92 100644 --- a/supportedBackends/google/pipelines/v2beta/src/main/scala/cromwell/backend/google/pipelines/v2beta/PipelinesApiAsyncBackendJobExecutionActor.scala +++ b/supportedBackends/google/pipelines/v2beta/src/main/scala/cromwell/backend/google/pipelines/v2beta/PipelinesApiAsyncBackendJobExecutionActor.scala @@ -292,7 +292,7 @@ class PipelinesApiAsyncBackendJobExecutionActor(standardParams: StandardAsyncExe jesOutputs collectFirst { case jesOutput if jesOutput.name == makeSafeReferenceName(path) => val pathAsString = jesOutput.cloudPath.pathAsString - if (!jesOutput.cloudPath.exists) { + if (jesOutput.isFileParameter && !jesOutput.cloudPath.exists) { // This is not an error if the path represents a `File?` optional output (the PAPI delocalization script // should have failed if this file output was not optional but missing). Throw to produce the correct "empty // optional" value for a missing optional file output. 
diff --git a/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesRuntimeAttributes.scala b/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesRuntimeAttributes.scala index 1a744f5da16..103f8e51967 100644 --- a/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesRuntimeAttributes.scala +++ b/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesRuntimeAttributes.scala @@ -9,6 +9,7 @@ import eu.timepit.refined.api.Refined import eu.timepit.refined.numeric.Positive import wom.RuntimeAttributesKeys import wom.format.MemorySize +import wom.types.WomStringType import wom.values._ case class TesRuntimeAttributes(continueOnReturnCode: ContinueOnReturnCode, @@ -19,7 +20,7 @@ case class TesRuntimeAttributes(continueOnReturnCode: ContinueOnReturnCode, memory: Option[MemorySize], disk: Option[MemorySize], preemptible: Boolean, - backendParameters: Map[String, String]) + backendParameters: Map[String, Option[String]]) object TesRuntimeAttributes { @@ -55,12 +56,17 @@ object TesRuntimeAttributes { def makeBackendParameters(runtimeAttributes: Map[String, WomValue], keysToExclude: Set[String], - config: TesConfiguration): Map[String, String] = { + config: TesConfiguration): Map[String, Option[String]] = { if (config.useBackendParameters) runtimeAttributes .filterKeys(k => !keysToExclude.contains(k)) - .collect { case (key, strValue: WomString) => (key, strValue.value)} + .flatMap( _ match { + case (key, WomString(s)) => Option((key, Option(s))) + case (key, WomOptionalValue(WomStringType, Some(WomString(optS)))) => Option((key, Option(optS))) + case (key, WomOptionalValue(WomStringType, None)) => Option((key, None)) + case _ => None + }) else Map.empty } diff --git a/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesTask.scala b/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesTask.scala index 3ebb1b82ac5..b4e9655b3c7 100644 --- 
a/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesTask.scala +++ b/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesTask.scala @@ -238,7 +238,7 @@ object TesTask { .workflowOptions .get(TesWorkflowOptionKeys.WorkflowExecutionIdentity) .toOption - .map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> _) + .map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option(_)) .toMap val disk :: ram :: _ = Seq(runtimeAttributes.disk, runtimeAttributes.memory) map { @@ -298,7 +298,7 @@ final case class Resources(cpu_cores: Option[Int], disk_gb: Option[Double], preemptible: Option[Boolean], zones: Option[Seq[String]], - backend_parameters: Option[Map[String, String]]) + backend_parameters: Option[Map[String, Option[String]]]) final case class OutputFileLog(url: String, path: String, diff --git a/supportedBackends/tes/src/test/scala/cromwell/backend/impl/tes/TesRuntimeAttributesSpec.scala b/supportedBackends/tes/src/test/scala/cromwell/backend/impl/tes/TesRuntimeAttributesSpec.scala index 210ac12ca15..6314ed30d3a 100644 --- a/supportedBackends/tes/src/test/scala/cromwell/backend/impl/tes/TesRuntimeAttributesSpec.scala +++ b/supportedBackends/tes/src/test/scala/cromwell/backend/impl/tes/TesRuntimeAttributesSpec.scala @@ -75,7 +75,7 @@ class TesRuntimeAttributesSpec extends AnyWordSpecLike with CromwellTimeoutSpec val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "preemptible" -> WomString("yes")) assertFailure(runtimeAttributes, "Expecting preemptible runtime attribute to be a Boolean or a String with values of 'true' or 'false'") } - + "validate a valid continueOnReturnCode entry" in { val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "continueOnReturnCode" -> WomInteger(1)) val expectedRuntimeAttributes = expectedDefaultsPlusUbuntuDocker.copy(continueOnReturnCode = ContinueOnReturnCodeSet(Set(1))) @@ -160,13 +160,26 @@ class TesRuntimeAttributesSpec extends AnyWordSpecLike with CromwellTimeoutSpec 
"turn unknown string attributes into backend parameters" in { val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "foo" -> WomString("bar")) - val expectedRuntimeAttributes = expectedDefaults.copy(backendParameters = Map("foo" -> "bar")) + val expectedRuntimeAttributes = expectedDefaults.copy(backendParameters = Map("foo" -> Option("bar"))) assertSuccess(runtimeAttributes, expectedRuntimeAttributes, tesConfig = mockTesConfigWithBackendParams) } "exclude unknown non-string attributes from backend parameters" in { val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "foo" -> WomInteger(5), "bar" -> WomString("baz")) - val expectedRuntimeAttributes = expectedDefaults.copy(backendParameters = Map("bar" -> "baz")) + val expectedRuntimeAttributes = expectedDefaults.copy(backendParameters = Map("bar" -> Option("baz"))) + assertSuccess(runtimeAttributes, expectedRuntimeAttributes, tesConfig = mockTesConfigWithBackendParams) + } + + + "turn populated optional unknown string attributes into backend parameters" in { + val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "foo" -> WomOptionalValue(WomString("bar"))) + val expectedRuntimeAttributes = expectedDefaults.copy(backendParameters = Map("foo" -> Option("bar"))) + assertSuccess(runtimeAttributes, expectedRuntimeAttributes, tesConfig = mockTesConfigWithBackendParams) + } + + "turn unpopulated optional unknown string attributes into backend parameters" in { + val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "foo" -> WomOptionalValue.none(WomStringType)) + val expectedRuntimeAttributes = expectedDefaults.copy(backendParameters = Map("foo" -> None)) assertSuccess(runtimeAttributes, expectedRuntimeAttributes, tesConfig = mockTesConfigWithBackendParams) } } diff --git a/supportedBackends/tes/src/test/scala/cromwell/backend/impl/tes/TesTaskSpec.scala b/supportedBackends/tes/src/test/scala/cromwell/backend/impl/tes/TesTaskSpec.scala index 347049714c9..b71a9ddefd5 
100644 --- a/supportedBackends/tes/src/test/scala/cromwell/backend/impl/tes/TesTaskSpec.scala +++ b/supportedBackends/tes/src/test/scala/cromwell/backend/impl/tes/TesTaskSpec.scala @@ -30,21 +30,21 @@ class TesTaskSpec extends AnyFlatSpec with CromwellTimeoutSpec with Matchers wit it should "create the correct resources when an identity is passed in WorkflowOptions" in { val wd = workflowDescriptorWithIdentity(Option("abc123")) TesTask.makeResources(runtimeAttributes, wd) shouldEqual - Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> "abc123")) + Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123"))) ) } it should "create the correct resources when an empty identity is passed in WorkflowOptions" in { val wd = workflowDescriptorWithIdentity(Option("")) TesTask.makeResources(runtimeAttributes, wd) shouldEqual - Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> "")) + Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option(""))) ) } it should "create the correct resources when no identity is passed in WorkflowOptions" in { val wd = workflowDescriptorWithIdentity(None) TesTask.makeResources(runtimeAttributes, wd) shouldEqual - Resources(None, None, None, Option(false), None, Option(Map.empty[String, String]) + Resources(None, None, None, Option(false), None, Option(Map.empty[String, Option[String]]) ) } }