Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 22 additions & 15 deletions fiftyone/operators/_types/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,28 +96,35 @@ def stage(
return stage

@classmethod
def from_json(cls, json_list):
"""Loads the pipeline from a list of JSON/python dicts.
def from_json(cls, json_dict):
"""Loads the pipeline from a JSON/python dict.

Ex., [
{"operator_uri": "@voxel51/test/blah", "name": "my_stage", ...},
...,
]
Ex., {
"stages": [
{"operator_uri": "@voxel51/test/blah", "name": "my_stage"},
...,
]
}

Args:
json_list: a list of JSON / python dicts
json_dict: a JSON / python dict representation of the pipeline
"""
stages = [PipelineStage(**stage) for stage in json_list]
stages = [
PipelineStage(**stage) for stage in json_dict.get("stages") or []
]
return cls(stages=stages)

def to_json(self):
"""Converts the pipeline to list of JSON/python dicts.
"""Converts this pipeline to JSON/python dict representation

Ex., {
"stages": [
{"operator_uri": "@voxel51/test/blah", "name": "my_stage"},
...,
]
}

Ex., [
{"operator_uri": "@voxel51/test/blah", "name": "my_stage", ...},
...,
]
Returns:
list of JSON / python dicts
JSON / python dict representation of the pipeline
"""
return [stage.to_json() for stage in self.stages]
return dataclasses.asdict(self)
15 changes: 1 addition & 14 deletions tests/unittests/factory/delegated_operation_doc_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,7 @@ def test_serialize_pipeline(self):
]
)
out = op_doc.to_pymongo()
assert out["pipeline"] == [
{
"name": "one",
"operator_uri": "@test/op1",
"num_distributed_tasks": None,
"params": None,
},
{
"name": "two",
"operator_uri": "@test/op2",
"num_distributed_tasks": None,
"params": None,
},
]
assert out["pipeline"] == op_doc.pipeline.to_json()
op_doc2 = repos.DelegatedOperationDocument()
op_doc2.from_pymongo(out)
assert op_doc2.pipeline == op_doc.pipeline
95 changes: 95 additions & 0 deletions tests/unittests/operators/types_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
"""
FiftyOne operator type tests.

| Copyright 2017-2025, Voxel51, Inc.
| `voxel51.com <https://voxel51.com/>`_
|
"""

import unittest

import bson

import fiftyone as fo
import fiftyone.operators as foo
from fiftyone.operators import types


class TestPipelineType(unittest.TestCase):
def test_pipeline_type(self):
pipeline = types.Pipeline()
self.assertListEqual(pipeline.stages, [])

pipeline = types.Pipeline(stages=[])
self.assertListEqual(pipeline.stages, [])

stage1 = types.PipelineStage(operator_uri="my/uri")
stage2 = types.PipelineStage(
operator_uri="my/uri2",
name="stage2",
num_distributed_tasks=5,
params={"foo": "bar"},
)
pipeline = types.Pipeline(stages=[stage1, stage2])
self.assertListEqual(pipeline.stages, [stage1, stage2])

pipeline = types.Pipeline()
pipeline.stage(stage1.operator_uri)
pipeline.stage(
stage2.operator_uri,
stage2.name,
stage2.num_distributed_tasks,
stage2.params,
)
self.assertListEqual(pipeline.stages, [stage1, stage2])

def test_serialize(self):
pipeline = types.Pipeline(
stages=[
types.PipelineStage(operator_uri="my/uri"),
types.PipelineStage(
operator_uri="my/uri2",
name="stage2",
num_distributed_tasks=5,
params={"foo": "bar"},
),
]
)
dict_rep = pipeline.to_json()
self.assertDictEqual(
dict_rep,
{
"stages": [
{
"operator_uri": "my/uri",
"name": None,
"num_distributed_tasks": None,
"params": None,
},
{
"operator_uri": "my/uri2",
"name": "stage2",
"num_distributed_tasks": 5,
"params": {"foo": "bar"},
},
],
},
)
new_obj = types.Pipeline.from_json(dict_rep)
self.assertEqual(new_obj, pipeline)

def test_validation(self):
with self.assertRaises(ValueError):
types.PipelineStage(operator_uri=None)

with self.assertRaises(ValueError):
types.PipelineStage(operator_uri="my/uri", num_distributed_tasks=0)

with self.assertRaises(ValueError):
types.PipelineStage(
operator_uri="my/uri", num_distributed_tasks=-5
)

pipe = types.Pipeline()
with self.assertRaises(ValueError):
pipe.stage("my/uri", num_distributed_tasks=-5)
Loading