Skip to content

Remove URL fragment in lineage IDs #6011

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/reference/channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ The `channel.fromLineage` factory creates a channel that emits files from the {r

```nextflow
channel
.fromLineage(workflowRun: 'lid://0d1d1622ced3e4edc690bec768919b45', label: ['alpha', 'beta'])
.fromLineage(workflowLaunch: 'lid://0d1d1622ced3e4edc690bec768919b45', label: ['alpha', 'beta'])
.view()
```

Expand All @@ -86,7 +86,7 @@ Available options:
`taskRun`
: LID of the task run that produced the desired files.

`workflowRun`
`workflowLaunch`
: LID of the workflow run that produced the desired files.

(channel-fromlist)=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ interface LinExtension {
static final Map PARAMS = [
label: [List,String,GString],
taskRun: [String,GString],
workflowRun: [String,GString],
workflowLaunch: [String,GString],
]

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ class CmdLineageTest extends Specification {
def launcher = Mock(Launcher){
getOptions() >> new CliOptions(config: [configFile.toString()])
}
lidLog.write("run_name", uniqueId, "lid://123456", date)
def recordEntry = "${LinHistoryRecord.TIMESTAMP_FMT.format(date)}\trun_name\t${uniqueId}\tlid://123456".toString()
lidLog.write("run_name", uniqueId, "lid://123456","lid://567890", date)
def recordEntry = "${LinHistoryRecord.TIMESTAMP_FMT.format(date)}\trun_name\t${uniqueId}\tlid://123456\tlid://567890".toString()
when:
def lidCmd = new CmdLineage(launcher: launcher, args: ["list"])
lidCmd.run()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,34 @@ class DefaultLinHistoryLog implements LinHistoryLog {
Files.createDirectories(path)
}

void write(String name, UUID key, String runLid, Date date = null) {
@Override
void write(String name, UUID id, String launchLid, Date date = null) {
assert name
assert key
def timestamp = date ?: new Date()
final recordFile = path.resolve(runLid.substring(LID_PROT.size()))
assert id
final timestamp = date ?: new Date()
final recordFile = path.resolve(launchId.substring(LID_PROT.size()))
try {
recordFile.text = new LinHistoryRecord(timestamp, name, key, runLid).toString()
log.trace("Record for $key written in lineage history log ${FilesEx.toUriString(this.path)}")
recordFile.text = new LinHistoryRecord(timestamp, name, id, null, launchLid, null).toString()
log.trace("Record for $launchLid written in lineage history log ${FilesEx.toUriString(this.path)}")
}catch (Throwable e) {
log.warn("Can't write record $key file ${FilesEx.toUriString(recordFile)}", e.message)
log.warn("Can't write record $launchLid file ${FilesEx.toUriString(recordFile)}", e.message)
}
}

@Override
void finalize(String launchLid, String runLid, String status) {
assert id
final recordFile = path.resolve(launchId.substring(LID_PROT.size()))
try {
final current = LinHistoryRecord.parse(recordFile.text)
recordFile.text = new LinHistoryRecord(current.timestamp, current.runName, id, status, current.launchLid, runLid).toString()
}
catch (Throwable e) {
log.warn("Can't read record $launchId file: ${FilesEx.toUriString(recordFile)}", e.message)
}
}

@Override
List<LinHistoryRecord> getRecords(){
List<LinHistoryRecord> list = new LinkedList<LinHistoryRecord>()
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ class LinExtensionImpl implements LinExtension {

private static Map<String, List<String>> buildQueryParams(Map<String,?> opts) {
final queryParams = [type: [FileOutput.class.simpleName] ]
if( opts.workflowRun )
queryParams['workflowRun'] = [opts.workflowRun as String]
if( opts.workflowLaunch )
queryParams['workflowLaunch'] = [opts.workflowLaunch as String]
if( opts.taskRun )
queryParams['taskRun'] = [opts.taskRun as String]
if( opts.label ) {
Expand Down
13 changes: 11 additions & 2 deletions modules/nf-lineage/src/main/nextflow/lineage/LinHistoryLog.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,18 @@ interface LinHistoryLog {
*
* @param name Workflow execution name.
* @param sessionId Workflow session ID.
* @param runLid Workflow run ID.
* @param launchLid Workflow launch Lineage ID.
*/
void write(String name, UUID sessionId, String runLid)
void write(String name, UUID sessionId, String launchLid)

/**
* Finalize the log record for a given run.
*
* @param launchLid Workflow launch Lineage ID.
* @param runLid Workflow run Lineage ID.
* @param status Workflow run completion status.
*/
void finalize(String launchLid, String runLid, String status)

/**
* Get the store records in the Lineage History Log.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,16 @@ class LinHistoryRecord {
final Date timestamp
final String runName
final UUID sessionId
final String status
final String launchLid
final String runLid

LinHistoryRecord(Date timestamp, String name, UUID sessionId, String runLid) {
LinHistoryRecord(Date timestamp, String name, UUID sessionId, String status, String launchLid, String runLid) {
this.timestamp = timestamp
this.runName = name
this.sessionId = sessionId
this.status = status
this.launchLid = launchLid
this.runLid = runLid
}

Expand All @@ -51,6 +55,8 @@ class LinHistoryRecord {
timestamp ? TIMESTAMP_FMT.format(timestamp) : '-',
runName ?: '-',
sessionId.toString(),
status ?: '-',
launchLid ?: '-',
runLid ?: '-',
)
}
Expand All @@ -62,8 +68,8 @@ class LinHistoryRecord {

static LinHistoryRecord parse(String line) {
final cols = line.tokenize('\t')
if (cols.size() == 4) {
return new LinHistoryRecord(TIMESTAMP_FMT.parse(cols[0]), cols[1], UUID.fromString(cols[2]), cols[3])
if (cols.size() == 6) {
return new LinHistoryRecord(TIMESTAMP_FMT.parse(cols[0]), cols[1], UUID.fromString(cols[2]), cols[3], cols[4], cols[5])
}
throw new IllegalArgumentException("Not a valid history entry: `$line`")
}
Expand Down
118 changes: 45 additions & 73 deletions modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ import nextflow.lineage.model.Checksum
import nextflow.lineage.model.FileOutput
import nextflow.lineage.model.DataPath
import nextflow.lineage.model.Parameter
import nextflow.lineage.model.TaskOutput
import nextflow.lineage.model.Workflow
import nextflow.lineage.model.WorkflowOutput
import nextflow.lineage.model.WorkflowLaunch
import nextflow.lineage.model.WorkflowRun
import nextflow.file.FileHelper
import nextflow.file.FileHolder
Expand Down Expand Up @@ -84,10 +83,10 @@ class LinObserver implements TraceObserverV2 {
(EachInParam) : "each"
]

private String executionHash
private String launchId
private LinStore store
private Session session
private WorkflowOutput workflowOutput
private List<Parameter> outputs = new LinkedList<Parameter>()
private Map<String,String> outputsStoreDirLid = new HashMap<String,String>(10)
private PathNormalizer normalizer

Expand All @@ -97,34 +96,32 @@ class LinObserver implements TraceObserverV2 {
}

@TestOnly
String getExecutionHash(){ executionHash }

@TestOnly
String setExecutionHash(String hash){ this.executionHash = hash }
String setLaunchId(String hash){ this.launchId = hash }

@TestOnly
String setNormalizer(PathNormalizer normalizer){ this.normalizer = normalizer }

@Override
void onFlowBegin() {
normalizer = new PathNormalizer(session.workflowMetadata)
executionHash = storeWorkflowRun(normalizer)
final executionUri = asUriString(executionHash)
workflowOutput = new WorkflowOutput(
OffsetDateTime.now(),
executionUri,
new LinkedList<Parameter>()
)
this.store.getHistoryLog().write(session.runName, session.uniqueId, executionUri)
launchId = storeWorkflowLaunch(normalizer)
this.store.getHistoryLog().write(session.runName, session.uniqueId, asUriString(launchId))
}

@Override
void onFlowComplete(){
if(workflowOutput?.output ){
workflowOutput.createdAt = OffsetDateTime.now()
final key = executionHash + '#output'
this.store.save(key, workflowOutput)
}
void onFlowComplete() {
final status = session.isCancelled()
? "CANCELLED"
: session.isSuccess() ? "SUCCEEDED" : "FAILED"
final workflowRun = new WorkflowRun(
OffsetDateTime.now(),
asUriString(launchId),
status,
outputs
)
final runId = CacheHelper.hasher(workflowRun).hash().toString()
this.store.save(runId, workflowRun)
this.store.getHistoryLog().finalize(session.uniqueId, asUriString(runId), status)
}

protected Collection<Path> allScriptFiles() {
Expand All @@ -150,24 +147,24 @@ class LinObserver implements TraceObserverV2 {
return result.sort{it.path}
}

protected String storeWorkflowRun(PathNormalizer normalizer) {
protected String storeWorkflowLaunch(PathNormalizer normalizer) {
// create the workflow object holding script files and repo tracking info
final workflow = new Workflow(
collectScriptDataPaths(normalizer),
session.workflowMetadata.repository,
session.workflowMetadata.commitId
)
// create the workflow run main object
final value = new WorkflowRun(
final value = new WorkflowLaunch(
workflow,
session.uniqueId.toString(),
session.runName,
getNormalizedParams(session.params, normalizer),
SecretHelper.hideSecrets(session.config.deepClone()) as Map
)
final executionHash = CacheHelper.hasher(value).hash().toString()
store.save(executionHash, value)
return executionHash
final launchId = CacheHelper.hasher(value).hash().toString()
store.save(launchId, value)
return launchId
}

protected static List<Parameter> getNormalizedParams(Map<String, Object> params, PathNormalizer normalizer){
Expand All @@ -180,43 +177,6 @@ class LinObserver implements TraceObserverV2 {
return normalizedParams
}

@Override
void onTaskComplete(TaskEvent event) {
storeTaskInfo(event.handler.task)
}

protected void storeTaskInfo(TaskRun task) {
// store the task run entry
storeTaskRun(task, normalizer)
// store all task results
storeTaskResults(task, normalizer)
}

protected String storeTaskResults(TaskRun task, PathNormalizer normalizer){
final outputParams = getNormalizedTaskOutputs(task, normalizer)
final value = new TaskOutput( asUriString(task.hash.toString()), asUriString(executionHash), OffsetDateTime.now(), outputParams )
final key = task.hash.toString() + '#output'
store.save(key,value)
return key
}

private List<Parameter> getNormalizedTaskOutputs(TaskRun task, PathNormalizer normalizer){
final outputs = task.getOutputs()
final outputParams = new LinkedList<Parameter>()
for( Map.Entry<OutParam,Object> entry : outputs ) {
manageTaskOutputParameter(entry.key, outputParams, entry.value, task, normalizer)
}
return outputParams
}

private void manageTaskOutputParameter(OutParam key, LinkedList<Parameter> outputParams, value, TaskRun task, PathNormalizer normalizer) {
if (key instanceof FileOutParam) {
outputParams.add(new Parameter(getParameterType(key), key.name, manageFileOutParam(value, task)))
} else {
outputParams.add(new Parameter(getParameterType(key), key.name, normalizeValue(value, normalizer)))
}
}

private static Object normalizeValue(Object value, PathNormalizer normalizer) {
if (value instanceof Path)
return normalizer.normalizePath((Path)value)
Expand All @@ -226,6 +186,11 @@ class LinObserver implements TraceObserverV2 {
return value
}

@Override
void onTaskComplete(TaskEvent event) {
storeTaskRun(event.handler.task, normalizer)
}

private Object manageFileOutParam(Object value, TaskRun task) {
if (value == null) {
log.debug "Unexpected lineage File output value null"
Expand All @@ -247,7 +212,7 @@ class LinObserver implements TraceObserverV2 {

protected String storeTaskRun(TaskRun task, PathNormalizer normalizer) {
final codeChecksum = Checksum.ofNextflow(session.stubRun ? task.stubSource : task.source)
final value = new nextflow.lineage.model.TaskRun(
final taskRun = new nextflow.lineage.model.TaskRun(
session.uniqueId.toString(),
task.getName(),
codeChecksum,
Expand All @@ -262,12 +227,19 @@ class LinObserver implements TraceObserverV2 {
normalizer.normalizePath(p.normalize()),
Checksum.ofNextflow(p) )
},
asUriString(executionHash)
asUriString(launchId)
)

// store in the underlying persistence
final key = task.hash.toString()
store.save(key, value)
store.save(key, taskRun)

// store file outputs
task.outputs.forEach { OutParam param, Object value ->
if (param instanceof FileOutParam)
manageFileOutParam(value, task)
}

return key
}

Expand All @@ -280,7 +252,7 @@ class LinObserver implements TraceObserverV2 {
path.toUriString(),
checksum,
asUriString(task.hash.toString()),
asUriString(executionHash),
asUriString(launchId),
asUriString(task.hash.toString()),
attrs.size(),
LinUtils.toDate(attrs?.creationTime()),
Expand All @@ -300,7 +272,7 @@ class LinObserver implements TraceObserverV2 {

protected String getWorkflowOutputKey(Path target) {
final rel = getWorkflowRelative(target)
return executionHash + SEPARATOR + rel
return launchId + SEPARATOR + rel
}

protected String getTaskRelative(TaskRun task, Path path){
Expand Down Expand Up @@ -345,13 +317,13 @@ class LinObserver implements TraceObserverV2 {
final key = getWorkflowOutputKey(event.target)
final sourceReference = event.source
? getSourceReference(event.source)
: asUriString(executionHash)
: asUriString(launchId)
final attrs = readAttributes(event.target)
final value = new FileOutput(
event.target.toUriString(),
checksum,
sourceReference,
asUriString(executionHash),
asUriString(launchId),
null,
attrs.size(),
LinUtils.toDate(attrs?.creationTime()),
Expand All @@ -363,7 +335,7 @@ class LinObserver implements TraceObserverV2 {
log.warn1("Lineage for workflow output is not supported by publishDir directive")
}
catch (Throwable e) {
log.warn("Unexpected error storing published file '${event.target.toUriString()}' for workflow '${executionHash}'", e)
log.warn("Unexpected error storing published file '${event.target.toUriString()}' for workflow '${launchId}'", e)
}
}

Expand All @@ -381,7 +353,7 @@ class LinObserver implements TraceObserverV2 {
void onWorkflowOutput(WorkflowOutputEvent event) {
final type = getParameterType(event.value)
final value = convertPathsToLidReferences(event.index ?: event.value)
workflowOutput.output.add(new Parameter(type, event.name, value))
outputs.add(new Parameter(type, event.name, value))
}

protected static String getParameterType(Object param) {
Expand Down
Loading
Loading