Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
mcovarr committed Feb 14, 2022
2 parents 33afec0 + 2e46ca2 commit d503879
Show file tree
Hide file tree
Showing 43 changed files with 1,792 additions and 227 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ import akka.http.scaladsl.server._
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import akka.util.ByteString
import com.typesafe.config.Config
import cromiam.server.config.SwaggerOauthConfig
import net.ceedubs.ficus.Ficus._

import scala.concurrent.{ExecutionContext, Future}

Expand Down Expand Up @@ -127,24 +125,6 @@ trait SwaggerUiHttpService extends Directives {
protected def rewriteSwaggerIndex(data: String): String = data
}

/**
* Extends the SwaggerUiHttpService to gets UI configuration values from a provided Typesafe Config.
*/
trait SwaggerUiConfigHttpService extends SwaggerUiHttpService {
/**
* @return The swagger UI config.
*/
def swaggerUiConfig: Config

override def swaggerUiVersion = swaggerUiConfig.getString("uiVersion")

abstract override def swaggerUiBaseUrl = swaggerUiConfig.as[Option[String]]("baseUrl").getOrElse(super.swaggerUiBaseUrl)

abstract override def swaggerUiPath = swaggerUiConfig.as[Option[String]]("uiPath").getOrElse(super.swaggerUiPath)

abstract override def swaggerUiDocsPath = swaggerUiConfig.as[Option[String]]("docsPath").getOrElse(super.swaggerUiDocsPath)
}

