Skip to content

Commit 85b9d00

Browse files
jorgeebentsherman
andauthored
Data lineage programmatic API (#6003)
--------- Signed-off-by: jorgee <[email protected]> Signed-off-by: Ben Sherman <[email protected]> Co-authored-by: Ben Sherman <[email protected]>
1 parent d4fadd4 commit 85b9d00

File tree

20 files changed

+541
-354
lines changed

20 files changed

+541
-354
lines changed

docs/migrations/25-04.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,14 @@ The third preview of workflow outputs introduces the following breaking changes
3737

3838
See {ref}`workflow-output-def` to learn more about the workflow output definition.
3939

40+
<h3>Data lineage</h3>
41+
42+
This release introduces built-in provenance tracking, also known as *data lineage*. When `lineage.enabled` is set to `true` in your configuration, Nextflow will record every workflow run, task execution, output file, and the links between them.
43+
44+
You can explore this lineage from the command line using the {ref}`cli-lineage` command. Additionally, you can refer to files in the lineage store from a Nextflow script using the `lid://` path prefix as well as the {ref}`channel-from-lineage` channel factory.
45+
46+
See the {ref}`cli-lineage` command and {ref}`config-lineage` config scope for details.
47+
4048
## Enhancements
4149

4250
<h3>Improved <code>inspect</code> command</h3>

docs/reference/channel.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,37 @@ But when more than one argument is provided, they are always managed as *single*
5858
channel.from( [1, 2], [5,6], [7,9] )
5959
```
6060

61+
(channel-from-lineage)=
62+
63+
## fromLineage
64+
65+
:::{versionadded} 25.04.0
66+
:::
67+
68+
:::{warning} *Experimental: may change in a future release.*
69+
:::
70+
71+
The `channel.fromLineage` factory creates a channel that emits files from the {ref}`cli-lineage` store that match the given key-value params:
72+
73+
```nextflow
74+
channel
75+
.fromLineage(workflowRun: 'lid://0d1d1622ced3e4edc690bec768919b45', labels: ['alpha', 'beta'])
76+
.view()
77+
```
78+
79+
The above snippet emits files published by the given workflow run that are labeled as `alpha` and `beta`.
80+
81+
Available options:
82+
83+
`labels`
84+
: List of labels associated with the desired files.
85+
86+
`taskRun`
87+
: LID of the task run that produced the desired files.
88+
89+
`workflowRun`
90+
: LID of the workflow run that produced the desired files.
91+
6192
(channel-fromlist)=
6293

6394
## fromList

docs/reference/cli.md

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -703,7 +703,7 @@ $ nextflow lineage SUBCOMMAND [arg ..]
703703

704704
**Description**
705705

706-
The `lineage` command is used to inspect lineage metadata.
706+
The `lineage` command is used to inspect lineage metadata. Data lineage can be enabled by setting `lineage.enabled` to `true` in your Nextflow configuration (see the {ref}`config-lineage` config scope for details).
707707

708708
**Options**
709709

@@ -720,31 +720,35 @@ TIMESTAMP RUN NAME SESSION ID
720720
2025-04-22 14:45:43 backstabbing_heyrovsky 21bc4fad-e8b8-447d-9410-388f926a711f lid://c914d714877cc5c882c55a5428b510b1
721721
```
722722

723-
View a metadata description.
723+
View a lineage record.
724724

725725
```console
726726
$ nextflow lineage view <lid>
727727
```
728728

729-
View a metadata description fragment. A fragment can be a property of a metadata description (e.g., `output` or `params`) or a set of nested properties separated by a `.` (e.g., `workflow.repository`).
729+
The output of a workflow run can be shown by appending `#output` to the workflow run LID:
730730

731731
```console
732-
$ nextflow lineage view <lid#fragment>
732+
$ nextflow lineage view lid://c914d714877cc5c882c55a5428b510b1#output
733733
```
734734

735-
Find a specific metadata description that matches a URL-like query string. The query string consists of `key=value` statements separated by `&`, where keys are defined similarly to the `fragments` used in the `view` command.
735+
:::{tip}
736+
You can use the [jq](https://jqlang.org/) command-line tool to apply further queries and transformations on the resulting lineage record.
737+
:::
738+
739+
Find all lineage records that match a set of key-value pairs:
736740

737741
```console
738-
$ nextflow lineage find "<query-string>"
742+
$ nextflow lineage find <key-1>=<value-1> <key-2>=<value-2> ...
739743
```
740744

741-
Display a git-style diff between two metadata descriptions.
745+
Display a git-style diff between two lineage records.
742746

743747
```console
744748
$ nextflow lineage diff <lid-1> <lid-2>
745749
```
746750

747-
Render the lineage graph for a workflow or task output in an HTML file. (default file path: `./lineage.html`).
751+
Render the lineage graph for a workflow or task output as an HTML file. (default file path: `./lineage.html`).
748752

749753
```console
750754
$ nextflow lineage render <lid> [html-file-path]

docs/reference/config.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1123,7 +1123,7 @@ See the {ref}`k8s-page` page for more details.
11231123

11241124
## `lineage`
11251125

1126-
The `lineage` scope controls the generation of lineage metadata.
1126+
The `lineage` scope controls the generation of {ref}`cli-lineage` metadata.
11271127

11281128
The following settings are available:
11291129

modules/nextflow/src/main/groovy/nextflow/Channel.groovy

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,15 @@ import nextflow.datasource.SraExplorer
4040
import nextflow.exception.AbortOperationException
4141
import nextflow.extension.CH
4242
import nextflow.extension.GroupTupleOp
43+
import nextflow.extension.LinExtension
4344
import nextflow.extension.MapOp
4445
import nextflow.file.DirListener
4546
import nextflow.file.DirWatcher
4647
import nextflow.file.DirWatcherV2
4748
import nextflow.file.FileHelper
4849
import nextflow.file.FilePatternSplitter
4950
import nextflow.file.PathVisitor
51+
import nextflow.plugin.Plugins
5052
import nextflow.plugin.extension.PluginExtensionProvider
5153
import nextflow.util.Duration
5254
import nextflow.util.TestOnly
@@ -657,4 +659,23 @@ class Channel {
657659
fromPath0Future = future.exceptionally(Channel.&handlerException)
658660
}
659661

662+
static DataflowWriteChannel fromLineage(Map<String,?> params) {
663+
checkParams('fromLineage', params, LinExtension.PARAMS)
664+
final result = CH.create()
665+
if( NF.isDsl2() ) {
666+
session.addIgniter { fromLineage0(result, params) }
667+
}
668+
else {
669+
fromLineage0(result, params )
670+
}
671+
return result
672+
}
673+
674+
private static void fromLineage0(DataflowWriteChannel channel, Map<String,?> params) {
675+
final linExt = Plugins.getExtension(LinExtension)
676+
if( !linExt )
677+
throw new IllegalStateException("Unable to load lineage extensions.")
678+
final future = CompletableFuture.runAsync(() -> linExt.fromLineage(session, channel, params))
679+
future.exceptionally(this.&handlerException)
680+
}
660681
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2013-2025, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package nextflow.extension
18+
19+
import groovyx.gpars.dataflow.DataflowWriteChannel
20+
import nextflow.Session
21+
22+
/**
23+
* Interface for nf-lineage extensions.
24+
*
25+
* @author Jorge Ejarque <[email protected]
26+
*/
27+
interface LinExtension {
28+
29+
static final Map PARAMS = [
30+
labels: List,
31+
taskRun: [String,GString],
32+
workflowRun: [String,GString],
33+
]
34+
35+
/**
36+
* Query Lineage metadata to get files produced by tasks, workflows or annotations.
37+
*
38+
* @param session Nextflow Session
39+
* @param channel Channel to publish the Lineage Ids matching the query params
40+
* @param params Parameters for the lineage metadata query
41+
*/
42+
abstract void fromLineage(Session session, DataflowWriteChannel channel, Map<String,?> params)
43+
}

modules/nextflow/src/test/groovy/nextflow/cli/CmdLineageTest.groovy

Lines changed: 3 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ class CmdLineageTest extends Specification {
179179

180180
then:
181181
stdout.size() == 1
182-
stdout[0] == "Error loading lid://12345 - Lineage object 12345 not found"
182+
stdout[0] == "Error loading lid://12345 - Lineage record 12345 not found"
183183

184184
cleanup:
185185
folder?.deleteDir()
@@ -280,45 +280,10 @@ class CmdLineageTest extends Specification {
280280
def entry = new FileOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
281281
"lid://123987/file.bam", "lid://12345", "lid://123987/", 1234, time, time, null)
282282
def jsonSer = encoder.encode(entry)
283-
def expectedOutput = jsonSer
284-
lidFile.text = jsonSer
285-
when:
286-
def lidCmd = new CmdLineage(launcher: launcher, args: ["view", "lid:///?type=FileOutput"])
287-
lidCmd.run()
288-
def stdout = capture
289-
.toString()
290-
.readLines()// remove the log part
291-
.findResults { line -> !line.contains('DEBUG') ? line : null }
292-
.findResults { line -> !line.contains('INFO') ? line : null }
293-
.findResults { line -> !line.contains('plugin') ? line : null }
294-
295-
then:
296-
stdout.size() == expectedOutput.readLines().size()
297-
stdout.join('\n') == expectedOutput
298-
299-
cleanup:
300-
folder?.deleteDir()
301-
}
302-
303-
def 'should show query results'(){
304-
given:
305-
def folder = Files.createTempDirectory('test').toAbsolutePath()
306-
def configFile = folder.resolve('nextflow.config')
307-
configFile.text = "lineage.enabled = true\nlineage.store.location = '$folder'".toString()
308-
def lidFile = folder.resolve("12345/.data.json")
309-
Files.createDirectories(lidFile.parent)
310-
def launcher = Mock(Launcher){
311-
getOptions() >> new CliOptions(config: [configFile.toString()])
312-
}
313-
def encoder = new LinEncoder().withPrettyPrint(true)
314-
def time = OffsetDateTime.now()
315-
def entry = new FileOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
316-
"lid://123987/file.bam", "lid://12345", "lid://123987/", 1234, time, time, null)
317-
def jsonSer = encoder.encode(entry)
318-
def expectedOutput = jsonSer
283+
def expectedOutput = '[\n "lid://12345"\n]'
319284
lidFile.text = jsonSer
320285
when:
321-
def lidCmd = new CmdLineage(launcher: launcher, args: ["view", "lid:///?type=FileOutput"])
286+
def lidCmd = new CmdLineage(launcher: launcher, args: ["find", "type=FileOutput"])
322287
lidCmd.run()
323288
def stdout = capture
324289
.toString()

modules/nf-lang/src/main/java/nextflow/script/types/ChannelFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ public interface ChannelFactory {
3131

3232
Channel fromFilePairs(Map<String,?> opts, String pattern, Closure grouping);
3333

34+
Channel<Path> fromLineage(Map<String,?> opts);
35+
3436
<E> Channel<E> fromList(Collection<E> values);
3537

3638
Channel<Path> fromPath(Map<String,?> opts, String pattern);

modules/nf-lineage/src/main/nextflow/lineage/DefaultLinStore.groovy

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -93,15 +93,7 @@ class DefaultLinStore implements LinStore {
9393
void close() throws IOException { }
9494

9595
@Override
96-
Map<String, LinSerializable> search(String queryString) {
97-
def params = null
98-
if (queryString) {
99-
params = LinUtils.parseQuery(queryString)
100-
}
101-
return searchAllFiles(params)
102-
}
103-
104-
private Map<String, LinSerializable> searchAllFiles(Map<String,List<String>> params) {
96+
Map<String, LinSerializable> search(Map<String,List<String>> params) {
10597
final results = new HashMap<String, LinSerializable>()
10698

10799
Files.walkFileTree(location, new FileVisitor<Path>() {
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright 2013-2025, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package nextflow.lineage
18+
19+
import groovy.transform.CompileStatic
20+
import groovy.util.logging.Slf4j
21+
import groovyx.gpars.dataflow.DataflowWriteChannel
22+
import nextflow.Channel
23+
import nextflow.Session
24+
import nextflow.extension.LinExtension
25+
import nextflow.lineage.fs.LinPathFactory
26+
import nextflow.lineage.model.FileOutput
27+
import nextflow.lineage.serde.LinSerializable
28+
29+
import static nextflow.lineage.fs.LinPath.*
30+
31+
/**
32+
* Lineage channel extensions
33+
*
34+
* @author Jorge Ejarque <[email protected]>
35+
*/
36+
@CompileStatic
37+
@Slf4j
38+
class LinExtensionImpl implements LinExtension {
39+
40+
@Override
41+
void fromLineage(Session session, DataflowWriteChannel channel, Map<String,?> opts) {
42+
final queryParams = buildQueryParams(opts)
43+
log.trace("Querying lineage with params: $queryParams")
44+
new LinPropertyValidator().validateQueryParams(queryParams.keySet())
45+
final store = getStore(session)
46+
emitSearchResults(channel, store.search(queryParams))
47+
channel.bind(Channel.STOP)
48+
}
49+
50+
private static Map<String, List<String>> buildQueryParams(Map<String,?> opts) {
51+
final queryParams = [type: [FileOutput.class.simpleName] ]
52+
if( opts.workflowRun )
53+
queryParams['workflowRun'] = [opts.workflowRun as String]
54+
if( opts.taskRun )
55+
queryParams['taskRun'] = [opts.taskRun as String]
56+
if( opts.labels )
57+
queryParams['labels'] = opts.labels as List<String>
58+
return queryParams
59+
}
60+
61+
protected LinStore getStore(Session session) {
62+
final store = LinStoreFactory.getOrCreate(session)
63+
if( !store ) {
64+
throw new Exception("Lineage store not found - Check Nextflow configuration")
65+
}
66+
return store
67+
}
68+
69+
private void emitSearchResults(DataflowWriteChannel channel, Map<String, LinSerializable> results) {
70+
if( !results ) {
71+
return
72+
}
73+
results.keySet().forEach { channel.bind( LinPathFactory.create( asUriString(it) ) ) }
74+
}
75+
}

modules/nf-lineage/src/main/nextflow/lineage/LinStore.groovy

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,9 @@ interface LinStore extends Closeable {
5555

5656
/**
5757
* Search for lineage entries.
58-
* @queryString Json-path like query string. (Only simple and nested field operators are supported(No array, wildcards,etc.)
59-
* @return Key-lineage entry pairs fulfilling the queryString
58+
* @param params Map of query params
59+
* @return Key-lineage entry pairs fulfilling the query params
6060
*/
61-
Map<String,LinSerializable> search(String queryString)
61+
Map<String,LinSerializable> search(Map<String, List<String>> params)
6262

6363
}

0 commit comments

Comments
 (0)