Conversation

jacobsela
Contributor

@jacobsela jacobsela commented Aug 25, 2025

Fully offload post-processing and writes.

Loading and offloading tensors from the GPU is still on the main process. Not 100% sure how (and whether we should) get them off. At this point it may be better to just scale horizontally with DOs.

from time import time_ns

import fiftyone as fo
import fiftyone.zoo as foz

dataset = fo.load_dataset("coco-val")
classes = dataset.distinct("ground_truth.detections.label")
model = foz.load_zoo_model("omdet-turbo-swin-tiny-torch", classes=classes)

start = time_ns()
dataset.apply_model(
    model,
    label_field="predictions_ray",
    batch_size=16,
    num_workers=32,
    confidence_thresh=0.1,
    skip_failures=False,
)
end = time_ns()

print(f"Apply Model Time: {(end - start) / 1e9} seconds") # Apply Model Time: 147.059405114 seconds
[Screenshot: profiling results]

All remaining time in predict_all that isn't the forward pass is spent moving tensors between CPU and GPU.
[Screenshot: profiling results]

I guess if anything this is where multithreading would shine. Not 100% sure how to integrate it here though...
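
For reference, one possible direction (not what this PR does) is overlapping host-to-device copies with compute using pinned memory and a side CUDA stream. A minimal sketch, assuming standard PyTorch only:

import torch

device = torch.device("cuda")
copy_stream = torch.cuda.Stream()

def stage_batch(batch_cpu):
    # pinned host memory enables truly asynchronous host-to-device copies
    batch_pinned = batch_cpu.pin_memory()
    with torch.cuda.stream(copy_stream):
        return batch_pinned.to(device, non_blocking=True)

def run_forward(model, batch_gpu):
    # the compute stream must wait for the copy stream before using the tensor
    torch.cuda.current_stream().wait_stream(copy_stream)
    with torch.no_grad():
        return model(batch_gpu)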

Summary by CodeRabbit

  • New Features
    • Parallelized image model inference and label writing for faster processing on large datasets
    • Automatic initialization of the distributed runtime when using parallel processing
  • Refactor
    • Improved propagation of sample IDs through data pipelines to correctly associate predictions
    • Enhanced handling of model outputs without an output processor, including CPU-safe tensors and image size returns
    • More flexible batching in transformer pipelines, allowing non-tensor fields to pass through without padding

@jacobsela jacobsela requested review from a team as code owners August 25, 2025 23:17
Contributor

coderabbitai bot commented Aug 25, 2025

Walkthrough

Adds Ray-based parallel inference and label writing to image-model data loading. Introduces Ray actor utilities and a LabelWriter. Alters prediction paths to emit IDs alongside outputs and adjust no-output-processor returns. Initializes Ray on import. Updates dataset collation to propagate IDs and allow non-tensor batching.
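
Schematically, the pieces fit together roughly like this. This is a sketch based on the diffs reviewed below; it assumes samples, data_loader, model, label_field, confidence_thresh, and output_processor are already in scope inside apply_model, and the loop structure and signatures are approximate:

import fiftyone.core.ray.base as foray
import fiftyone.core.ray.writers as foray_writers

with foray.ActorPoolContext(
    samples,                    # serialized once and shared with workers via ray.put
    foray_writers.LabelWriter,  # actor that post-processes outputs and saves labels
    num_workers=16,
    label_field=label_field,
    confidence_thresh=confidence_thresh,
    post_processor=output_processor,
) as ctx:
    for imgs, sample_batch in data_loader:
        ids, labels_batch = model.predict_all(imgs, samples=sample_batch)
        ctx.submit(ids, labels_batch)  # post-processing and writes happen on Ray workers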

Changes

Cohort / File(s) Summary
Ray integration foundation
fiftyone/core/ray/__init__.py, fiftyone/core/ray/base.py
Initializes Ray at import if not already running. Adds serialization/deserialization of samples, a base actor, and an ActorPoolContext managing shared sample refs and task submission.
Parallel label writing
fiftyone/core/ray/writers.py
Adds LabelWriter Ray actor to select samples by IDs, optionally post-process predictions, add labels, and save via SaveContext.
Model data loader concurrency shift
fiftyone/core/models.py
Replaces per-sample synchronous writes with Ray ActorPoolContext using LabelWriter (16 workers). predict_all now returns (ids, labels_batch); submissions go to the pool. Removes local array export loops and delegates writes to the actor.
Torch utilities: ID plumbing and output shape
fiftyone/utils/torch.py
predict_all returns (ids, predictions) by extracting _id from batches. No-output-processor path returns (output_tensor_cpu, (w,h)). Dataset batching propagates _id per item.
Transformers utilities: output shaping and collation
fiftyone/utils/transformers.py
No-output-processor path returns (output, image_sizes) with tensors detached to CPU. collate_fn passes through non-tensor values as Python lists without padding.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Client
  participant Models as ImageModel Data Loader
  participant Model as Model.predict_all
  participant Ray as ActorPoolContext
  participant Writer as LabelWriter (Ray Actor)
  participant DS as Samples/SaveContext

  Client->>Models: apply image-model data loader
  Models->>Ray: create ActorPoolContext(LabelWriter, samples, workers=16)
  Models->>Model: predict_all(imgs, samples=sample_batch)
  Model-->>Models: (ids, labels_batch)
  loop per batch
    Models->>Ray: submit(ids, labels_batch)
    Ray->>Writer: run(ids, payloads)
    Writer->>DS: select(ids)
    alt post-processor provided
      Writer->>Writer: payloads = post_processor(..., confidence_thresh)
    end
    loop for each (sample, payload)
      Writer->>DS: sample.add_labels(payload, label_field, confidence_thresh)
      Writer->>DS: save(sample) via SaveContext
    end
  end
  Ray-->>Models: async completions
  Models-->>Client: completed writes
sequenceDiagram
  autonumber
  participant Torch as Torch/Transformers Utils
  participant Loader as Data Loader
  participant Models as Model.predict_all Consumer

  Loader->>Torch: batch
  Torch-->>Loader: batch with per-item _id
  Loader->>Models: imgs with "_id" field
  Models->>Torch: predict_all(imgs)
  Torch-->>Models: (ids, outputs) or (outputs_cpu, image_sizes)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

In fields of bytes I hop with glee,
Spawning actors, one-two-three! 🐇⚡
IDs in paw, I tag each frame,
Ray beams hum—parallel game.
Labels saved, the carrots count,
Async dreams on every mount.
Thump-thump—merge complete, tantamount.


Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 8

🧹 Nitpick comments (9)
fiftyone/utils/transformers.py (1)

684-694: Simplify control flow by removing unnecessary else block

Since the if-block returns, the else is redundant and can be de-indented for better readability.

         if self._output_processor is not None:
             return self._output_processor(
                 output,
                 image_sizes,
                 confidence_thresh=self.config.confidence_thresh,
             )
 
-        else:
-            for k, v in output.items():
-                if isinstance(v, torch.Tensor):
-                    output[k] = v.detach().cpu()
-                elif isinstance(v, (tuple, list)):
-                    output[k] = [
-                        i.detach().cpu()
-                        for i in v
-                        if isinstance(i, torch.Tensor)
-                    ]
-
-            return output, image_sizes
+        for k, v in output.items():
+            if isinstance(v, torch.Tensor):
+                output[k] = v.detach().cpu()
+            elif isinstance(v, (tuple, list)):
+                output[k] = [
+                    i.detach().cpu()
+                    for i in v
+                    if isinstance(i, torch.Tensor)
+                ]
+
+        return output, image_sizes
fiftyone/core/models.py (1)

464-475: Make Ray ActorPool worker count configurable and default to CPU cores

Based on the verification on a system with 8 CPU cores (oversubscribing to 16 workers) and no psutil available for dynamic memory checks, it’s advisable to:

  • Introduce a num_ray_workers parameter (falling back to multiprocessing.cpu_count()) instead of hard-coding 16
  • Allow advanced users to still override via kwargs (or an environment variable) when they know their workload profile
  • Update documentation/examples to reflect the new parameter
--- a/fiftyone/core/models.py
+++ b/fiftyone/core/models.py
@@ -463,7 +463,9 @@ class YourModelClass:
         output_processor = model._output_processor
-        ctx = context.enter_context(
+        import multiprocessing
+        num_ray_workers = kwargs.get("num_ray_workers", multiprocessing.cpu_count())
+        ctx = context.enter_context(
             foray.ActorPoolContext(
                 samples,
                 foray_writers.LabelWriter,
-                num_workers=16,
+                num_workers=num_ray_workers,
                 label_field=label_field,
                 confidence_thresh=confidence_thresh,
                 post_processor=output_processor,
  • Verify downstream callers are updated to pass num_ray_workers if needed
  • (Optional) Add a lightweight memory check or document that memory constraints should be considered when overriding this value
fiftyone/core/ray/writers.py (2)

2-2: Remove unused imports

The torch import is not used in this file.

 import ray
-import torch
 
-import fiftyone.core.ray.base as foray
 import fiftyone.core.collections as foc
 from fiftyone.core.ray.base import FiftyOneActor

23-23: Consider SaveContext lifecycle management

Creating a SaveContext in __init__ and keeping it for the lifetime of the actor could lead to memory issues or stale state if the actor is long-lived.

Consider creating the SaveContext per batch in the run method:

     def __init__(
         self,
         serialized_samples,
         label_field,
         confidence_thresh=None,
         post_processor=None,
         **kwargs
     ):
         super().__init__(serialized_samples, **kwargs)
         self.label_field = label_field
         self.confidence_thresh = confidence_thresh
         self.post_processor = post_processor
-        self.ctx = foc.SaveContext(self.samples)
 
     def run(self, ids, payloads):
         samples_batch = self.samples.select(ids)
 
         if self.post_processor is not None:
             payloads = self.post_processor(
                 *payloads, confidence_thresh=self.confidence_thresh
             )
 
-        with self.ctx:
+        with foc.SaveContext(samples_batch) as ctx:
             for sample, payload in zip(samples_batch, payloads):
                 sample.add_labels(
                     payload,
                     label_field=self.label_field,
                     confidence_thresh=self.confidence_thresh,
                 )
-                self.ctx.save(sample)
+                ctx.save(sample)
fiftyone/core/ray/base.py (5)

6-12: Make serialization resilient to Dataset vs DatasetView and document the contract.

Use the public name on the root dataset via getattr to avoid relying on internals in the Dataset case, and add a short docstring.

Apply this diff:

-def serialize_samples(samples):
-    dataset_name = samples._root_dataset.name
-    stages = (
-        samples._serialize() if isinstance(samples, fov.DatasetView) else None
-    )
-    return dataset_name, stages
+def serialize_samples(samples):
+    """
+    Serializes a SampleCollection (Dataset or DatasetView) into
+    (dataset_name: str, stages: Optional[List[dict]]) for Ray workers.
+    """
+    dataset = getattr(samples, "_root_dataset", samples)
+    dataset_name = dataset.name
+    stages = (
+        samples._serialize() if isinstance(samples, fov.DatasetView) else None
+    )
+    return dataset_name, stages

48-60: Validate and right-size num_workers to the cluster.

Guard against invalid values and avoid oversubscribing by bounding default workers by available CPUs.

Apply this diff:

 def __init__(self, samples, actor_type, *args, num_workers=4, **kwargs):
     super().__init__()
     self.serialized_samples_ref = ray.put(serialize_samples(samples))
-    self.num_workers = num_workers
+    if num_workers is None:
+        available_cpus = int(ray.available_resources().get("CPU", 1))
+        num_workers = max(1, min(4, available_cpus))
+    if num_workers <= 0:
+        raise ValueError(f"num_workers must be >= 1, got {num_workers}")
+    self.num_workers = int(num_workers)
     self.actor_type = actor_type
     self.actors = [
         self.actor_type.remote(
             self.serialized_samples_ref, *args, **kwargs
         )
         for _ in range(self.num_workers)
     ]
     self.pool = ray.util.ActorPool(self.actors)

25-36: Define a base run() to enforce the actor contract.

ActorPoolContext.submit() calls a.run.remote(...). Provide a default abstract method so subclasses get a clear error if they forget to implement it.

Apply this diff:

 class FiftyOneActor:
@@
     def __init__(self, serialized_samples, **kwargs):
         super().__init__()
         self.samples = deserialize_samples(serialized_samples)
 
+    def run(self, *args, **kwargs):  # interface contract
+        raise NotImplementedError("Subclasses must implement run(ids, payloads)")

39-46: Minor: document args/kwargs flow to actors.

Note that any kwargs passed to ActorPoolContext(..., *args, **kwargs) are forwarded to each actor’s constructor. A short doc addition helps future readers.

No code changes required; just consider adding a sentence to the docstring: “Any additional args/kwargs are forwarded to each actor’s init.”


14-23: Harden deserialize_samples with descriptive error on dataset load failure

Verified that DatasetView._build(dataset, stage_dicts) exists in fiftyone/core/view.py:1764. To improve debuggability when fo.load_dataset fails on Ray workers, wrap the call in a try/except and raise a clear RuntimeError.

Files to update:

  • fiftyone/core/ray/base.py (function deserialize_samples)

Proposed diff:

 def deserialize_samples(serialized_samples):
     import fiftyone as fo

     dataset_name, stages = serialized_samples

-    dataset = fo.load_dataset(dataset_name)
+    try:
+        dataset = fo.load_dataset(dataset_name)
+    except Exception as e:
+        raise RuntimeError(
+            f"Failed to load dataset '{dataset_name}' on Ray worker. "
+            "Ensure the dataset exists and workers can access the backend."
+        ) from e

     if stages is not None:
         return fov.DatasetView._build(dataset, stages)
     return dataset
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro


📥 Commits

Reviewing files that changed from the base of the PR and between bb2bbc9 and ff78dbf.

📒 Files selected for processing (6)
  • fiftyone/core/models.py (3 hunks)
  • fiftyone/core/ray/__init__.py (1 hunks)
  • fiftyone/core/ray/base.py (1 hunks)
  • fiftyone/core/ray/writers.py (1 hunks)
  • fiftyone/utils/torch.py (3 hunks)
  • fiftyone/utils/transformers.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
fiftyone/core/ray/base.py (2)
fiftyone/core/view.py (1)
  • _build (1764-1770)
fiftyone/core/ray/writers.py (1)
  • run (25-40)
fiftyone/core/ray/writers.py (3)
fiftyone/core/ray/base.py (1)
  • FiftyOneActor (25-35)
fiftyone/core/collections.py (1)
  • SaveContext (80-234)
fiftyone/core/sample.py (1)
  • add_labels (147-287)
fiftyone/utils/torch.py (2)
fiftyone/utils/transformers.py (3)
  • _predict_all (657-694)
  • _predict_all (812-827)
  • _predict_all (967-973)
fiftyone/utils/ultralytics.py (1)
  • _predict_all (588-621)
fiftyone/core/models.py (3)
fiftyone/core/utils.py (2)
  • lazy_import (732-754)
  • SetAttributes (2718-2751)
fiftyone/core/ray/base.py (2)
  • ActorPoolContext (38-72)
  • submit (71-72)
fiftyone/core/ray/writers.py (1)
  • LabelWriter (10-40)
🪛 Pylint (3.3.7)
fiftyone/core/ray/__init__.py

[error] 1-1: Unrecognized option found: optimize-ast, files-output, function-name-hint, variable-name-hint, const-name-hint, attr-name-hint, argument-name-hint, class-attribute-name-hint, inlinevar-name-hint, class-name-hint, module-name-hint, method-name-hint, no-space-check

(E0015)


[refactor] 1-1: Useless option value for '--disable', 'bad-continuation' was removed from pylint, see pylint-dev/pylint#3571.

(R0022)

fiftyone/core/ray/base.py

[error] 1-1: Unrecognized option found: optimize-ast, files-output, function-name-hint, variable-name-hint, const-name-hint, attr-name-hint, argument-name-hint, class-attribute-name-hint, inlinevar-name-hint, class-name-hint, module-name-hint, method-name-hint, no-space-check

(E0015)


[refactor] 1-1: Useless option value for '--disable', 'bad-continuation' was removed from pylint, see pylint-dev/pylint#3571.

(R0022)

fiftyone/core/ray/writers.py

[error] 1-1: Unrecognized option found: optimize-ast, files-output, function-name-hint, variable-name-hint, const-name-hint, attr-name-hint, argument-name-hint, class-attribute-name-hint, inlinevar-name-hint, class-name-hint, module-name-hint, method-name-hint, no-space-check

(E0015)


[refactor] 1-1: Useless option value for '--disable', 'bad-continuation' was removed from pylint, see pylint-dev/pylint#3571.

(R0022)

fiftyone/utils/transformers.py

[refactor] 676-694: Unnecessary "else" after "return", remove the "else" and de-indent the code inside it

(R1705)

🪛 Ruff (0.12.2)
fiftyone/core/ray/writers.py

2-2: torch imported but unused

Remove unused import: torch

(F401)


4-4: fiftyone.core.ray.base imported but unused

Remove unused import: fiftyone.core.ray.base

(F401)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.11)
  • GitHub Check: test-windows / test-python (windows-latest, 3.9)
  • GitHub Check: test-windows / test-python (windows-latest, 3.11)
  • GitHub Check: test-windows / test-python (windows-latest, 3.12)
  • GitHub Check: test-windows / test-python (windows-latest, 3.10)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.9)
  • GitHub Check: build / build
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.10)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.12)
  • GitHub Check: test / test-app
  • GitHub Check: lint / eslint
  • GitHub Check: e2e / test-e2e
  • GitHub Check: build
🔇 Additional comments (3)
fiftyone/utils/transformers.py (1)

722-725: Potential issue with non-tensor handling in collate_fn

The logic bypasses padding for non-tensor/ndarray items and returns them as a list. This could cause issues downstream if the model expects uniform tensor batches. Ensure this behavior is intentional and properly documented.

Are there specific use cases where non-tensor values (e.g., IDs) need to be passed through the collate function? If so, consider adding a comment explaining this design choice.

fiftyone/utils/torch.py (1)

1917-1930: Good implementation of ID propagation

The implementation correctly:

  • Extracts IDs for each item in the batch
  • Processes items individually
  • Associates the ID with the processed result
  • Handles exceptions appropriately
fiftyone/core/ray/base.py (1)

59-60: I’ve added a quick scan for any pinned Ray dependency across common manifest files so we can confirm which Ray version is in use. Running now.

                post_processor=output_processor,
            )
        )
        context.enter_context(fou.SetAttributes(model, _output_processor=None))
Contributor

⚠️ Potential issue

Clearing _output_processor may cause issues

Setting _output_processor=None on the model modifies its internal state, which could cause problems if the model is used elsewhere or if an error occurs before it's restored.

The current approach modifies the model's internal state which could lead to:

  1. Thread safety issues if the model is used concurrently
  2. Incorrect behavior if an exception occurs before restoration
  3. Unexpected side effects for other code using the same model instance

Consider passing the output processor directly to the actor pool without modifying the model.
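
A hedged sketch of that alternative, reusing names from the diff in this PR (the raw= flag on predict_all is purely hypothetical):

# Sketch only: read the processor once and hand it to the pool,
# leaving the model's state untouched
output_processor = getattr(model, "_output_processor", None)

ctx = context.enter_context(
    foray.ActorPoolContext(
        samples,
        foray_writers.LabelWriter,
        num_workers=16,
        label_field=label_field,
        confidence_thresh=confidence_thresh,
        post_processor=output_processor,
    )
)

# predict_all would then need an explicit opt-out of its own post-processing,
# e.g. a hypothetical flag, rather than relying on _output_processor being None:
# ids, raw_outputs = model.predict_all(imgs, samples=sample_batch, raw=True)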

Comment on lines +486 to +492
+                ids, labels_batch = model.predict_all(
+                    imgs, samples=sample_batch
+                )
             else:
-                labels_batch = model.predict_all(imgs)
 
-            for sample, labels in zip(sample_batch, labels_batch):
-                if filename_maker is not None:
-                    _export_arrays(labels, sample.filepath, filename_maker)
+                ids, labels_batch = model.predict_all(imgs)
 
-                sample.add_labels(
-                    labels,
-                    label_field=label_field,
-                    confidence_thresh=confidence_thresh,
-                )
-                ctx.save(sample)
+            ctx.submit(ids, labels_batch)
Contributor

⚠️ Potential issue

Breaking change in predict_all return signature

The change from labels_batch = model.predict_all(...) to ids, labels_batch = model.predict_all(...) is a breaking change that affects the model interface contract.

This needs careful coordination:

  1. All model implementations must be updated to return the tuple format
  2. Consider backward compatibility for existing models
  3. Update documentation and examples
# Consider adding compatibility wrapper:
def _predict_all_compat(model, imgs, samples=None):
    """Wrapper to handle both old and new predict_all signatures."""
    result = model.predict_all(imgs, samples=samples) if samples else model.predict_all(imgs)
    if isinstance(result, tuple) and len(result) == 2:
        return result  # New format: (ids, predictions)
    else:
        # Old format: just predictions, extract IDs from imgs if available
        ids = imgs.get("_id", None)
        return ids, result
🤖 Prompt for AI Agents
In fiftyone/core/models.py around lines 486-492, the code now expects
model.predict_all to return (ids, labels_batch) which is a breaking change for
existing model implementations; add a compatibility wrapper function (e.g.,
_predict_all_compat) that calls model.predict_all with the same args, detects
whether the result is a two-tuple or a single value, and if single value
constructs/derives ids (from provided samples or imgs metadata) and returns
(ids, labels); replace direct calls to model.predict_all at these lines with the
wrapper; update the predict_all callers' docstrings to state the new tuple
return format and run tests to ensure existing models continue to work, updating
any model implementations that must explicitly return the new tuple.

Comment on lines +3 to +4
if not ray.is_initialized():
ray.init()
Contributor

⚠️ Potential issue

Critical: Ray initialization at import time can cause issues

Initializing Ray at module import time is problematic for several reasons:

  1. It prevents users from configuring Ray parameters before initialization
  2. It can interfere with testing and development environments
  3. It forces Ray initialization even when Ray features aren't being used
  4. No error handling means import failures will crash the entire module

Consider lazy initialization or providing an explicit initialization function:

 import ray
 
-if not ray.is_initialized():
-    ray.init()
+def ensure_ray_initialized(**kwargs):
+    """Initialize Ray if not already initialized.
+    
+    Args:
+        **kwargs: Optional Ray initialization parameters
+    """
+    if not ray.is_initialized():
+        ray.init(**kwargs)

Then call this function only when Ray features are actually needed, such as in ActorPoolContext.__init__.

🤖 Prompt for AI Agents
In fiftyone/core/ray/__init__.py around lines 3-4, don't call ray.init() during
module import; instead remove the top-level initialization and add a
lazy/explicit initializer function (e.g., ensure_ray_initialized(**opts) or
init_ray(config)) that checks ray.is_initialized(), calls ray.init(...) with
passed parameters, and catches/logs exceptions; update callers (notably
ActorPoolContext.__init__) to call this initializer when Ray features are
actually required, allowing callers to pass configuration and avoiding
import-time side effects.

Comment on lines +33 to +35
    def __init__(self, serialized_samples, **kwargs):
        super().__init__(**kwargs)
        self.samples = deserialize_samples(serialized_samples)
Contributor

⚠️ Potential issue

Do not forward arbitrary kwargs to object.init (will raise TypeError).

super().__init__(**kwargs) on a plain Python class will error because object.__init__ accepts no keyword args. Keep kwargs for subclasses but don't pass them to super().

Apply this diff:

 def __init__(self, serialized_samples, **kwargs):
-        super().__init__(**kwargs)
-        self.samples = deserialize_samples(serialized_samples)
+        # Don't pass kwargs to object.__init__ (TypeError). Store for subclasses.
+        super().__init__()
+        self.samples = deserialize_samples(serialized_samples)
+        self._init_kwargs = kwargs
🤖 Prompt for AI Agents
In fiftyone/core/ray/base.py around lines 33 to 35, the constructor forwards
arbitrary kwargs to super().__init__(**kwargs) which will call object.__init__
and raise TypeError; change the call to super().__init__() (no kwargs) and, if
subclasses need kwargs, accept and store them or pass them selectively to
subclass super calls rather than to object; ensure
deserialize_samples(serialized_samples) remains assigned to self.samples and do
not forward kwargs to object.__init__.

Comment on lines +64 to +70
    def __exit__(self, *args):
        # Clean up refs
        for actor in self.actors:
            del actor

        del self.serialized_samples_ref

Contributor

🛠️ Refactor suggestion

Ensure graceful teardown: drain pending results and terminate Ray actors.

Deleting Python references does not stop Ray actors; processes continue until the cluster GC kills them. Drain pending results to avoid backpressure, then explicitly terminate actors.

Apply this diff:

 def __exit__(self, *args):
-        # Clean up refs
-        for actor in self.actors:
-            del actor
-
-        del self.serialized_samples_ref
+        # Drain any pending results so ActorPool marks actors idle
+        try:
+            while hasattr(self.pool, "has_next") and self.pool.has_next():
+                self.pool.get_next_unordered()
+        except Exception:
+            pass
+        # Explicitly terminate remote actors
+        for actor in getattr(self, "actors", []):
+            try:
+                ray.kill(actor)
+            except Exception:
+                pass
+        # Release references
+        try:
+            self.actors.clear()
+        except Exception:
+            pass
+        self.pool = None
+        try:
+            del self.serialized_samples_ref
+        except Exception:
+            pass
🤖 Prompt for AI Agents
In fiftyone/core/ray/base.py around lines 64-70, the __exit__ currently just
deletes Python refs which doesn't stop Ray actors; update it to first drain any
pending results (e.g., call ray.get on self.pending_results or otherwise collect
outstanding ObjectRefs) to avoid backpressure, then explicitly terminate each
actor (use ray.kill(actor) or actor.__ray_terminate__() inside a try/except),
and only after successful termination delete serialized refs and local
references; ensure exceptions from get/kill are caught and logged but do not
prevent cleanup.

Comment on lines +71 to +72
    def submit(self, ids, payloads):
        self.pool.submit(lambda a, v: a.run.remote(*v), (ids, payloads))
Contributor

⚠️ Potential issue

ActorPool will stall without consuming results; opportunistically drain to keep progress.

ActorPool.submit() requires periodically calling get_next(_unordered) to mark actors idle. Without consuming results, only the first num_workers tasks will run, and further submissions will queue indefinitely.

Apply this diff to opportunistically release one completed task per submit:

 def submit(self, ids, payloads):
-        self.pool.submit(lambda a, v: a.run.remote(*v), (ids, payloads))
+        self.pool.submit(lambda a, v: a.run.remote(*v), (ids, payloads))
+        # Opportunistically free one finished task so actors re-enter the idle pool
+        if hasattr(self.pool, "has_next") and self.pool.has_next():
+            self.pool.get_next_unordered()

Additionally, add helper methods so callers can drain explicitly (outside this hunk):

# Add to ActorPoolContext class (e.g., after submit)
def get_next(self):
    return self.pool.get_next_unordered()

def drain(self):
    while self.pool.has_next():
        self.pool.get_next_unordered()
🤖 Prompt for AI Agents
In fiftyone/core/ray/base.py around lines 71-72, ActorPool.submit currently only
enqueues tasks which will stall once the number of outstanding tasks exceeds
num_workers; modify submit to opportunistically release one completed task by
calling the pool's get_next_unordered when has_next() is true immediately after
submitting so an idle actor is marked free, and add two helper methods on
ActorPoolContext (get_next returning pool.get_next_unordered, and drain looping
while pool.has_next() to call pool.get_next_unordered) so callers can explicitly
drain results when needed.

Comment on lines +25 to +40
    def run(self, ids, payloads):
        samples_batch = self.samples.select(ids)

        if self.post_processor is not None:
            payloads = self.post_processor(
                *payloads, confidence_thresh=self.confidence_thresh
            )

        with self.ctx:
            for sample, payload in zip(samples_batch, payloads):
                sample.add_labels(
                    payload,
                    label_field=self.label_field,
                    confidence_thresh=self.confidence_thresh,
                )
                self.ctx.save(sample)
Contributor

⚠️ Potential issue

Add error handling and validation

The run method lacks error handling which could cause silent failures in the Ray actor pool. Additionally, there's no validation that the number of IDs matches the number of payloads.

     def run(self, ids, payloads):
+        if ids is None or not ids:
+            return  # Nothing to process
+            
         samples_batch = self.samples.select(ids)
+        
+        # Validate that we got the expected number of samples
+        if len(samples_batch) != len(ids):
+            raise ValueError(
+                f"Expected {len(ids)} samples but got {len(samples_batch)}"
+            )
 
         if self.post_processor is not None:
-            payloads = self.post_processor(
-                *payloads, confidence_thresh=self.confidence_thresh
-            )
+            try:
+                payloads = self.post_processor(
+                    *payloads, confidence_thresh=self.confidence_thresh
+                )
+            except Exception as e:
+                raise RuntimeError(f"Post-processor failed: {e}") from e
+        
+        # Ensure payloads matches samples count after processing
+        if len(payloads) != len(samples_batch):
+            raise ValueError(
+                f"Payload count {len(payloads)} doesn't match sample count {len(samples_batch)}"
+            )
 
         with self.ctx:
             for sample, payload in zip(samples_batch, payloads):
-                sample.add_labels(
-                    payload,
-                    label_field=self.label_field,
-                    confidence_thresh=self.confidence_thresh,
-                )
-                self.ctx.save(sample)
+                try:
+                    sample.add_labels(
+                        payload,
+                        label_field=self.label_field,
+                        confidence_thresh=self.confidence_thresh,
+                    )
+                    self.ctx.save(sample)
+                except Exception as e:
+                    # Log error but continue processing other samples
+                    print(f"Failed to save labels for sample {sample.id}: {e}")
+                    # Or re-raise if you want to fail the entire batch
+                    # raise

Comment on lines +925 to +927
         output = output.detach().cpu()
 
-        return output
+        return output, (width, height)
Contributor

⚠️ Potential issue

Inconsistent return type when no output processor

When there's no output processor, the method returns (output, (width, height)) tuple instead of the expected label format. This creates an inconsistency in return types that could cause issues for callers expecting uniform behavior.

Consider either:

  1. Documenting this behavior clearly in the method docstring
  2. Ensuring consistent return types regardless of output processor presence
  3. Raising an exception if no output processor is configured when one is expected

import fiftyone.core.view as fov


def serialize_samples(samples):
Contributor

This is not a good pattern because you're guaranteeing that the process will require a database connection just to resolve the file path. It would be much better to just resolve the file path directly.

Contributor Author

What file paths are we talking about?

Generally speaking, we want these workers to have a database connection. One of the goals is for them to interact with FO datasets in parallel to the main process.

Contributor

No, you don't want to give workers database connections. There is zero benefit, because the data (media) is not even in the database and cannot be retrieved using the database connection.

Contributor Author

These are writer workers. They must each hold some connection or share access to a pool of connections for us to write multiple things in parallel. Frankly we don't even need a ton of them because most of what they do is sit around waiting for I/O, so a single one that's multithreaded would probably be just fine.

Unrelated grievances with our multi-worker read system can be discussed elsewhere.
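
For what it's worth, the single multithreaded writer mentioned above could look roughly like this. This is a sketch, not part of this PR; write_batch and the executor wiring are hypothetical, and it assumes each call gets its own SaveContext so threads stay independent:

from concurrent.futures import ThreadPoolExecutor

import fiftyone.core.collections as foc

def write_batch(samples, ids, payloads, label_field, confidence_thresh=None):
    batch = samples.select(ids)
    with foc.SaveContext(batch) as ctx:
        for sample, payload in zip(batch, payloads):
            sample.add_labels(
                payload,
                label_field=label_field,
                confidence_thresh=confidence_thresh,
            )
            ctx.save(sample)

# one process, many in-flight MongoDB round trips
executor = ThreadPoolExecutor(max_workers=8)
# future = executor.submit(write_batch, samples, ids, labels_batch, "predictions")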

Contributor

@kaixi-wang kaixi-wang left a comment

This adds Ray as a dependency and introduces the complexity of relying on another opinionated library but doesn't solve the root of the issue.

        with self.ctx:
            for sample, payload in zip(samples_batch, payloads):
                sample.add_labels(
                    payload,
Contributor

This is what actually needs to be refactored.

Contributor Author

Any optimizations to the base code are welcome. That said, notice that we are offloading multiple things here from the main process, namely:

  1. model output post processing
  2. add_labels
  3. writing to mongo (with ctx.save())

All of these parts have some impact on the final apply_model time.

@exupero
Contributor

exupero commented Aug 26, 2025

Not being familiar with Ray, I have a couple high-level questions:

  1. Is this using a background thread or process to offload work? (Or does Ray decide?)
  2. If using a background process, does this get around the problem of SampleView and SaveContext not being picklable?

@jacobsela
Contributor Author

@exupero
This uses a process backend. The way to get around samples not being serializable by pickle is to manually serialize/deserialize them; see the helper functions in fiftyone.core.ray.base.
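
Paraphrasing the helpers from fiftyone.core.ray.base as they appear in the diffs above, the round trip looks like this (a sketch; internal APIs like _serialize and DatasetView._build are used as shown in the review):

import fiftyone as fo
import fiftyone.core.view as fov

def serialize_samples(samples):
    # a view is reduced to a small, picklable (dataset_name, stages) tuple
    dataset_name = samples._root_dataset.name
    stages = samples._serialize() if isinstance(samples, fov.DatasetView) else None
    return dataset_name, stages

def deserialize_samples(serialized):
    # the worker reconnects to the database and rebuilds the same view
    dataset_name, stages = serialized
    dataset = fo.load_dataset(dataset_name)
    if stages is not None:
        return fov.DatasetView._build(dataset, stages)
    return dataset

# view = dataset.take(100)
# payload = serialize_samples(view)        # safe to ship to a Ray worker
# same_view = deserialize_samples(payload)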

@jacobsela
Contributor Author

@kaixi-wang

Re:

This adds Ray as a dependency and introduces the complexity of relying on another opinionated library but doesn't solve the root of the issue.

There are two parts here:
(1) the underlying issue
The core problem this PR tries to address is offloading CPU-bound and I/O-bound operations from the main process, which we want to be only GPU-bound. It seems to do so, as indicated by the profiling attached. I am assuming that by underlying issue you mean the inherent performance of things like add_labels. Even if add_labels were optimized to take zero time, there are other parts of the code that are GPU utilization bottlenecks. An approach like this allows us to more easily offload whatever we view as the bottleneck from any process. More generally, local optimizations can be really impactful for frequently called, especially slow, operations. However, if we are interested in longer-term scaling, our main concern should be dividing the code into pieces that we can individually scale according to a workload's needs.

This leads well to the second point, the use of Ray (which is built to do exactly this).

(2) adding Ray as a dependency
Yes, this is a decision we will have to make. Ray is just a means to an end here. Multiprocessing is a useful tool that can be helpful throughout our codebase. By introducing something like Ray, I hope to cut out a lot of development time that we would otherwise spend on making multiprocessing use in FiftyOne consistent, trusted, clean, and efficient. What are strong arguments for not using Ray from your point of view? Let's discuss them and come to a decision.


         else:
-            return output
+            for k, v in output.items():
Contributor Author

this