/**
* An extension of HttpService to serve up a resource containing the swagger api as yaml or json. The resource
* directory and path on the classpath must match the path for route. The resource can be any file type supported by the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import akka.http.scaladsl.model.headers.Location
import akka.http.scaladsl.model.{ContentTypes, StatusCodes, Uri}
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.testkit.ScalatestRouteTest
import com.typesafe.config.ConfigFactory
import common.assertion.CromwellTimeoutSpec
import cromiam.server.config.SwaggerOauthConfig
import cromiam.webservice.SwaggerUiHttpServiceSpec._
Expand Down Expand Up @@ -114,56 +113,6 @@ class NoRedirectRootSwaggerUiHttpServiceSpec extends SwaggerUiHttpServiceSpec {
}
}

class DefaultSwaggerUiConfigHttpServiceSpec extends SwaggerUiHttpServiceSpec with SwaggerUiConfigHttpService {
override def swaggerUiConfig = ConfigFactory.parseString(s"uiVersion = $TestSwaggerUiVersion")

behavior of "SwaggerUiConfigHttpService"

it should "redirect /swagger to the index.html" in {
Get("/swagger") ~> swaggerUiRoute ~> check {
status should be(StatusCodes.TemporaryRedirect)
header("Location") should be(Option(Location(Uri("/swagger/index.html?url=/api-docs"))))
contentType should be(ContentTypes.`text/html(UTF-8)`)
}
}

it should "return index.html from the swagger-ui jar" in {
Get("/swagger/index.html") ~> swaggerUiRoute ~> check {
status should be(StatusCodes.OK)
responseAs[String].take(SwaggerIndexPreamble.length) should be(SwaggerIndexPreamble)
contentType should be(ContentTypes.`text/html(UTF-8)`)
}
}
}

class OverriddenSwaggerUiConfigHttpServiceSpec extends SwaggerUiHttpServiceSpec with SwaggerUiConfigHttpService {
override def swaggerUiConfig = ConfigFactory.parseString(
s"""
|baseUrl = /base
|docsPath = swagger/cromiam.yaml
|uiPath = ui/path
|uiVersion = $TestSwaggerUiVersion
""".stripMargin)

behavior of "SwaggerUiConfigHttpService"

it should "redirect /ui/path to the index.html under /base" in {
Get("/ui/path") ~> swaggerUiRoute ~> check {
status should be(StatusCodes.TemporaryRedirect)
header("Location") should be(Option(Location(Uri("/base/ui/path/index.html?url=/base/swagger/cromiam.yaml"))))
contentType should be(ContentTypes.`text/html(UTF-8)`)
}
}

it should "return index.html from the swagger-ui jar" in {
Get("/ui/path/index.html") ~> swaggerUiRoute ~> check {
status should be(StatusCodes.OK)
responseAs[String].take(SwaggerIndexPreamble.length) should be(SwaggerIndexPreamble)
contentType should be(ContentTypes.`text/html(UTF-8)`)
}
}
}

class YamlSwaggerResourceHttpServiceSpec extends SwaggerResourceHttpServiceSpec {
override def swaggerServiceName = "testservice"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
name: directory_type_output_papi
testFormat: workflowsuccess
tags: ["wdl_biscayne"]
backends: [Papi]

files {
workflow: wdl_biscayne/directory_type_output/directory_type_output.wdl
}

metadata {
workflowName: main
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
version development

# CROM-6875 repro WDL to exercise Directory outputs. Since the Directory type does not exist in WDL versions 1.0 or
# draft-2, the bug this is checking for cannot and does not exist in those WDL versions.
workflow main {
call main { input: s1 = "x", s2 = "y" }
scatter (f in main.f) {
call checker { input: f = f }
}
output { Array[File] f = main.f }
}

task main {
input {
String s1
String s2
}

command <<<
set -euo pipefail
mkdir d
touch "d/~{s1}"
touch "d/~{s2}"
echo -e "d/~{s1}\nd/~{s2}"
>>>

output {
Directory d = "d"
Array[File] f = read_lines(stdout())
}

runtime {
docker: "debian:stable-slim"
}
}

task checker {
# Check files were actually created as expected above
input {
File f
}

command <<<
set -euo pipefail
[ -f ~{f} ]
>>>

output {
}

runtime {
docker: "debian:stable-slim"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ trait WorkflowStoreSlickDatabase extends WorkflowStoreSqlDatabase {
heartbeatTimestampTo: Timestamp,
workflowStateFrom: String,
workflowStateTo: String,
workflowStateExcluded: String)
workflowStateExcluded: String,
excludedGroups: Set[String])
(implicit ec: ExecutionContext): Future[Seq[WorkflowStoreEntry]] = {

def updateForFetched(cromwellId: String,
Expand Down Expand Up @@ -89,7 +90,7 @@ trait WorkflowStoreSlickDatabase extends WorkflowStoreSqlDatabase {

val action = for {
workflowStoreEntries <- dataAccess.fetchStartableWorkflows(
(limit.toLong, heartbeatTimestampTimedOut, workflowStateExcluded)
limit.toLong, heartbeatTimestampTimedOut, workflowStateExcluded, excludedGroups
).result
_ <- DBIO.sequence(
workflowStoreEntries map updateForFetched(cromwellId, heartbeatTimestampTo, workflowStateFrom, workflowStateTo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ trait WorkflowStoreEntryComponent {
/**
* Returns up to "limit" startable workflows, sorted by submission time.
*/
val fetchStartableWorkflows = Compiled(
(limit: ConstColumn[Long],
heartbeatTimestampTimedOut: ConstColumn[Timestamp],
excludeWorkflowState: Rep[String]
) => {
def fetchStartableWorkflows(limit: Long,
heartbeatTimestampTimedOut: Timestamp,
excludeWorkflowState: String,
excludedGroups: Set[String]
): Query[WorkflowStoreEntries, WorkflowStoreEntry, Seq] = {
val query = for {
row <- workflowStoreEntries
/*
Expand All @@ -93,11 +93,11 @@ trait WorkflowStoreEntryComponent {
transaction that we know will impact those readers.
*/
if (row.heartbeatTimestamp.isEmpty || row.heartbeatTimestamp < heartbeatTimestampTimedOut) &&
(row.workflowState =!= excludeWorkflowState)
(row.workflowState =!= excludeWorkflowState) &&
!(row.hogGroup inSet excludedGroups)
} yield row
query.forUpdate.sortBy(_.submissionTime.asc).take(limit)
}
)

/**
* Useful for counting workflows in a given state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ ____ __ ____ ______ .______ __ ___ _______ __ ______
heartbeatTimestampTo: Timestamp,
workflowStateFrom: String,
workflowStateTo: String,
workflowStateExcluded: String)
workflowStateExcluded: String,
excludedGroups: Set[String])
(implicit ec: ExecutionContext): Future[Seq[WorkflowStoreEntry]]

def writeWorkflowHeartbeats(workflowExecutionUuids: Seq[String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class WorkflowManagerActor(params: WorkflowManagerActorParams)
Determine the number of available workflow slots and request the smaller of that number and maxWorkflowsToLaunch.
*/
val maxNewWorkflows = maxWorkflowsToLaunch min (maxWorkflowsRunning - stateData.workflows.size - stateData.subWorkflows.size)
params.workflowStore ! WorkflowStoreActor.FetchRunnableWorkflows(maxNewWorkflows)
params.workflowStore ! WorkflowStoreActor.FetchRunnableWorkflows(maxNewWorkflows, excludedGroups = Set.empty)
stay()
case Event(WorkflowStoreEngineActor.NoNewWorkflowsToStart, _) =>
log.debug("WorkflowStore provided no new workflows to start")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cromwell.engine.workflow.workflowstore

import java.time.OffsetDateTime

import cats.data.NonEmptyList
import cromwell.core.{HogGroup, WorkflowId, WorkflowSourceFilesCollection}
import cromwell.engine.workflow.workflowstore.SqlWorkflowStore.WorkflowStoreAbortResponse.WorkflowStoreAbortResponse
Expand Down Expand Up @@ -32,7 +31,10 @@ class InMemoryWorkflowStore extends WorkflowStore {
* Retrieves up to n workflows which have not already been pulled into the engine and sets their pickedUp
* flag to true
*/
override def fetchStartableWorkflows(n: Int, cromwellId: String, heartbeatTtl: FiniteDuration)(implicit ec: ExecutionContext): Future[List[WorkflowToStart]] = {
override def fetchStartableWorkflows(n: Int, cromwellId: String, heartbeatTtl: FiniteDuration, excludedGroups: Set[String])(implicit ec: ExecutionContext): Future[List[WorkflowToStart]] = {
if (excludedGroups.nonEmpty)
throw new UnsupportedOperationException("Programmer Error: group filtering not supported for single-tenant/in-memory workflow store")

val startableWorkflows = workflowStore filter { _._2 == WorkflowStoreState.Submitted } take n
val updatedWorkflows = startableWorkflows map { _._1 -> WorkflowStoreState.Running }
workflowStore = workflowStore ++ updatedWorkflows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ case class SqlWorkflowStore(sqlDatabase: WorkflowStoreSqlDatabase, metadataSqlDa
* Retrieves up to n workflows which have not already been pulled into the engine and sets their pickedUp
* flag to true
*/
override def fetchStartableWorkflows(n: Int, cromwellId: String, heartbeatTtl: FiniteDuration)(implicit ec: ExecutionContext): Future[List[WorkflowToStart]] = {
override def fetchStartableWorkflows(n: Int, cromwellId: String, heartbeatTtl: FiniteDuration, excludedGroups: Set[String])(implicit ec: ExecutionContext): Future[List[WorkflowToStart]] = {
import cats.syntax.traverse._
import common.validation.Validation._
sqlDatabase.fetchWorkflowsInState(
Expand All @@ -106,7 +106,8 @@ case class SqlWorkflowStore(sqlDatabase: WorkflowStoreSqlDatabase, metadataSqlDa
OffsetDateTime.now.toSystemTimestamp,
WorkflowStoreState.Submitted.toString,
WorkflowStoreState.Running.toString,
WorkflowStoreState.OnHold.toString
WorkflowStoreState.OnHold.toString,
excludedGroups: Set[String]
) map {
// .get on purpose here to fail the future if something went wrong
_.toList.traverse(fromWorkflowStoreEntry).toTry.get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ trait WorkflowStore {
* Retrieves up to n workflows which have not already been pulled into the engine and sets their pickedUp
* flag to true
*/
def fetchStartableWorkflows(n: Int, cromwellId: String, heartbeatTtl: FiniteDuration)(implicit ec: ExecutionContext): Future[List[WorkflowToStart]]
def fetchStartableWorkflows(n: Int, cromwellId: String, heartbeatTtl: FiniteDuration, excludedGroups: Set[String])(implicit ec: ExecutionContext): Future[List[WorkflowToStart]]

def writeWorkflowHeartbeats(workflowIds: Set[(WorkflowId, OffsetDateTime)],
heartbeatDateTime: OffsetDateTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ sealed trait WorkflowStoreAccess {
heartbeatDateTime: OffsetDateTime)
(implicit actorSystem: ActorSystem, ec: ExecutionContext): Future[Int]

def fetchStartableWorkflows(maxWorkflows: Int, cromwellId: String, heartbeatTtl: FiniteDuration)
def fetchStartableWorkflows(maxWorkflows: Int, cromwellId: String, heartbeatTtl: FiniteDuration, excludedGroups: Set[String])
(implicit actorSystem: ActorSystem, ec: ExecutionContext): Future[List[WorkflowToStart]]

def abort(workflowId: WorkflowId)
Expand All @@ -50,9 +50,9 @@ case class UncoordinatedWorkflowStoreAccess(store: WorkflowStore) extends Workfl
store.writeWorkflowHeartbeats(workflowIds.toVector.toSet, heartbeatDateTime)
}

override def fetchStartableWorkflows(maxWorkflows: Int, cromwellId: String, heartbeatTtl: FiniteDuration)
override def fetchStartableWorkflows(maxWorkflows: Int, cromwellId: String, heartbeatTtl: FiniteDuration, excludedGroups: Set[String])
(implicit actorSystem: ActorSystem, ec: ExecutionContext): Future[List[WorkflowToStart]] = {
store.fetchStartableWorkflows(maxWorkflows, cromwellId, heartbeatTtl)
store.fetchStartableWorkflows(maxWorkflows, cromwellId, heartbeatTtl, excludedGroups)
}

override def deleteFromStore(workflowId: WorkflowId)(implicit actorSystem: ActorSystem, ec: ExecutionContext): Future[Int] = {
Expand All @@ -79,9 +79,9 @@ case class CoordinatedWorkflowStoreAccess(coordinatedWorkflowStoreAccessActor: A
)
}

override def fetchStartableWorkflows(maxWorkflows: Int, cromwellId: String, heartbeatTtl: FiniteDuration)
override def fetchStartableWorkflows(maxWorkflows: Int, cromwellId: String, heartbeatTtl: FiniteDuration, excludedGroups: Set[String])
(implicit actorSystem: ActorSystem, ec: ExecutionContext): Future[List[WorkflowToStart]] = {
val message = WorkflowStoreCoordinatedAccessActor.FetchStartableWorkflows(maxWorkflows, cromwellId, heartbeatTtl)
val message = WorkflowStoreCoordinatedAccessActor.FetchStartableWorkflows(maxWorkflows, cromwellId, heartbeatTtl, excludedGroups)
withRetryForTransactionRollback(
() => coordinatedWorkflowStoreAccessActor.ask(message).mapTo[List[WorkflowToStart]]
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ final case class WorkflowStoreActor private(

object WorkflowStoreActor {
sealed trait WorkflowStoreActorEngineCommand
final case class FetchRunnableWorkflows(n: Int) extends WorkflowStoreActorEngineCommand
final case class FetchRunnableWorkflows(n: Int, excludedGroups: Set[String]) extends WorkflowStoreActorEngineCommand
final case class AbortWorkflowCommand(id: WorkflowId) extends WorkflowStoreActorEngineCommand
final case class WorkflowOnHoldToSubmittedCommand(id: WorkflowId) extends WorkflowStoreActorEngineCommand
case object InitializerCommand extends WorkflowStoreActorEngineCommand
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ class WorkflowStoreCoordinatedAccessActor(workflowStore: WorkflowStore) extends
override def receive: Receive = {
case WriteHeartbeats(ids, heartbeatDateTime) =>
workflowStore.writeWorkflowHeartbeats(ids.toVector.toSet, heartbeatDateTime) |> run
case FetchStartableWorkflows(count, cromwellId, heartbeatTtl) =>
workflowStore.fetchStartableWorkflows(count, cromwellId, heartbeatTtl) |> run
case FetchStartableWorkflows(count, cromwellId, heartbeatTtl, excludedGroups) =>
workflowStore.fetchStartableWorkflows(count, cromwellId, heartbeatTtl, excludedGroups) |> run
case DeleteFromStore(workflowId) =>
workflowStore.deleteFromStore(workflowId) |> run
case Abort(workflowId) =>
Expand All @@ -45,7 +45,7 @@ class WorkflowStoreCoordinatedAccessActor(workflowStore: WorkflowStore) extends
object WorkflowStoreCoordinatedAccessActor {
final case class WriteHeartbeats(workflowIds: NonEmptyVector[(WorkflowId, OffsetDateTime)],
heartbeatDateTime: OffsetDateTime)
final case class FetchStartableWorkflows(count: Int, cromwellId: String, heartbeatTtl: FiniteDuration)
final case class FetchStartableWorkflows(count: Int, cromwellId: String, heartbeatTtl: FiniteDuration, excludedGroups: Set[String])
final case class DeleteFromStore(workflowId: WorkflowId)
final case class Abort(workflowId: WorkflowId)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ final case class WorkflowStoreEngineActor private(store: WorkflowStore,

private def startNewWork(command: WorkflowStoreActorEngineCommand, sndr: ActorRef, nextData: WorkflowStoreActorData) = {
val work: Future[Any] = command match {
case FetchRunnableWorkflows(count) =>
newWorkflowMessage(count) map { response =>
case FetchRunnableWorkflows(count, excludedGroups) =>
newWorkflowMessage(count, excludedGroups) map { response =>
response match {
case NewWorkflowsToStart(workflows) =>
val workflowsIds = workflows.map(_.id).toList
Expand Down Expand Up @@ -185,10 +185,10 @@ final case class WorkflowStoreEngineActor private(store: WorkflowStore,
/**
* Fetches at most n workflows, and builds the correct response message based on if there were any workflows or not
*/
private def newWorkflowMessage(maxWorkflows: Int): Future[WorkflowStoreEngineActorResponse] = {
private def newWorkflowMessage(maxWorkflows: Int, excludedGroups: Set[String]): Future[WorkflowStoreEngineActorResponse] = {
def fetchStartableWorkflowsIfNeeded = {
if (maxWorkflows > 0) {
workflowStoreAccess.fetchStartableWorkflows(maxWorkflows, workflowHeartbeatConfig.cromwellId, workflowHeartbeatConfig.ttl)
workflowStoreAccess.fetchStartableWorkflows(maxWorkflows, workflowHeartbeatConfig.cromwellId, workflowHeartbeatConfig.ttl, excludedGroups)
} else {
Future.successful(List.empty[WorkflowToStart])
}
Expand Down
Loading

0 comments on commit d503879

Please sign in to comment.