-
Notifications
You must be signed in to change notification settings - Fork 675
FOEPD-2109 PipelineRunInfo type to support always-run pipeline stage #6438
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Conversation
WalkthroughThis PR adds a PipelineRunInfo dataclass, adds an always_run field to PipelineStage, replaces pipeline_index with pipeline_run_info in DelegatedOperationDocument, and updates serialization/deserialization to handle the new fields and list-based pipeline input. (49 words) Changes
Sequence DiagramsequenceDiagram
actor Caller
participant DOC as DelegatedOperationDocument
participant P as Pipeline
participant PRI as PipelineRunInfo
Caller->>DOC: from_pymongo(doc)
activate DOC
DOC->>P: Pipeline.from_json(doc.get("pipeline"))
DOC->>PRI: PipelineRunInfo.from_json(doc.get("pipeline_run_info"))
PRI-->>DOC: pipeline_run_info instance
DOC-->>Caller: DelegatedOperationDocument (with pipeline & pipeline_run_info)
deactivate DOC
Caller->>DOC: to_pymongo()
activate DOC
DOC->>P: pipeline.to_json()
DOC->>PRI: pipeline_run_info.to_json() (if present)
DOC-->>Caller: dict including pipeline and pipeline_run_info
deactivate DOC
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
📜 Recent review detailsConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro 📒 Files selected for processing (3)
🚧 Files skipped from review as they are similar to previous changes (1)
🧰 Additional context used🧠 Learnings (2)📚 Learning: 2025-01-30T16:40:26.003Z
Applied to files:
📚 Learning: 2025-10-07T01:19:39.063Z
Applied to files:
🧬 Code graph analysis (1)fiftyone/factory/repos/delegated_operation_doc.py (1)
🪛 Pylint (4.0.0)fiftyone/operators/_types/pipeline.py[refactor] 35-35: Too many positional arguments (6/5) (R0917) [refactor] 94-94: Too many positional arguments (6/5) (R0917) 🪛 Ruff (0.14.0)fiftyone/operators/_types/pipeline.py54-54: Avoid specifying long messages outside the exception class (TRY003) 89-89: Unused method argument: (ARG002) ⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
🔇 Additional comments (13)
Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
fiftyone/operators/_types/pipeline.py (2)
89-119
: Forward kwargs in Pipeline.stage and silence unused-kwargs lintCurrently, always_run (and future fields) cannot be set via stage(); kwargs are accepted but dropped. Forward them to PipelineStage.
Apply:
def stage( self, operator_uri, name=None, num_distributed_tasks=None, params=None, # kwargs accepted for forward compatibility - **kwargs, # pylint: disable=unused-argument + **kwargs, # noqa: ARG002 # pylint: disable=unused-argument ): @@ - stage = PipelineStage( + stage = PipelineStage( operator_uri=operator_uri, name=name, num_distributed_tasks=num_distributed_tasks, params=params, - ) + **kwargs, + ) self.stages.append(stage) return stageThis preserves forward-compatibility and lets callers do pipeline.stage(..., always_run=True). Based on learnings.
121-141
: Handle None in Pipeline.from_json to match PipelineRunInfo pattern and avoid AttributeErrorThe callsite at
fiftyone/factory/repos/delegated_operation_doc.py:133
passesdoc.get("pipeline")
which can be None when the "pipeline" key is absent.PipelineRunInfo.from_json
already implements this None check (returns None), soPipeline.from_json
should follow the same pattern for consistency.Apply:
@classmethod def from_json(cls, json_dict): """Loads the pipeline from a JSON/python dict. Ex., { "stages": [ {"operator_uri": "@voxel51/test/blah", "name": "my_stage"}, ..., ] } Args: json_dict: a JSON / python dict representation of the pipeline """ + if json_dict is None: + return None if isinstance(json_dict, list): json_dict = {"stages": json_dict} stages = [ PipelineStage(**stage) for stage in json_dict.get("stages") or [] ] return cls(stages=stages)fiftyone/factory/repos/delegated_operation_doc.py (1)
147-147
: Addpipeline_run_info
toignore_keys
to prevent double serialization.The
pipeline_run_info
field is manually serialized at lines 159-160 using.to_json()
, but it's not included inignore_keys
at line 147. This causes thePipelineRunInfo
object to be deep-copied in the dict comprehension (lines 148-152) and then manually serialized again, which is inconsistent with howpipeline
is handled and could lead to serialization issues.Apply this diff to add
pipeline_run_info
to the ignore set:- ignore_keys = {"_doc", "id", "context", "pipeline"} + ignore_keys = {"_doc", "id", "context", "pipeline", "pipeline_run_info"}Based on learnings
Also applies to: 159-160
🧹 Nitpick comments (5)
fiftyone/operators/_types/pipeline.py (3)
34-51
: Silence unused-kwargs and too-many-arguments on PipelineStage.initKeep the forward‑compat behavior but address lints.
Apply:
- def __init__( + def __init__( # pylint: disable=too-many-arguments self, operator_uri: str, always_run: bool = False, name: Optional[str] = None, num_distributed_tasks: Optional[int] = None, params: Optional[Mapping[str, Any]] = None, - **kwargs, # Accepts and ignores unused kwargs + **_, # Accepts and ignores unused kwargs # noqa: ARG002 ):
52-61
: Guard against type errors for num_distributed_tasksCasting was removed; passing a non‑int (e.g., "5") will now raise TypeError at runtime when compared to 1. Add a clear type check.
Apply:
- if ( - self.num_distributed_tasks is not None - and self.num_distributed_tasks < 1 - ): - raise ValueError("num_distributed_tasks must be >= 1") + if self.num_distributed_tasks is not None: + if not isinstance(self.num_distributed_tasks, int): + raise TypeError("num_distributed_tasks must be an int") + if self.num_distributed_tasks < 1: + raise ValueError("num_distributed_tasks must be >= 1")Please confirm no callers supply strings via request params before we enforce this.
16-23
: Update docstring to include always_runDocument the new field to avoid confusion.
Apply:
Args: operator_uri: the URI of the operator to use for the stage name: the name of the stage num_distributed_tasks: the number of distributed tasks to use for the stage, optional params: optional parameters to pass to the operator, overwriting any existing parameters + always_run: if True, this stage runs even when the pipeline is inactive + (e.g., after a failure), enabling cleanup/finalization stagestests/unittests/operators/types_tests.py (1)
46-83
: LGTM; consider adding a stage()-based always_run testSerialization looks correct. Optionally, add a test that sets always_run via Pipeline.stage(..., always_run=True) so the convenience API is covered once kwargs are forwarded.
tests/unittests/factory/delegated_operation_doc_tests.py (1)
65-86
: LGTM; round‑trip coverage for pipeline and run_infoSolid serialization assertions. Optional: import PipelineRunInfo from fiftyone.operators.types for consistency with production imports.
Also applies to: 85-85, 89-89
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
fiftyone/factory/repos/delegated_operation_doc.py
(4 hunks)fiftyone/operators/_types/pipeline.py
(5 hunks)fiftyone/operators/types.py
(1 hunks)tests/unittests/factory/delegated_operation_doc_tests.py
(1 hunks)tests/unittests/operators/delegated_tests.py
(1 hunks)tests/unittests/operators/types_tests.py
(3 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-10-07T01:19:39.063Z
Learnt from: swheaton
PR: voxel51/fiftyone#6385
File: fiftyone/operators/_types/pipeline.py:64-92
Timestamp: 2025-10-07T01:19:39.063Z
Learning: In fiftyone/operators/_types/pipeline.py, the Pipeline.stage() method intentionally accepts unused **kwargs for forward compatibility. This design pattern should not be flagged as an issue.
Applied to files:
fiftyone/operators/_types/pipeline.py
📚 Learning: 2025-01-30T16:40:26.003Z
Learnt from: swheaton
PR: voxel51/fiftyone#5448
File: fiftyone/factory/repos/delegated_operation_doc.py:0-0
Timestamp: 2025-01-30T16:40:26.003Z
Learning: In FiftyOne's DelegatedOperationDocument class, all instance attributes are automatically serialized through `__dict__` in the `to_pymongo()` method, eliminating the need for explicit field inclusion in serialization.
Applied to files:
fiftyone/factory/repos/delegated_operation_doc.py
🧬 Code graph analysis (5)
tests/unittests/operators/delegated_tests.py (3)
fiftyone/operators/_types/pipeline.py (1)
PipelineStage
(13-68)fiftyone/operators/executor.py (2)
operator_uri
(828-830)num_distributed_tasks
(810-813)fiftyone/factory/repos/delegated_operation_doc.py (1)
num_distributed_tasks
(70-73)
tests/unittests/factory/delegated_operation_doc_tests.py (2)
fiftyone/operators/_types/pipeline.py (5)
PipelineStage
(13-68)PipelineRunInfo
(159-176)to_json
(62-68)to_json
(142-155)to_json
(175-176)fiftyone/factory/repos/delegated_operation_doc.py (4)
num_distributed_tasks
(70-73)to_pymongo
(140-162)DelegatedOperationDocument
(23-162)from_pymongo
(75-138)
fiftyone/factory/repos/delegated_operation_doc.py (1)
fiftyone/operators/_types/pipeline.py (7)
Pipeline
(72-155)PipelineRunInfo
(159-176)from_json
(122-140)from_json
(170-173)to_json
(62-68)to_json
(142-155)to_json
(175-176)
fiftyone/operators/types.py (1)
fiftyone/operators/_types/pipeline.py (3)
Pipeline
(72-155)PipelineRunInfo
(159-176)PipelineStage
(13-68)
tests/unittests/operators/types_tests.py (1)
fiftyone/operators/_types/pipeline.py (6)
PipelineRunInfo
(159-176)to_json
(62-68)to_json
(142-155)to_json
(175-176)from_json
(122-140)from_json
(170-173)
🪛 Pylint (4.0.0)
fiftyone/operators/_types/pipeline.py
[refactor] 35-35: Too many positional arguments (6/5)
(R0917)
🪛 Ruff (0.14.0)
fiftyone/operators/_types/pipeline.py
42-42: Unused method argument: kwargs
(ARG002)
84-84: PEP 484 prohibits implicit Optional
Convert to T | None
(RUF013)
84-84: Unused method argument: kwargs
(ARG002)
96-96: Unused method argument: kwargs
(ARG002)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
- GitHub Check: test-windows / test-python (windows-latest, 3.11)
- GitHub Check: test-windows / test-python (windows-latest, 3.9)
- GitHub Check: test-windows / test-python (windows-latest, 3.12)
- GitHub Check: test / test-python (ubuntu-latest-m, 3.11)
- GitHub Check: test / test-python (ubuntu-latest-m, 3.10)
- GitHub Check: test-windows / test-python (windows-latest, 3.10)
- GitHub Check: test / test-app
- GitHub Check: test / test-python (ubuntu-latest-m, 3.9)
- GitHub Check: test / test-python (ubuntu-latest-m, 3.12)
- GitHub Check: lint / eslint
- GitHub Check: e2e / test-e2e
- GitHub Check: build / build
- GitHub Check: build
🔇 Additional comments (6)
fiftyone/operators/types.py (1)
9-9
: Re-export looks goodExposing PipelineRunInfo alongside Pipeline and PipelineStage is appropriate and aligns dependents.
tests/unittests/operators/types_tests.py (1)
100-114
: LGTM on PipelineRunInfo round‑tripCovers to_json/from_json and equality; nice.
tests/unittests/operators/delegated_tests.py (1)
207-218
: LGTM on updated stage constructionGood coverage for extended fields; pairs well with serialization tests.
fiftyone/factory/repos/delegated_operation_doc.py (3)
18-18
: LGTM! Import is correct.The
PipelineRunInfo
import is properly added alongsidePipeline
to support the new pipeline run state tracking functionality.
67-67
: LGTM! Initialization follows existing patterns.The
pipeline_run_info
attribute is correctly initialized toNone
, consistent with thepipeline
attribute pattern.
133-136
: LGTM! Deserialization logic is correct.The deserialization properly handles both
pipeline
andpipeline_run_info
fields, andPipelineRunInfo.from_json
correctly handlesNone
values.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
fiftyone/operators/_types/pipeline.py (1)
84-84
: Fix implicit Optional type on stages parameter (RUF013).Avoid implicit Optional by explicitly annotating the type.
Apply this diff:
- def __init__(self, stages: list[PipelineStage] = None, **kwargs): + def __init__(self, stages: Optional[list[PipelineStage]] = None, **kwargs): # Call the default dataclass initialization for the defined fields self.stages = stages if stages is not None else [] # kwargs are implicitly discarded
🧹 Nitpick comments (1)
fiftyone/operators/_types/pipeline.py (1)
34-50
: **Consider using kwargs unpacking for better maintainability.The custom
__init__
is necessary for forward compatibility, but manually assigning each field is verbose and error-prone. Note also that the comment "Call the default dataclass initialization" is misleading—you're manually assigning fields, not calling the dataclass__init__
.Consider this more maintainable pattern:
- # ADD A CUSTOM __init__ METHOD TO ACCEPT AND DISCARD UNUSED KWARGS def __init__( self, operator_uri: str, always_run: bool = False, name: Optional[str] = None, num_distributed_tasks: Optional[int] = None, params: Optional[Mapping[str, Any]] = None, - **kwargs, # Accepts and ignores unused kwargs + **kwargs, # Accept and filter unused kwargs for forward compatibility ): - # Call the default dataclass initialization for the defined fields - self.operator_uri = operator_uri - self.always_run = always_run - self.name = name - self.num_distributed_tasks = num_distributed_tasks - self.params = params + # Extract known fields, ignore unknown ones + known_params = { + 'operator_uri': operator_uri, + 'always_run': always_run, + 'name': name, + 'num_distributed_tasks': num_distributed_tasks, + 'params': params, + } + for key, value in known_params.items(): + setattr(self, key, value) self.__post_init__()This approach still supports forward compatibility while being more maintainable. Based on learnings.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
fiftyone/operators/_types/pipeline.py
(5 hunks)tests/unittests/operators/types_tests.py
(3 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-10-07T01:19:39.063Z
Learnt from: swheaton
PR: voxel51/fiftyone#6385
File: fiftyone/operators/_types/pipeline.py:64-92
Timestamp: 2025-10-07T01:19:39.063Z
Learning: In fiftyone/operators/_types/pipeline.py, the Pipeline.stage() method intentionally accepts unused **kwargs for forward compatibility. This design pattern should not be flagged as an issue.
Applied to files:
fiftyone/operators/_types/pipeline.py
🧬 Code graph analysis (1)
tests/unittests/operators/types_tests.py (1)
fiftyone/operators/_types/pipeline.py (7)
Pipeline
(72-158)from_json
(122-143)from_json
(173-176)PipelineRunInfo
(162-179)to_json
(62-68)to_json
(145-158)to_json
(178-179)
🪛 Pylint (4.0.0)
fiftyone/operators/_types/pipeline.py
[refactor] 35-35: Too many positional arguments (6/5)
(R0917)
🪛 Ruff (0.14.0)
fiftyone/operators/_types/pipeline.py
42-42: Unused method argument: kwargs
(ARG002)
84-84: PEP 484 prohibits implicit Optional
Convert to T | None
(RUF013)
84-84: Unused method argument: kwargs
(ARG002)
96-96: Unused method argument: kwargs
(ARG002)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
- GitHub Check: test-windows / test-python (windows-latest, 3.11)
- GitHub Check: test / test-python (ubuntu-latest-m, 3.12)
- GitHub Check: test-windows / test-python (windows-latest, 3.12)
- GitHub Check: test / test-python (ubuntu-latest-m, 3.9)
- GitHub Check: test / test-python (ubuntu-latest-m, 3.11)
- GitHub Check: test-windows / test-python (windows-latest, 3.10)
- GitHub Check: test-windows / test-python (windows-latest, 3.9)
- GitHub Check: test / test-python (ubuntu-latest-m, 3.10)
- GitHub Check: lint / eslint
- GitHub Check: e2e / test-e2e
- GitHub Check: build / build
- GitHub Check: build
🔇 Additional comments (7)
tests/unittests/operators/types_tests.py (3)
55-76
: Excellent test coverage for the always_run field.The test properly validates that
always_run
is serialized correctly in both the False (default) and True cases, and that round-trip serialization preserves the field value.
100-102
: Good edge case coverage for None handling.Testing that
from_json(None)
returnsNone
for bothPipeline
andPipelineRunInfo
ensures robustness when deserializing optional fields.
104-118
: Comprehensive test for PipelineRunInfo serialization.The test validates all three fields (
active
,stage_index
,expected_children
) and confirms that round-trip serialization works correctly with non-default values.fiftyone/operators/_types/pipeline.py (4)
9-9
: LGTM!The
List
import is necessary for theexpected_children: Optional[List[int]]
type annotation inPipelineRunInfo
.
29-29
: LGTM!The
always_run
field is properly added with a sensible default value ofFalse
.
135-139
: Excellent enhancements to from_json.The additions improve robustness:
- Explicit
None
handling prevents errors when deserializing optional fields- List input support provides a convenient shorthand for creating pipelines
161-179
: Well-designed PipelineRunInfo implementation.The dataclass cleanly encapsulates pipeline run state with:
- Sensible defaults for all fields
- Consistent
from_json
/to_json
API matching other types- Proper
None
handling infrom_json
What changes are proposed in this pull request?
Model changes to DelegatedOperationDocument to support always-run stages and other future functionality.
It separates the static definition of the pipeline
types.Pipeline
from run state info that is mutable -types.PipelineRunInfo
.This allows pipeline related information to be contained within one sub document instead of spread around - more organized.
The two fields we need added are:
active
, otherwise it is a list matching lengthlen(pipeline.stages)
. This list contains the number of children we should expect to see for each stage. Basically, stages that have been skipped are 0.How is this patch tested? If it is not, please explain why.
Added/edited unit tests
Real testing was with the whole system which will be outlined in the voxel hub PR.
Release Notes
Is this a user-facing change that should be mentioned in the release notes?
notes for FiftyOne users.
Added a new PipelineRunInfo type within DelegatedOperationDocument that keeps track of pipeline run state as it goes. Currently useful for internal purposes only.
What areas of FiftyOne does this PR affect?
fiftyone
Python library changesSummary by CodeRabbit
New Features
Improvements