Skip to content

Commit 7b30964

Browse files
authored
Workflow output attach debug logging [BW-529] (#1376)
1 parent d737b49 commit 7b30964

File tree

2 files changed

+21
-8
lines changed

2 files changed

+21
-8
lines changed

core/src/main/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorActor.scala

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import akka.pattern._
77
import com.google.api.client.auth.oauth2.Credential
88
import com.typesafe.scalalogging.LazyLogging
99
import nl.grons.metrics4.scala.Counter
10-
import org.broadinstitute.dsde.rawls.{RawlsException, RawlsFatalExceptionWithErrorReport}
10+
import org.broadinstitute.dsde.rawls.{RawlsException, RawlsFatalExceptionWithErrorReport, model}
1111
import org.broadinstitute.dsde.rawls.coordination.DataSourceAccess
1212
import org.broadinstitute.dsde.rawls.dataaccess._
1313
import org.broadinstitute.dsde.rawls.dataaccess.slick._
@@ -445,27 +445,36 @@ trait SubmissionMonitor extends FutureSupport with LazyLogging with RawlsInstrum
445445
logger.debug(s"attaching outputs for ${submissionId.toString}/${workflowRecord.externalId.getOrElse("MISSING_WORKFLOW")}: ${outputs}")
446446
logger.debug(s"output expressions for ${submissionId.toString}/${workflowRecord.externalId.getOrElse("MISSING_WORKFLOW")}: ${outputExpressionMap}")
447447

448-
val parsedExpressions = outputExpressionMap.map { case (outputName, outputExprStr) =>
448+
val parsedExpressions: Seq[Try[OutputExpression]] = outputExpressionMap.map { case (outputName, outputExprStr) =>
449449
outputs.get(outputName) match {
450450
case None => Failure(new RawlsException(s"output named ${outputName} does not exist"))
451451
case Some(Right(uot: UnsupportedOutputType)) => Failure(new RawlsException(s"output named ${outputName} is not a supported type, received json u${uot.json.compactPrint}"))
452452
case Some(Left(output)) =>
453453
val entityTypeOpt = workflowRecord.workflowEntityId.flatMap(entitiesById.get).map(_.entityType)
454454
OutputExpression.build(outputExprStr, output, entityTypeOpt)
455455
}
456-
}
456+
}.toSeq
457457

458458
if (parsedExpressions.forall(_.isSuccess)) {
459-
val boundExpressions = parsedExpressions.collect { case Success(boe @ BoundOutputExpression(target, name, attr)) => boe }
459+
val boundExpressions: Seq[BoundOutputExpression] = parsedExpressions.collect { case Success(boe @ BoundOutputExpression(target, name, attr)) => boe }
460460
val updates = updateEntityAndWorkspace(workflowRecord.workflowEntityId.map(id => Some(entitiesById(id))).getOrElse(None), workspace, boundExpressions)
461+
462+
val (optEntityUpdates, optWs) = updates
463+
optEntityUpdates foreach { update: WorkflowEntityUpdate =>
464+
logger.debug(s"Updating ${update.upserts.size} attributes for entity ${update.entityRef.entityName} of type ${update.entityRef.entityType} in ${submissionId.toString}/${workflowRecord.externalId.getOrElse("MISSING_WORKFLOW")}. First 100: ${update.upserts.take(100)}")
465+
}
466+
optWs foreach { workspace: Workspace =>
467+
logger.debug(s"Updating ${workspace.attributes.size} workspace attributes in ${submissionId.toString}/${workflowRecord.externalId.getOrElse("MISSING_WORKFLOW")}. First 100: ${workspace.attributes.take(100)}")
468+
}
469+
461470
Left(updates)
462471
} else {
463-
Right((workflowRecord, parsedExpressions.collect { case Failure(t) => AttributeString(t.getMessage) }.toSeq))
472+
Right((workflowRecord, parsedExpressions.collect { case Failure(t) => AttributeString(t.getMessage) }))
464473
}
465474
}
466475
}
467476

468-
def updateEntityAndWorkspace(entity: Option[Entity], workspace: Workspace, workflowOutputs: Iterable[BoundOutputExpression]): (Option[WorkflowEntityUpdate], Option[Workspace]) = {
477+
private def updateEntityAndWorkspace(entity: Option[Entity], workspace: Workspace, workflowOutputs: Iterable[BoundOutputExpression]): (Option[WorkflowEntityUpdate], Option[Workspace]) = {
469478
val entityUpsert = workflowOutputs.collect({ case BoundOutputExpression(ThisEntityTarget, attrName, attr) => (attrName, attr) })
470479
val workspaceAttributes = workflowOutputs.collect({ case BoundOutputExpression(WorkspaceTarget, attrName, attr) => (attrName, attr) })
471480

core/src/main/scala/org/broadinstitute/dsde/rawls/jobexec/WorkflowSubmissionActor.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,10 @@ trait WorkflowSubmission extends FutureSupport with LazyLogging with MethodWiths
169169
} else {
170170
for {
171171
// we exclude workflows submitted by users that have exceeded the max workflows per user
172-
excludedUsers <- dataAccess.workflowQuery.listSubmittersWithMoreWorkflowsThan(maxActiveWorkflowsPerUser, WorkflowStatuses.runningStatuses)
173-
workflowRecs <- dataAccess.workflowQuery.findQueuedWorkflows(excludedUsers.map { case (submitter, count) => submitter }, SubmissionStatuses.terminalStatuses :+ SubmissionStatuses.Aborting).take(batchSize).result
172+
excludedUsersMap <- dataAccess.workflowQuery.listSubmittersWithMoreWorkflowsThan(maxActiveWorkflowsPerUser, WorkflowStatuses.runningStatuses).map(_.toMap)
173+
excludedUsers = excludedUsersMap.keys.toSeq
174+
excludedStatuses = SubmissionStatuses.terminalStatuses :+ SubmissionStatuses.Aborting
175+
workflowRecs: Seq[WorkflowRecord] <- dataAccess.workflowQuery.findQueuedWorkflows(excludedUsers, excludedStatuses).take(batchSize).result
174176
reservedRecs <- if (workflowRecs.isEmpty) {
175177
DBIO.successful(None)
176178
} else {
@@ -312,6 +314,8 @@ trait WorkflowSubmission extends FutureSupport with LazyLogging with MethodWiths
312314

313315
val WorkflowBatch(workflowIds, submissionRec, workspaceRec) = batch
314316

317+
logger.info(s"Submitting batch of ${workflowIds.size} workflows for submission ${submissionRec.id} in workspace ${workspaceRec.namespace}/${workspaceRec.name} (${workspaceRec.id})")
318+
315319
// implicitly passed to WorkflowComponent methods which update status
316320
implicit val wfStatusCounter = (status: WorkflowStatus) =>
317321
if (trackDetailedSubmissionMetrics) Option(workflowStatusCounter(workspaceSubmissionMetricBuilder(workspaceRec.toWorkspaceName, submissionRec.id))(status))

0 commit comments

Comments
 (0)