Skip to content

Refactor lineage JSON serialization with a root element including version, kind and spec properties #6075

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,19 @@ public RuntimeTypeAdapterFactory<T> registerSubtype(Class<? extends T> type) {
return registerSubtype(type, type.getSimpleName());
}

protected Class<?> getSubTypeFromLabel(String label){
return labelToSubtype.get(label);
}

protected String getLabelFromSubtype(Class<?> subType){
return subtypeToLabel.get(subType);
}

protected String getTypeFieldName(){
return typeFieldName;
}


@Override
public <R> TypeAdapter<R> create(Gson gson, TypeToken<R> type) {
if (type == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ import nextflow.serde.gson.RuntimeTypeAdapterFactory
@CompileStatic
class LinTypeAdapterFactory<T> extends RuntimeTypeAdapterFactory<T> {
public static final String VERSION_FIELD = 'version'
public static final String SPEC_FIELD = 'spec'
public static final String CURRENT_VERSION = LinModel.VERSION

private labelToClass = [:]
LinTypeAdapterFactory() {
super(LinSerializable.class, "kind", false)
this.registerSubtype(WorkflowRun, WorkflowRun.simpleName)
Expand All @@ -70,39 +72,36 @@ class LinTypeAdapterFactory<T> extends RuntimeTypeAdapterFactory<T> {
return new TypeAdapter<R>() {
@Override
void write(JsonWriter out, R value) throws IOException {
def json = delegate.toJsonTree(value)
if (json instanceof JsonObject) {
json = addVersion(json)
}
gson.toJson(json, out)
final object = new JsonObject()
object.addProperty(VERSION_FIELD, CURRENT_VERSION)
String label = getLabelFromSubtype(value.class)
if (!label)
throw new JsonParseException("Not registered class ${value.class}")
object.addProperty(getTypeFieldName(), label)
def json = gson.toJsonTree(value)
object.add(SPEC_FIELD, json)
gson.toJson(object, out)
}

@Override
R read(JsonReader reader) throws IOException {
def json = JsonParser.parseReader(reader)
if (json instanceof JsonObject) {
def obj = (JsonObject) json
def versionEl = obj.get(VERSION_FIELD)
if (versionEl == null || versionEl.asString != CURRENT_VERSION) {
throw new JsonParseException("Invalid or missing version")
}
obj.remove(VERSION_FIELD)
def obj = JsonParser.parseReader(reader)?.asJsonObject
if( !obj )
throw new JsonParseException("Parsed object is null")
def versionEl = obj.get(VERSION_FIELD)
if (versionEl == null || versionEl.asString != CURRENT_VERSION) {
throw new JsonParseException("Invalid or missing '${VERSION_FIELD}' property")
}
return delegate.fromJsonTree(json)
final typeEl = obj.get(getTypeFieldName())
if( !typeEl )
throw new JsonParseException("'${getTypeFieldName()}' not found")
final specEl = obj.get(SPEC_FIELD)?.asJsonObject
if ( !specEl )
throw new JsonParseException("'Invalid or missing '${SPEC_FIELD}' property")
specEl.add(getTypeFieldName(), typeEl)
return delegate.fromJsonTree(specEl)
}
}
}

private static JsonObject addVersion(JsonObject json){
if( json.has(VERSION_FIELD) )
throw new JsonParseException("object already defines a field named ${VERSION_FIELD}")

JsonObject clone = new JsonObject();
clone.addProperty(VERSION_FIELD, CURRENT_VERSION)
for (Map.Entry<String, JsonElement> e : json.entrySet()) {
clone.add(e.getKey(), e.getValue());
}
return clone
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -387,27 +387,27 @@ class LinCommandImplTest extends Specification{
def expectedOutput = '''diff --git 12345 67890
--- 12345
+++ 67890
@@ -1,16 +1,16 @@
{
@@ -2,16 +2,16 @@
"version": "lineage/v1beta1",
"kind": "FileOutput",
- "path": "path/to/file",
+ "path": "path/to/file2",
"checksum": {
- "value": "45372qe",
+ "value": "42472qet",
"algorithm": "nextflow",
"mode": "standard"
},
- "source": "lid://123987/file.bam",
+ "source": "lid://123987/file2.bam",
"workflowRun": "lid://123987/",
"taskRun": null,
- "size": 1234,
+ "size": 1235,
"createdAt": "1970-01-02T10:17:36.789Z",
"modifiedAt": "1970-01-02T10:17:36.789Z",
"labels": null
"spec": {
- "path": "path/to/file",
+ "path": "path/to/file2",
"checksum": {
- "value": "45372qe",
+ "value": "42472qet",
"algorithm": "nextflow",
"mode": "standard"
},
- "source": "lid://123987/file.bam",
+ "source": "lid://123987/file2.bam",
"workflowRun": "lid://123987/",
"taskRun": null,
- "size": 1234,
+ "size": 1235,
"createdAt": "1970-01-02T10:17:36.789Z",
"modifiedAt": "1970-01-02T10:17:36.789Z",
"labels": null
'''

when:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class LinFileSystemProviderTest extends Specification {
def output = data.resolve("output.txt")
output.text = "Hello, World!"
outputMeta.mkdirs()
outputMeta.resolve(".data.json").text = '{"version":"lineage/v1beta1","kind":"FileOutput","path":"'+output.toString()+'"}'
outputMeta.resolve(".data.json").text = '{"version":"lineage/v1beta1","kind":"FileOutput","spec":{"path":"'+output.toString()+'"}}'

Global.session = Mock(Session) { getConfig()>>config }
and:
Expand Down Expand Up @@ -179,7 +179,7 @@ class LinFileSystemProviderTest extends Specification {
def config = [lineage:[store:[location:wdir.toString()]]]
def outputMeta = wdir.resolve("12345")
outputMeta.mkdirs()
outputMeta.resolve(".data.json").text = '{"version":"lineage/v1beta1","kind":"WorkflowRun","sessionId":"session","name":"run_name","params":[{"type":"String","name":"param1","value":"value1"}]}'
outputMeta.resolve(".data.json").text = '{"version":"lineage/v1beta1","kind":"WorkflowRun","spec":{"sessionId":"session","name":"run_name","params":[{"type":"String","name":"param1","value":"value1"}]}}'

Global.session = Mock(Session) { getConfig()>>config }
and:
Expand Down Expand Up @@ -238,7 +238,7 @@ class LinFileSystemProviderTest extends Specification {
def output = data.resolve("output.txt")
output.text = "Hello, World!"
outputMeta.mkdirs()
outputMeta.resolve(".data.json").text = '{"version":"lineage/v1beta1","kind":"FileOutput","path":"'+output.toString()+'"}'
outputMeta.resolve(".data.json").text = '{"version":"lineage/v1beta1","kind":"FileOutput","spec":{"path":"'+output.toString()+'"}}'

Global.session = Mock(Session) { getConfig()>>config }
and:
Expand Down Expand Up @@ -278,8 +278,8 @@ class LinFileSystemProviderTest extends Specification {
output1.resolve('file3.txt').text = 'file3'
wdir.resolve('12345/output1').mkdirs()
wdir.resolve('12345/output2').mkdirs()
wdir.resolve('12345/.data.json').text = '{"version":"lineage/v1beta1","kind":"TaskRun"}'
wdir.resolve('12345/output1/.data.json').text = '{"version":"lineage/v1beta1","kind":"FileOutput", "path": "' + output1.toString() + '"}'
wdir.resolve('12345/.data.json').text = '{"version":"lineage/v1beta1","kind":"TaskRun","spec":{"name":"dummy"}}'
wdir.resolve('12345/output1/.data.json').text = '{"version":"lineage/v1beta1","kind":"FileOutput","spec":{"path": "' + output1.toString() + '"}}'

and:
def config = [lineage:[store:[location:wdir.toString()]]]
Expand Down Expand Up @@ -403,7 +403,7 @@ class LinFileSystemProviderTest extends Specification {
output.resolve('abc').text = 'file1'
output.resolve('.foo').text = 'file2'
wdir.resolve('12345/output').mkdirs()
wdir.resolve('12345/output/.data.json').text = '{"version":"lineage/v1beta1","kind":"FileOutput", "path": "' + output.toString() + '"}'
wdir.resolve('12345/output/.data.json').text = '{"version":"lineage/v1beta1","kind":"FileOutput","spec":{"path": "' + output.toString() + '"}}'
and:
def provider = new LinFileSystemProvider()
def lid1 = provider.getPath(LinPath.asUri('lid://12345/output/abc'))
Expand All @@ -423,7 +423,7 @@ class LinFileSystemProviderTest extends Specification {
def file = data.resolve('abc')
file.text = 'Hello'
wdir.resolve('12345/abc').mkdirs()
wdir.resolve('12345/abc/.data.json').text = '{"version":"lineage/v1beta1","kind":"FileOutput", "path":"' + file.toString() + '"}'
wdir.resolve('12345/abc/.data.json').text = '{"version":"lineage/v1beta1","kind":"FileOutput","spec":{"path":"' + file.toString() + '"}}'
and:
Global.session = Mock(Session) { getConfig()>>config }
and:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ class LinPathTest extends Specification {

wdir.resolve('12345/output1').mkdirs()
wdir.resolve('12345/path/to/file2.txt').mkdirs()
wdir.resolve('12345/.data.json').text = '{"version":"lineage/v1beta1","kind":"TaskRun"}'
wdir.resolve('12345/output1/.data.json').text = '{"version":"lineage/v1beta1","kind":"FileOutput", "path": "' + outputFolder.toString() + '"}'
wdir.resolve('12345/path/to/file2.txt/.data.json').text = '{"version":"lineage/v1beta1","kind":"FileOutput", "path": "' + outputFile.toString() + '"}'
wdir.resolve('12345/.data.json').text = '{"version":"lineage/v1beta1","kind":"TaskRun","spec":{"name":"test"}}'
wdir.resolve('12345/output1/.data.json').text = '{"version":"lineage/v1beta1","kind":"FileOutput","spec":{"path": "' + outputFolder.toString() + '"}}'
wdir.resolve('12345/path/to/file2.txt/.data.json').text = '{"version":"lineage/v1beta1","kind":"FileOutput","spec":{"path": "' + outputFile.toString() + '"}}'
def time = OffsetDateTime.now()
def wfResultsMetadata = new LinEncoder().withPrettyPrint(true).encode(new WorkflowOutput(time, "lid://1234", [new Parameter( "Path", "a", "lid://1234/a.txt")]))
wdir.resolve('5678/').mkdirs()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ class LinEncoderTest extends Specification{
def encoded = encoder.encode(wfResults)
def object = encoder.decode(encoded)
then:
encoded == '{"version":"lineage/v1beta1","kind":"WorkflowOutput","createdAt":null,"workflowRun":"lid://1234","output":null}'
encoded == '{"version":"lineage/v1beta1","kind":"WorkflowOutput","spec":{"createdAt":null,"workflowRun":"lid://1234","output":null}}'
def result = object as WorkflowOutput
result.createdAt == null

Expand Down