diff --git a/docs/migrations/25-04.md b/docs/migrations/25-04.md
index 205ed72e09..00eea9829b 100644
--- a/docs/migrations/25-04.md
+++ b/docs/migrations/25-04.md
@@ -37,6 +37,14 @@ The third preview of workflow outputs introduces the following breaking changes
See {ref}`workflow-output-def` to learn more about the workflow output definition.
+
+<h3>Data lineage</h3>
+
+This release introduces built-in provenance tracking, also known as *data lineage*. When `lineage.enabled` is set to `true` in your configuration, Nextflow will record every workflow run, task execution, output file, and the links between them.
+
+You can explore this lineage from the command line using the {ref}`cli-lineage` command. Additionally, you can refer to files in the lineage store from a Nextflow script using the `lid://` path prefix as well as the {ref}`channel-from-lineage` channel factory.
+
+See the {ref}`cli-lineage` command and {ref}`config-lineage` config scope for details.
+
## Enhancements
Improved inspect command
diff --git a/docs/reference/channel.md b/docs/reference/channel.md
index 5d3b54c95f..fc824512d1 100644
--- a/docs/reference/channel.md
+++ b/docs/reference/channel.md
@@ -58,6 +58,37 @@ But when more than one argument is provided, they are always managed as *single*
channel.from( [1, 2], [5,6], [7,9] )
```
+(channel-from-lineage)=
+
+## fromLineage
+
+:::{versionadded} 25.04.0
+:::
+
+:::{warning} *Experimental: may change in a future release.*
+:::
+
+The `channel.fromLineage` factory creates a channel that emits files from the {ref}`cli-lineage` store that match the given key-value params:
+
+```nextflow
+channel
+ .fromLineage(workflowRun: 'lid://0d1d1622ced3e4edc690bec768919b45', labels: ['alpha', 'beta'])
+ .view()
+```
+
+The above snippet emits files published by the given workflow run that are labeled as `alpha` and `beta`.
+
+Available options:
+
+`labels`
+: List of labels associated with the desired files.
+
+`taskRun`
+: LID of the task run that produced the desired files.
+
+`workflowRun`
+: LID of the workflow run that produced the desired files.
+
(channel-fromlist)=
## fromList
diff --git a/docs/reference/cli.md b/docs/reference/cli.md
index 67cc43d104..88db1aebf4 100644
--- a/docs/reference/cli.md
+++ b/docs/reference/cli.md
@@ -703,7 +703,7 @@ $ nextflow lineage SUBCOMMAND [arg ..]
**Description**
-The `lineage` command is used to inspect lineage metadata.
+The `lineage` command is used to inspect lineage metadata. Data lineage can be enabled by setting `lineage.enabled` to `true` in your Nextflow configuration (see the {ref}`config-lineage` config scope for details).
**Options**
@@ -720,31 +720,35 @@ TIMESTAMP RUN NAME SESSION ID
2025-04-22 14:45:43 backstabbing_heyrovsky 21bc4fad-e8b8-447d-9410-388f926a711f lid://c914d714877cc5c882c55a5428b510b1
```
-View a metadata description.
+View a lineage record.
```console
$ nextflow lineage view
```
-View a metadata description fragment. A fragment can be a property of a metadata description (e.g., `output` or `params`) or a set of nested properties separated by a `.` (e.g., `workflow.repository`).
+The output of a workflow run can be shown by appending `#output` to the workflow run LID:
```console
-$ nextflow lineage view
+$ nextflow lineage view lid://c914d714877cc5c882c55a5428b510b1#output
```
-Find a specific metadata description that matches a URL-like query string. The query string consists of `key=value` statements separated by `&`, where keys are defined similarly to the `fragments` used in the `view` command.
+:::{tip}
+You can use the [jq](https://jqlang.org/) command-line tool to apply further queries and transformations on the resulting lineage record.
+:::
+
+Find all lineage records that match a set of key-value pairs:
```console
-$ nextflow lineage find ""
+$ nextflow lineage find <key-1>=<value-1> <key-2>=<value-2> ...
```
-Display a git-style diff between two metadata descriptions.
+Display a git-style diff between two lineage records.
```console
$ nextflow lineage diff
```
-Render the lineage graph for a workflow or task output in an HTML file. (default file path: `./lineage.html`).
+Render the lineage graph for a workflow or task output as an HTML file (default file path: `./lineage.html`).
```console
$ nextflow lineage render [html-file-path]
diff --git a/docs/reference/config.md b/docs/reference/config.md
index 4dd1941524..bbac6272d3 100644
--- a/docs/reference/config.md
+++ b/docs/reference/config.md
@@ -1123,7 +1123,7 @@ See the {ref}`k8s-page` page for more details.
## `lineage`
-The `lineage` scope controls the generation of lineage metadata.
+The `lineage` scope controls the generation of {ref}`cli-lineage` metadata.
The following settings are available:
diff --git a/modules/nextflow/src/main/groovy/nextflow/Channel.groovy b/modules/nextflow/src/main/groovy/nextflow/Channel.groovy
index ee1bb55f43..3d3ff69b58 100644
--- a/modules/nextflow/src/main/groovy/nextflow/Channel.groovy
+++ b/modules/nextflow/src/main/groovy/nextflow/Channel.groovy
@@ -40,6 +40,7 @@ import nextflow.datasource.SraExplorer
import nextflow.exception.AbortOperationException
import nextflow.extension.CH
import nextflow.extension.GroupTupleOp
+import nextflow.extension.LinExtension
import nextflow.extension.MapOp
import nextflow.file.DirListener
import nextflow.file.DirWatcher
@@ -47,6 +48,7 @@ import nextflow.file.DirWatcherV2
import nextflow.file.FileHelper
import nextflow.file.FilePatternSplitter
import nextflow.file.PathVisitor
+import nextflow.plugin.Plugins
import nextflow.plugin.extension.PluginExtensionProvider
import nextflow.util.Duration
import nextflow.util.TestOnly
@@ -657,4 +659,23 @@ class Channel {
fromPath0Future = future.exceptionally(Channel.&handlerException)
}
+    /**
+     * Channel factory backing {@code channel.fromLineage(...)}.
+     *
+     * The options are validated against {@code LinExtension.PARAMS}, a new channel is
+     * created and returned immediately, and the lineage lookup that fills it is delegated
+     * to {@code fromLineage0}.
+     *
+     * @param params Query options accepted by the factory
+     * @return The channel that receives the lookup results
+     */
+    static DataflowWriteChannel fromLineage(Map params) {
+        checkParams('fromLineage', params, LinExtension.PARAMS)
+        final target = CH.create()
+        if( !NF.isDsl2() ) {
+            // outside DSL2 the lookup is started right away
+            fromLineage0(target, params )
+            return target
+        }
+        // under DSL2 the lookup is registered as a session igniter instead of being started here
+        // NOTE(review): presumably igniters fire once the session starts running -- confirm
+        session.addIgniter { fromLineage0(target, params) }
+        return target
+    }
+
+    /**
+     * Obtain the lineage extension from the plugin system and run its
+     * {@code fromLineage} query on a {@code CompletableFuture}, so the caller is not blocked.
+     *
+     * @param channel Channel handed to the extension together with the session
+     * @param params Query options forwarded unchanged to the extension
+     * @throws IllegalStateException when {@code Plugins.getExtension(LinExtension)} yields nothing
+     */
+    private static void fromLineage0(DataflowWriteChannel channel, Map params) {
+        final linExt = Plugins.getExtension(LinExtension)
+        if( !linExt )
+            throw new IllegalStateException("Unable to load lineage extensions.")
+        final future = CompletableFuture.runAsync(() -> linExt.fromLineage(session, channel, params))
+        // failures of the async query are routed to the class-level handler; the method pointer is
+        // written as `Channel.&...` to match the other future in this class
+        future.exceptionally(Channel.&handlerException)
+    }
}
diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/LinExtension.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/LinExtension.groovy
new file mode 100644
index 0000000000..d65010131c
--- /dev/null
+++ b/modules/nextflow/src/main/groovy/nextflow/extension/LinExtension.groovy
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2013-2025, Seqera Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package nextflow.extension
+
+import groovyx.gpars.dataflow.DataflowWriteChannel
+import nextflow.Session
+
+/**
+ * Interface for nf-lineage extensions.
+ *
+ * @author Jorge Ejarque params)
+}
diff --git a/modules/nextflow/src/test/groovy/nextflow/cli/CmdLineageTest.groovy b/modules/nextflow/src/test/groovy/nextflow/cli/CmdLineageTest.groovy
index 4db9155246..da7787cb59 100644
--- a/modules/nextflow/src/test/groovy/nextflow/cli/CmdLineageTest.groovy
+++ b/modules/nextflow/src/test/groovy/nextflow/cli/CmdLineageTest.groovy
@@ -179,7 +179,7 @@ class CmdLineageTest extends Specification {
then:
stdout.size() == 1
- stdout[0] == "Error loading lid://12345 - Lineage object 12345 not found"
+ stdout[0] == "Error loading lid://12345 - Lineage record 12345 not found"
cleanup:
folder?.deleteDir()
@@ -280,45 +280,10 @@ class CmdLineageTest extends Specification {
def entry = new FileOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
"lid://123987/file.bam", "lid://12345", "lid://123987/", 1234, time, time, null)
def jsonSer = encoder.encode(entry)
- def expectedOutput = jsonSer
- lidFile.text = jsonSer
- when:
- def lidCmd = new CmdLineage(launcher: launcher, args: ["view", "lid:///?type=FileOutput"])
- lidCmd.run()
- def stdout = capture
- .toString()
- .readLines()// remove the log part
- .findResults { line -> !line.contains('DEBUG') ? line : null }
- .findResults { line -> !line.contains('INFO') ? line : null }
- .findResults { line -> !line.contains('plugin') ? line : null }
-
- then:
- stdout.size() == expectedOutput.readLines().size()
- stdout.join('\n') == expectedOutput
-
- cleanup:
- folder?.deleteDir()
- }
-
- def 'should show query results'(){
- given:
- def folder = Files.createTempDirectory('test').toAbsolutePath()
- def configFile = folder.resolve('nextflow.config')
- configFile.text = "lineage.enabled = true\nlineage.store.location = '$folder'".toString()
- def lidFile = folder.resolve("12345/.data.json")
- Files.createDirectories(lidFile.parent)
- def launcher = Mock(Launcher){
- getOptions() >> new CliOptions(config: [configFile.toString()])
- }
- def encoder = new LinEncoder().withPrettyPrint(true)
- def time = OffsetDateTime.now()
- def entry = new FileOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
- "lid://123987/file.bam", "lid://12345", "lid://123987/", 1234, time, time, null)
- def jsonSer = encoder.encode(entry)
- def expectedOutput = jsonSer
+ def expectedOutput = '[\n "lid://12345"\n]'
lidFile.text = jsonSer
when:
- def lidCmd = new CmdLineage(launcher: launcher, args: ["view", "lid:///?type=FileOutput"])
+ def lidCmd = new CmdLineage(launcher: launcher, args: ["find", "type=FileOutput"])
lidCmd.run()
def stdout = capture
.toString()
diff --git a/modules/nf-lang/src/main/java/nextflow/script/types/ChannelFactory.java b/modules/nf-lang/src/main/java/nextflow/script/types/ChannelFactory.java
index 826d1fcb39..b9dd747de3 100644
--- a/modules/nf-lang/src/main/java/nextflow/script/types/ChannelFactory.java
+++ b/modules/nf-lang/src/main/java/nextflow/script/types/ChannelFactory.java
@@ -31,6 +31,8 @@ public interface ChannelFactory {
Channel fromFilePairs(Map opts, String pattern, Closure grouping);
+ Channel fromLineage(Map opts); // channel of lineage-store files matching the given options (documented options: labels, taskRun, workflowRun)
+
Channel fromList(Collection values);
Channel fromPath(Map opts, String pattern);
diff --git a/modules/nf-lineage/src/main/nextflow/lineage/DefaultLinStore.groovy b/modules/nf-lineage/src/main/nextflow/lineage/DefaultLinStore.groovy
index 3ebc1b4ff5..6b1f0dd6fe 100644
--- a/modules/nf-lineage/src/main/nextflow/lineage/DefaultLinStore.groovy
+++ b/modules/nf-lineage/src/main/nextflow/lineage/DefaultLinStore.groovy
@@ -93,15 +93,7 @@ class DefaultLinStore implements LinStore {
void close() throws IOException { }
@Override
- Map search(String queryString) {
- def params = null
- if (queryString) {
- params = LinUtils.parseQuery(queryString)
- }
- return searchAllFiles(params)
- }
-
- private Map searchAllFiles(Map> params) {
+ Map search(Map> params) {
final results = new HashMap()
Files.walkFileTree(location, new FileVisitor() {
diff --git a/modules/nf-lineage/src/main/nextflow/lineage/LinExtensionImpl.groovy b/modules/nf-lineage/src/main/nextflow/lineage/LinExtensionImpl.groovy
new file mode 100644
index 0000000000..ac92fe817c
--- /dev/null
+++ b/modules/nf-lineage/src/main/nextflow/lineage/LinExtensionImpl.groovy
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2013-2025, Seqera Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package nextflow.lineage
+
+import groovy.transform.CompileStatic
+import groovy.util.logging.Slf4j
+import groovyx.gpars.dataflow.DataflowWriteChannel
+import nextflow.Channel
+import nextflow.Session
+import nextflow.extension.LinExtension
+import nextflow.lineage.fs.LinPathFactory
+import nextflow.lineage.model.FileOutput
+import nextflow.lineage.serde.LinSerializable
+
+import static nextflow.lineage.fs.LinPath.*
+
+/**
+ * Lineage channel extensions
+ *
+ * @author Jorge Ejarque
+ */
+@CompileStatic
+@Slf4j
+class LinExtensionImpl implements LinExtension {
+
+    /**
+     * Query the lineage store for {@code FileOutput} records matching {@code opts},
+     * bind one lineage path per matching key to {@code channel}, then bind {@code Channel.STOP}.
+     *
+     * @param session Session used to obtain the lineage store
+     * @param channel Channel that receives the matching paths
+     * @param opts Filter options; the keys read here are {@code workflowRun}, {@code taskRun} and {@code labels}
+     */
+    @Override
+    void fromLineage(Session session, DataflowWriteChannel channel, Map opts) {
+        final queryParams = buildQueryParams(opts)
+        log.trace("Querying lineage with params: $queryParams")
+        // reject unknown property names before touching the store
+        new LinPropertyValidator().validateQueryParams(queryParams.keySet())
+        final store = getStore(session)
+        emitSearchResults(channel, store.search(queryParams))
+        channel.bind(Channel.STOP)
+    }
+
+    /**
+     * Translate the factory options into store query parameters.
+     * The {@code type} parameter is always set to the simple name of {@code FileOutput};
+     * each of the other parameters is added only when the corresponding option is set.
+     *
+     * @param opts Options passed to the channel factory
+     * @return Query parameters, each mapped to its list of values
+     */
+    private static Map<String, List<String>> buildQueryParams(Map opts) {
+        final queryParams = [type: [FileOutput.class.simpleName] ]
+        if( opts.workflowRun )
+            queryParams['workflowRun'] = [opts.workflowRun as String]
+        if( opts.taskRun )
+            queryParams['taskRun'] = [opts.taskRun as String]
+        if( opts.labels )
+            queryParams['labels'] = opts.labels as List<String>
+        return queryParams
+    }
+
+    /**
+     * Resolve the lineage store for the session.
+     *
+     * @param session Current session
+     * @return The store returned by {@code LinStoreFactory.getOrCreate}
+     * @throws IllegalStateException when no store is available
+     */
+    protected LinStore getStore(Session session) {
+        final store = LinStoreFactory.getOrCreate(session)
+        if( !store ) {
+            // a specific unchecked type instead of a bare Exception; still caught by `catch (Exception)`
+            throw new IllegalStateException("Lineage store not found - Check Nextflow configuration")
+        }
+        return store
+    }
+
+    /**
+     * Bind a lineage path to the channel for every key in the search results.
+     * A null or empty result map emits nothing.
+     *
+     * @param channel Channel receiving the paths
+     * @param results Search results keyed by lineage key
+     */
+    private void emitSearchResults(DataflowWriteChannel channel, Map<String, LinSerializable> results) {
+        if( !results ) {
+            return
+        }
+        for( String key : results.keySet() ) {
+            channel.bind( LinPathFactory.create( asUriString(key) ) )
+        }
+    }
+}
diff --git a/modules/nf-lineage/src/main/nextflow/lineage/LinStore.groovy b/modules/nf-lineage/src/main/nextflow/lineage/LinStore.groovy
index 3f826b7a0a..2dc6974f6e 100644
--- a/modules/nf-lineage/src/main/nextflow/lineage/LinStore.groovy
+++ b/modules/nf-lineage/src/main/nextflow/lineage/LinStore.groovy
@@ -55,9 +55,9 @@ interface LinStore extends Closeable {
/**
* Search for lineage entries.
- * @queryString Json-path like query string. (Only simple and nested field operators are supported(No array, wildcards,etc.)
- * @return Key-lineage entry pairs fulfilling the queryString
+ * @param params Map of query params
+ * @return Key-lineage entry pairs fulfilling the query params
*/
- Map search(String queryString)
+ Map search(Map> params)
}
diff --git a/modules/nf-lineage/src/main/nextflow/lineage/LinUtils.groovy b/modules/nf-lineage/src/main/nextflow/lineage/LinUtils.groovy
index 8f3f3c34f8..d1129745b5 100644
--- a/modules/nf-lineage/src/main/nextflow/lineage/LinUtils.groovy
+++ b/modules/nf-lineage/src/main/nextflow/lineage/LinUtils.groovy
@@ -16,13 +16,15 @@
package nextflow.lineage
+import static nextflow.lineage.fs.LinFileSystemProvider.*
+import static nextflow.lineage.fs.LinPath.*
+
import java.nio.file.attribute.FileTime
import java.time.OffsetDateTime
import java.time.ZoneId
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
-import nextflow.lineage.fs.LinPath
import nextflow.lineage.model.TaskRun
import nextflow.lineage.model.WorkflowRun
import nextflow.lineage.serde.LinEncoder
@@ -40,45 +42,37 @@ class LinUtils {
private static final String[] EMPTY_ARRAY = new String[] {}
/**
- * Query a lineage store.
+ * Get a lineage record or fragment from the Lineage store.
*
- * @param store lineage store to query.
- * @param uri Query to perform in a URI-like format.
- * Format 'lid://[?QueryString][#fragment]' where:
- * - Key: Element where the query will be applied. '/' indicates query will be applied in all the elements of the lineage store.
- * - QueryString: all param-value pairs that the lineage element should fulfill in a URI's query string format.
+ * @param store Lineage store.
+ * @param uri Object or fragment to retrieve in URI-like format.
+ * Format 'lid://[#fragment]' where:
+ * - Key: Metadata Element key
* - Fragment: Element fragment to retrieve.
- * @return Collection of object fulfilling the query
+ * @return Lineage record or fragment.
*/
- static Collection query(LinStore store, URI uri) {
- String key = uri.authority ? uri.authority + uri.path : uri.path
- if (key == LinPath.SEPARATOR) {
- return globalSearch(store, uri)
- } else {
- final parameters = uri.query ? parseQuery(uri.query) : null
- final children = parseChildrenFromFragment(uri.fragment)
- return searchPath(store, key, parameters, children )
- }
- }
+ static Object getMetadataObject(LinStore store, URI uri) {
+ if( uri.scheme != SCHEME ) // SCHEME and SEPARATOR are reached through the static imports of this file
+ throw new IllegalArgumentException("Invalid LID URI - scheme is different for $SCHEME")
+ final key = uri.authority ? uri.authority + uri.path : uri.path // authority + path when an authority is present, otherwise the path alone
+ if( key == SEPARATOR ) // a key equal to the separator is the root URI, which is refused here
+ throw new IllegalArgumentException("Cannot get record from the root LID URI")
+ if ( uri.query ) // a query part does not abort the lookup; it is only reported
+ log.warn("Query string is not supported for Lineage URI: `$uri` -- it will be ignored")
- private static Collection globalSearch(LinStore store, URI uri) {
- final results = store.search(uri.query).values()
- if (results && uri.fragment) {
- // If fragment is defined get the property of the object indicated by the fragment
- return filterResults(results, uri.fragment)
- }
- return results
+ final children = parseChildrenFromFragment(uri.fragment) // children are derived from the URI fragment by a helper not shown in this hunk
+ return getMetadataObject0(store, key, children ) // the actual load is delegated to the private overload
}
- private static List filterResults(Collection results, String fragment) {
- final filteredResults = []
- results.forEach {
- final output = navigate(it, fragment)
- if (output) {
- filteredResults.add(output)
- }
+ private static Object getMetadataObject0(LinStore store, String key, String[] children = []) {
+ final record = store.load(key)
+ if (!record) { // load() gave back nothing usable for this key
+ throw new FileNotFoundException("Lineage record $key not found")
+ }
+ if (children && children.size() > 0) { // children requested: hand off to getSubObject instead of returning the whole record
+ return getSubObject(store, key, record, children)
+ }
- return filteredResults
+ return record // no children requested: the record itself is the result
}
/**
@@ -96,127 +90,67 @@ class LinUtils {
}
/**
- * Search for objects inside a description
+ * Get a lineage sub-record.
*
- * @param store lineage store
- * @param key lineage key where to perform the search
- * @param params Parameter-value pairs to be evaluated in the key
- * @param children Sub-objects to evaluate and retrieve
- * @return List of object
- */
- protected static List