Skip to content

Include WorkflowMetadata in lineage records #6069

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@

package nextflow.lineage

import nextflow.NextflowMeta
import nextflow.extension.FilesEx
import nextflow.lineage.exception.OutputRelativePathException
import nextflow.script.FusionMetadata
import nextflow.script.WaveMetadata

import static nextflow.lineage.fs.LinPath.*

Expand Down Expand Up @@ -71,6 +75,11 @@ import nextflow.util.TestOnly
@Slf4j
@CompileStatic
class LinObserver implements TraceObserverV2 {
private static List<String> workflowMetadataPropertiesToRemove = [
"sessionId", "name", //Already in workflowRun
"scriptFile", "scriptName", "scriptId", "repository", "commitId", "revision", "projectName", "manifest", //Already in workflow
"stats", "success" // End
]
Comment on lines +78 to +82
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be preferable to keep as much as possible aligned to WorkflowMeta structure, in the same way as it's done in the tower client

we could consider removing repeated values in the parent object

Copy link
Contributor Author

@jorgee jorgee May 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I get your comment.

If I am not wrong, in TowerClient, the workflow is a plain map that contains everything that is in WorkflowMetadata, removing stats and adding other properties like the resolved config and params.

If we do the same and include everything as property of the WorkflowRun class, it will be very difficult to maintain, so I prefer to keep it inside the metadata property (or rename it to another name). If a new parameter is added in the WorkflowMetadata, it will also be added to the 'metadata' without any model update.

A small difference is the sessionId and name that are properties in WorkflowRun and they were also in the WorkflowMetadata map. I do not see a problem with keeping it in metadata and removing it from WorkflowRun. We can also add the resolved config if you think it is better.

The big difference is in the workflow and params. I think they are the parts that mainly describe the WorkflowRun, and, in the TowerClient, they are spread in a set of properties. In workflow, we already included the information that is in scriptFile, scriptName, scriptId, repository, commitId. I have just added other info (revision, projectName and manifest) that I think they are describing the workflow more than the execution. In fact, I think the workflow description could have a separate record, and LID and just have a reference in the WorkflowRun.

private static Map<Class<? extends BaseParam>, String> taskParamToValue = [
(StdOutParam) : "stdout",
(StdInParam) : "stdin",
Expand Down Expand Up @@ -155,15 +164,19 @@ class LinObserver implements TraceObserverV2 {
final workflow = new Workflow(
collectScriptDataPaths(normalizer),
session.workflowMetadata.repository,
session.workflowMetadata.commitId
session.workflowMetadata.commitId,
session.workflowMetadata.revision,
session.workflowMetadata.projectName,
session.workflowMetadata.manifest?.toMap()
)
// create the workflow run main object
final value = new WorkflowRun(
workflow,
session.uniqueId.toString(),
session.runName,
getNormalizedParams(session.params, normalizer),
SecretHelper.hideSecrets(session.config.deepClone()) as Map
SecretHelper.hideSecrets(session.config.deepClone()) as Map,
addOtherMetadata()
)
final executionHash = CacheHelper.hasher(value).hash().toString()
store.save(executionHash, value)
Expand Down Expand Up @@ -473,4 +486,24 @@ class LinObserver implements TraceObserverV2 {
}
return paths
}

private Map addOtherMetadata() {
try {
def metadata = session.workflowMetadata.toMap()
.collectEntries { it.value instanceof Path ? [it.key, FilesEx.toUriString(it.value as Path) ] : [it.key, it.value] }
metadata.removeAll {it.key.toString() in workflowMetadataPropertiesToRemove }
if( metadata.containsKey("nextflow") )
metadata["nextflow"] = (metadata["nextflow"] as NextflowMeta).toJsonMap()
if( metadata.containsKey("configFiles") )
metadata["configFiles"] = (metadata["configFiles"] as List<Path>).collect {FilesEx.toUriString(it)}
if( metadata.containsKey( "wave") )
metadata["wave"] = (metadata["wave"] as WaveMetadata).enabled
if( metadata.containsKey( "fusion") )
metadata["fusion"] = (metadata["fusion"] as FusionMetadata).enabled
return metadata
}catch( Throwable e){
log.debug("Error creating metadata", e)
return [:]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,16 @@ class Workflow implements LinSerializable {
* Workflow commit identifier
*/
String commitId
/**
* Workflow revision
*/
String revision
/**
* Project name
*/
String projectName
/**
* Workflow Manifest
*/
Map manifest
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,9 @@ class WorkflowRun implements LinSerializable {
* Resolved Configuration
*/
Map config
/**
* Raw metadata
*/
Map metadata

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package nextflow.lineage

import nextflow.extension.FilesEx
import nextflow.lineage.exception.OutputRelativePathException

import java.nio.file.Files
Expand Down Expand Up @@ -165,14 +166,31 @@ class LinObserverTest extends Specification {
def store = new DefaultLinStore();
def uniqueId = UUID.randomUUID()
def scriptFile = folder.resolve("main.nf")
def map = [
repository: "https://nextflow.io/nf-test/",
commitId: "123456",
scriptId: "78910",
scriptFile: scriptFile,
projectDir: folder.resolve("projectDir"),
revision: "main",
projectName: "nextflow.io/nf-test",
workDir: folder.resolve("workDir")
]
def metadata = Mock(WorkflowMetadata){
getRepository() >> "https://nextflow.io/nf-test/"
getCommitId() >> "123456"
getScriptId() >> "78910"
getScriptFile() >> scriptFile
getProjectDir() >> folder.resolve("projectDir")
getWorkDir() >> folder.resolve("workDir")
getRepository() >> map.repository
getCommitId() >> map.commitId
getScriptId() >> map.scriptId
getScriptFile() >> map.scriptFile
getProjectDir() >> map.projectDir
getRevision() >> map.revision
getProjectName() >> map.projectName
getWorkDir() >> map.workDir
toMap() >> map
}
def expectedMap = [
projectDir: FilesEx.toUriString(map.projectDir),
workDir: FilesEx.toUriString(map.workDir)
]
def session = Mock(Session) {
getConfig() >> config
getUniqueId() >> uniqueId
Expand All @@ -183,8 +201,8 @@ class LinObserverTest extends Specification {
store.open(LineageConfig.create(session))
def observer = new LinObserver(session, store)
def mainScript = new DataPath("file://${scriptFile.toString()}", new Checksum("78910", "nextflow", "standard"))
def workflow = new Workflow([mainScript],"https://nextflow.io/nf-test/", "123456" )
def workflowRun = new WorkflowRun(workflow, uniqueId.toString(), "test_run", [], config)
def workflow = new Workflow([mainScript], map.repository, map.commitId, map.revision, map.projectName )
def workflowRun = new WorkflowRun(workflow, uniqueId.toString(), "test_run", [], config, expectedMap)
when:
observer.onFlowCreate(session)
observer.onFlowBegin()
Expand Down