Skip to content

Commit 0193cfe

Browse files
committed
Replace #output fragment with WorkflowLaunch/WorkflowRun
Signed-off-by: Ben Sherman <[email protected]>
1 parent cdb13af commit 0193cfe

30 files changed

+314
-625
lines changed

docs/reference/channel.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ The `channel.fromLineage` factory creates a channel that emits files from the {r
7272

7373
```nextflow
7474
channel
75-
.fromLineage(workflowRun: 'lid://0d1d1622ced3e4edc690bec768919b45', label: ['alpha', 'beta'])
75+
.fromLineage(workflowLaunch: 'lid://0d1d1622ced3e4edc690bec768919b45', label: ['alpha', 'beta'])
7676
.view()
7777
```
7878

@@ -86,7 +86,7 @@ Available options:
8686
`taskRun`
8787
: LID of the task run that produced the desired files.
8888

89-
`workflowRun`
89+
`workflowLaunch`
9090
: LID of the workflow run that produced the desired files.
9191

9292
(channel-fromlist)=

modules/nextflow/src/main/groovy/nextflow/extension/LinExtension.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ interface LinExtension {
2929
static final Map PARAMS = [
3030
label: [List,String,GString],
3131
taskRun: [String,GString],
32-
workflowRun: [String,GString],
32+
workflowLaunch: [String,GString],
3333
]
3434

3535
/**

modules/nextflow/src/test/groovy/nextflow/cli/CmdLineageTest.groovy

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ class CmdLineageTest extends Specification {
7474
def launcher = Mock(Launcher){
7575
getOptions() >> new CliOptions(config: [configFile.toString()])
7676
}
77-
lidLog.write("run_name", uniqueId, "lid://123456", date)
78-
def recordEntry = "${LinHistoryRecord.TIMESTAMP_FMT.format(date)}\trun_name\t${uniqueId}\tlid://123456".toString()
77+
lidLog.write("run_name", uniqueId, "lid://123456","lid://567890", date)
78+
def recordEntry = "${LinHistoryRecord.TIMESTAMP_FMT.format(date)}\trun_name\t${uniqueId}\tlid://123456\tlid://567890".toString()
7979
when:
8080
def lidCmd = new CmdLineage(launcher: launcher, args: ["list"])
8181
lidCmd.run()

modules/nf-lineage/src/main/nextflow/lineage/DefaultLinHistoryLog.groovy

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,19 +40,34 @@ class DefaultLinHistoryLog implements LinHistoryLog {
4040
Files.createDirectories(path)
4141
}
4242

43-
void write(String name, UUID key, String runLid, Date date = null) {
43+
@Override
44+
void write(String name, UUID id, String launchLid, Date date = null) {
4445
assert name
45-
assert key
46-
def timestamp = date ?: new Date()
47-
final recordFile = path.resolve(runLid.substring(LID_PROT.size()))
46+
assert id
47+
final timestamp = date ?: new Date()
48+
final recordFile = path.resolve(launchId.substring(LID_PROT.size()))
4849
try {
49-
recordFile.text = new LinHistoryRecord(timestamp, name, key, runLid).toString()
50-
log.trace("Record for $key written in lineage history log ${FilesEx.toUriString(this.path)}")
50+
recordFile.text = new LinHistoryRecord(timestamp, name, id, null, launchLid, null).toString()
51+
log.trace("Record for $launchLid written in lineage history log ${FilesEx.toUriString(this.path)}")
5152
}catch (Throwable e) {
52-
log.warn("Can't write record $key file ${FilesEx.toUriString(recordFile)}", e.message)
53+
log.warn("Can't write record $launchLid file ${FilesEx.toUriString(recordFile)}", e.message)
5354
}
5455
}
5556

57+
@Override
58+
void finalize(String launchLid, String runLid, String status) {
59+
assert id
60+
final recordFile = path.resolve(launchId.substring(LID_PROT.size()))
61+
try {
62+
final current = LinHistoryRecord.parse(recordFile.text)
63+
recordFile.text = new LinHistoryRecord(current.timestamp, current.runName, id, status, current.launchLid, runLid).toString()
64+
}
65+
catch (Throwable e) {
66+
log.warn("Can't read record $launchId file: ${FilesEx.toUriString(recordFile)}", e.message)
67+
}
68+
}
69+
70+
@Override
5671
List<LinHistoryRecord> getRecords(){
5772
List<LinHistoryRecord> list = new LinkedList<LinHistoryRecord>()
5873
try {

modules/nf-lineage/src/main/nextflow/lineage/LinExtensionImpl.groovy

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ class LinExtensionImpl implements LinExtension {
4949

5050
private static Map<String, List<String>> buildQueryParams(Map<String,?> opts) {
5151
final queryParams = [type: [FileOutput.class.simpleName] ]
52-
if( opts.workflowRun )
53-
queryParams['workflowRun'] = [opts.workflowRun as String]
52+
if( opts.workflowLaunch )
53+
queryParams['workflowLaunch'] = [opts.workflowLaunch as String]
5454
if( opts.taskRun )
5555
queryParams['taskRun'] = [opts.taskRun as String]
5656
if( opts.label ) {

modules/nf-lineage/src/main/nextflow/lineage/LinHistoryLog.groovy

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,18 @@ interface LinHistoryLog {
2626
*
2727
* @param name Workflow execution name.
2828
* @param sessionId Workflow session ID.
29-
* @param runLid Workflow run ID.
29+
* @param launchLid Workflow launch Lineage ID.
3030
*/
31-
void write(String name, UUID sessionId, String runLid)
31+
void write(String name, UUID sessionId, String launchLid)
32+
33+
/**
34+
* Finalize the log record for a given run.
35+
*
36+
* @param launchLid Workflow launch Lineage ID.
37+
* @param runLid Workflow run Lineage ID.
38+
* @param status Workflow run completion status.
39+
*/
40+
void finalize(String launchLid, String runLid, String status)
3241

3342
/**
3443
* Get the store records in the Lineage History Log.

modules/nf-lineage/src/main/nextflow/lineage/LinHistoryRecord.groovy

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,16 @@ class LinHistoryRecord {
3535
final Date timestamp
3636
final String runName
3737
final UUID sessionId
38+
final String status
39+
final String launchLid
3840
final String runLid
3941

40-
LinHistoryRecord(Date timestamp, String name, UUID sessionId, String runLid) {
42+
LinHistoryRecord(Date timestamp, String name, UUID sessionId, String status, String launchLid, String runLid) {
4143
this.timestamp = timestamp
4244
this.runName = name
4345
this.sessionId = sessionId
46+
this.status = status
47+
this.launchLid = launchLid
4448
this.runLid = runLid
4549
}
4650

@@ -51,6 +55,8 @@ class LinHistoryRecord {
5155
timestamp ? TIMESTAMP_FMT.format(timestamp) : '-',
5256
runName ?: '-',
5357
sessionId.toString(),
58+
status ?: '-',
59+
launchLid ?: '-',
5460
runLid ?: '-',
5561
)
5662
}
@@ -62,8 +68,8 @@ class LinHistoryRecord {
6268

6369
static LinHistoryRecord parse(String line) {
6470
final cols = line.tokenize('\t')
65-
if (cols.size() == 4) {
66-
return new LinHistoryRecord(TIMESTAMP_FMT.parse(cols[0]), cols[1], UUID.fromString(cols[2]), cols[3])
71+
if (cols.size() == 6) {
72+
return new LinHistoryRecord(TIMESTAMP_FMT.parse(cols[0]), cols[1], UUID.fromString(cols[2]), cols[3], cols[4], cols[5])
6773
}
6874
throw new IllegalArgumentException("Not a valid history entry: `$line`")
6975
}

modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy

Lines changed: 45 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,8 @@ import nextflow.lineage.model.Checksum
3232
import nextflow.lineage.model.FileOutput
3333
import nextflow.lineage.model.DataPath
3434
import nextflow.lineage.model.Parameter
35-
import nextflow.lineage.model.TaskOutput
3635
import nextflow.lineage.model.Workflow
37-
import nextflow.lineage.model.WorkflowOutput
36+
import nextflow.lineage.model.WorkflowLaunch
3837
import nextflow.lineage.model.WorkflowRun
3938
import nextflow.file.FileHelper
4039
import nextflow.file.FileHolder
@@ -84,10 +83,10 @@ class LinObserver implements TraceObserverV2 {
8483
(EachInParam) : "each"
8584
]
8685

87-
private String executionHash
86+
private String launchId
8887
private LinStore store
8988
private Session session
90-
private WorkflowOutput workflowOutput
89+
private List<Parameter> outputs = new LinkedList<Parameter>()
9190
private Map<String,String> outputsStoreDirLid = new HashMap<String,String>(10)
9291
private PathNormalizer normalizer
9392

@@ -97,34 +96,32 @@ class LinObserver implements TraceObserverV2 {
9796
}
9897

9998
@TestOnly
100-
String getExecutionHash(){ executionHash }
101-
102-
@TestOnly
103-
String setExecutionHash(String hash){ this.executionHash = hash }
99+
String setLaunchId(String hash){ this.launchId = hash }
104100

105101
@TestOnly
106102
String setNormalizer(PathNormalizer normalizer){ this.normalizer = normalizer }
107103

108104
@Override
109105
void onFlowBegin() {
110106
normalizer = new PathNormalizer(session.workflowMetadata)
111-
executionHash = storeWorkflowRun(normalizer)
112-
final executionUri = asUriString(executionHash)
113-
workflowOutput = new WorkflowOutput(
114-
OffsetDateTime.now(),
115-
executionUri,
116-
new LinkedList<Parameter>()
117-
)
118-
this.store.getHistoryLog().write(session.runName, session.uniqueId, executionUri)
107+
launchId = storeWorkflowLaunch(normalizer)
108+
this.store.getHistoryLog().write(session.runName, session.uniqueId, asUriString(launchId))
119109
}
120110

121111
@Override
122-
void onFlowComplete(){
123-
if(workflowOutput?.output ){
124-
workflowOutput.createdAt = OffsetDateTime.now()
125-
final key = executionHash + '#output'
126-
this.store.save(key, workflowOutput)
127-
}
112+
void onFlowComplete() {
113+
final status = session.isCancelled()
114+
? "CANCELLED"
115+
: session.isSuccess() ? "SUCCEEDED" : "FAILED"
116+
final workflowRun = new WorkflowRun(
117+
OffsetDateTime.now(),
118+
asUriString(launchId),
119+
status,
120+
outputs
121+
)
122+
final runId = CacheHelper.hasher(workflowRun).hash().toString()
123+
this.store.save(runId, workflowRun)
124+
this.store.getHistoryLog().finalize(session.uniqueId, asUriString(runId), status)
128125
}
129126

130127
protected Collection<Path> allScriptFiles() {
@@ -150,24 +147,24 @@ class LinObserver implements TraceObserverV2 {
150147
return result.sort{it.path}
151148
}
152149

153-
protected String storeWorkflowRun(PathNormalizer normalizer) {
150+
protected String storeWorkflowLaunch(PathNormalizer normalizer) {
154151
// create the workflow object holding script files and repo tracking info
155152
final workflow = new Workflow(
156153
collectScriptDataPaths(normalizer),
157154
session.workflowMetadata.repository,
158155
session.workflowMetadata.commitId
159156
)
160157
// create the workflow run main object
161-
final value = new WorkflowRun(
158+
final value = new WorkflowLaunch(
162159
workflow,
163160
session.uniqueId.toString(),
164161
session.runName,
165162
getNormalizedParams(session.params, normalizer),
166163
SecretHelper.hideSecrets(session.config.deepClone()) as Map
167164
)
168-
final executionHash = CacheHelper.hasher(value).hash().toString()
169-
store.save(executionHash, value)
170-
return executionHash
165+
final launchId = CacheHelper.hasher(value).hash().toString()
166+
store.save(launchId, value)
167+
return launchId
171168
}
172169

173170
protected static List<Parameter> getNormalizedParams(Map<String, Object> params, PathNormalizer normalizer){
@@ -180,43 +177,6 @@ class LinObserver implements TraceObserverV2 {
180177
return normalizedParams
181178
}
182179

183-
@Override
184-
void onTaskComplete(TaskEvent event) {
185-
storeTaskInfo(event.handler.task)
186-
}
187-
188-
protected void storeTaskInfo(TaskRun task) {
189-
// store the task run entry
190-
storeTaskRun(task, normalizer)
191-
// store all task results
192-
storeTaskResults(task, normalizer)
193-
}
194-
195-
protected String storeTaskResults(TaskRun task, PathNormalizer normalizer){
196-
final outputParams = getNormalizedTaskOutputs(task, normalizer)
197-
final value = new TaskOutput( asUriString(task.hash.toString()), asUriString(executionHash), OffsetDateTime.now(), outputParams )
198-
final key = task.hash.toString() + '#output'
199-
store.save(key,value)
200-
return key
201-
}
202-
203-
private List<Parameter> getNormalizedTaskOutputs(TaskRun task, PathNormalizer normalizer){
204-
final outputs = task.getOutputs()
205-
final outputParams = new LinkedList<Parameter>()
206-
for( Map.Entry<OutParam,Object> entry : outputs ) {
207-
manageTaskOutputParameter(entry.key, outputParams, entry.value, task, normalizer)
208-
}
209-
return outputParams
210-
}
211-
212-
private void manageTaskOutputParameter(OutParam key, LinkedList<Parameter> outputParams, value, TaskRun task, PathNormalizer normalizer) {
213-
if (key instanceof FileOutParam) {
214-
outputParams.add(new Parameter(getParameterType(key), key.name, manageFileOutParam(value, task)))
215-
} else {
216-
outputParams.add(new Parameter(getParameterType(key), key.name, normalizeValue(value, normalizer)))
217-
}
218-
}
219-
220180
private static Object normalizeValue(Object value, PathNormalizer normalizer) {
221181
if (value instanceof Path)
222182
return normalizer.normalizePath((Path)value)
@@ -226,6 +186,11 @@ class LinObserver implements TraceObserverV2 {
226186
return value
227187
}
228188

189+
@Override
190+
void onTaskComplete(TaskEvent event) {
191+
storeTaskRun(event.handler.task, normalizer)
192+
}
193+
229194
private Object manageFileOutParam(Object value, TaskRun task) {
230195
if (value == null) {
231196
log.debug "Unexpected lineage File output value null"
@@ -247,7 +212,7 @@ class LinObserver implements TraceObserverV2 {
247212

248213
protected String storeTaskRun(TaskRun task, PathNormalizer normalizer) {
249214
final codeChecksum = Checksum.ofNextflow(session.stubRun ? task.stubSource : task.source)
250-
final value = new nextflow.lineage.model.TaskRun(
215+
final taskRun = new nextflow.lineage.model.TaskRun(
251216
session.uniqueId.toString(),
252217
task.getName(),
253218
codeChecksum,
@@ -262,12 +227,19 @@ class LinObserver implements TraceObserverV2 {
262227
normalizer.normalizePath(p.normalize()),
263228
Checksum.ofNextflow(p) )
264229
},
265-
asUriString(executionHash)
230+
asUriString(launchId)
266231
)
267232

268233
// store in the underlying persistence
269234
final key = task.hash.toString()
270-
store.save(key, value)
235+
store.save(key, taskRun)
236+
237+
// store file outputs
238+
task.outputs.forEach { OutParam param, Object value ->
239+
if (param instanceof FileOutParam)
240+
manageFileOutParam(value, task)
241+
}
242+
271243
return key
272244
}
273245

@@ -280,7 +252,7 @@ class LinObserver implements TraceObserverV2 {
280252
path.toUriString(),
281253
checksum,
282254
asUriString(task.hash.toString()),
283-
asUriString(executionHash),
255+
asUriString(launchId),
284256
asUriString(task.hash.toString()),
285257
attrs.size(),
286258
LinUtils.toDate(attrs?.creationTime()),
@@ -300,7 +272,7 @@ class LinObserver implements TraceObserverV2 {
300272

301273
protected String getWorkflowOutputKey(Path target) {
302274
final rel = getWorkflowRelative(target)
303-
return executionHash + SEPARATOR + rel
275+
return launchId + SEPARATOR + rel
304276
}
305277

306278
protected String getTaskRelative(TaskRun task, Path path){
@@ -345,13 +317,13 @@ class LinObserver implements TraceObserverV2 {
345317
final key = getWorkflowOutputKey(event.target)
346318
final sourceReference = event.source
347319
? getSourceReference(event.source)
348-
: asUriString(executionHash)
320+
: asUriString(launchId)
349321
final attrs = readAttributes(event.target)
350322
final value = new FileOutput(
351323
event.target.toUriString(),
352324
checksum,
353325
sourceReference,
354-
asUriString(executionHash),
326+
asUriString(launchId),
355327
null,
356328
attrs.size(),
357329
LinUtils.toDate(attrs?.creationTime()),
@@ -363,7 +335,7 @@ class LinObserver implements TraceObserverV2 {
363335
log.warn1("Lineage for workflow output is not supported by publishDir directive")
364336
}
365337
catch (Throwable e) {
366-
log.warn("Unexpected error storing published file '${event.target.toUriString()}' for workflow '${executionHash}'", e)
338+
log.warn("Unexpected error storing published file '${event.target.toUriString()}' for workflow '${launchId}'", e)
367339
}
368340
}
369341

@@ -381,7 +353,7 @@ class LinObserver implements TraceObserverV2 {
381353
void onWorkflowOutput(WorkflowOutputEvent event) {
382354
final type = getParameterType(event.value)
383355
final value = convertPathsToLidReferences(event.index ?: event.value)
384-
workflowOutput.output.add(new Parameter(type, event.name, value))
356+
outputs.add(new Parameter(type, event.name, value))
385357
}
386358

387359
protected static String getParameterType(Object param) {

0 commit comments

Comments
 (0)