diff --git a/fiftyone/operators/_types/pipeline.py b/fiftyone/operators/_types/pipeline.py index 97c8a169b2..46d35f27fd 100644 --- a/fiftyone/operators/_types/pipeline.py +++ b/fiftyone/operators/_types/pipeline.py @@ -96,28 +96,35 @@ def stage( return stage @classmethod - def from_json(cls, json_list): - """Loads the pipeline from a list of JSON/python dicts. + def from_json(cls, json_dict): + """Loads the pipeline from a JSON/python dict. - Ex., [ - {"operator_uri": "@voxel51/test/blah", "name": "my_stage", ...}, - ..., - ] + Ex., { + "stages": [ + {"operator_uri": "@voxel51/test/blah", "name": "my_stage"}, + ..., + ] + } Args: - json_list: a list of JSON / python dicts + json_dict: a JSON / python dict representation of the pipeline """ - stages = [PipelineStage(**stage) for stage in json_list] + stages = [ + PipelineStage(**stage) for stage in json_dict.get("stages") or [] + ] return cls(stages=stages) def to_json(self): - """Converts the pipeline to list of JSON/python dicts. + """Converts this pipeline to JSON/python dict representation + + Ex., { + "stages": [ + {"operator_uri": "@voxel51/test/blah", "name": "my_stage"}, + ..., + ] + } - Ex., [ - {"operator_uri": "@voxel51/test/blah", "name": "my_stage", ...}, - ..., - ] Returns: - list of JSON / python dicts + JSON / python dict representation of the pipeline """ - return [stage.to_json() for stage in self.stages] + return dataclasses.asdict(self) diff --git a/tests/unittests/factory/delegated_operation_doc_tests.py b/tests/unittests/factory/delegated_operation_doc_tests.py index 972102a59b..cd347743be 100644 --- a/tests/unittests/factory/delegated_operation_doc_tests.py +++ b/tests/unittests/factory/delegated_operation_doc_tests.py @@ -69,20 +69,7 @@ def test_serialize_pipeline(self): ] ) out = op_doc.to_pymongo() - assert out["pipeline"] == [ - { - "name": "one", - "operator_uri": "@test/op1", - "num_distributed_tasks": None, - "params": None, - }, - { - "name": "two", - "operator_uri": "@test/op2", - "num_distributed_tasks": None, - "params": None, - }, - ] + assert out["pipeline"] == op_doc.pipeline.to_json() op_doc2 = repos.DelegatedOperationDocument() op_doc2.from_pymongo(out) assert op_doc2.pipeline == op_doc.pipeline diff --git a/tests/unittests/operators/types_tests.py b/tests/unittests/operators/types_tests.py new file mode 100644 index 0000000000..305deedc7a --- /dev/null +++ b/tests/unittests/operators/types_tests.py @@ -0,0 +1,95 @@ +""" +FiftyOne operator type tests. + +| Copyright 2017-2025, Voxel51, Inc. +| `voxel51.com `_ +| +""" + +import unittest + +import bson + +import fiftyone as fo +import fiftyone.operators as foo +from fiftyone.operators import types + + +class TestPipelineType(unittest.TestCase): + def test_pipeline_type(self): + pipeline = types.Pipeline() + self.assertListEqual(pipeline.stages, []) + + pipeline = types.Pipeline(stages=[]) + self.assertListEqual(pipeline.stages, []) + + stage1 = types.PipelineStage(operator_uri="my/uri") + stage2 = types.PipelineStage( + operator_uri="my/uri2", + name="stage2", + num_distributed_tasks=5, + params={"foo": "bar"}, + ) + pipeline = types.Pipeline(stages=[stage1, stage2]) + self.assertListEqual(pipeline.stages, [stage1, stage2]) + + pipeline = types.Pipeline() + pipeline.stage(stage1.operator_uri) + pipeline.stage( + stage2.operator_uri, + stage2.name, + stage2.num_distributed_tasks, + stage2.params, + ) + self.assertListEqual(pipeline.stages, [stage1, stage2]) + + def test_serialize(self): + pipeline = types.Pipeline( + stages=[ + types.PipelineStage(operator_uri="my/uri"), + types.PipelineStage( + operator_uri="my/uri2", + name="stage2", + num_distributed_tasks=5, + params={"foo": "bar"}, + ), + ] + ) + dict_rep = pipeline.to_json() + self.assertDictEqual( + dict_rep, + { + "stages": [ + { + "operator_uri": "my/uri", + "name": None, + "num_distributed_tasks": None, + "params": None, + }, + { + "operator_uri": "my/uri2", + "name": "stage2", + "num_distributed_tasks": 5, + "params": {"foo": "bar"}, + }, + ], + }, + ) + new_obj = types.Pipeline.from_json(dict_rep) + self.assertEqual(new_obj, pipeline) + + def test_validation(self): + with self.assertRaises(ValueError): + types.PipelineStage(operator_uri=None) + + with self.assertRaises(ValueError): + types.PipelineStage(operator_uri="my/uri", num_distributed_tasks=0) + + with self.assertRaises(ValueError): + types.PipelineStage( + operator_uri="my/uri", num_distributed_tasks=-5 + ) + + pipe = types.Pipeline() + with self.assertRaises(ValueError): + pipe.stage("my/uri", num_distributed_tasks=-5)