30 changes: 17 additions & 13 deletions fiftyone/core/models.py
@@ -37,6 +37,8 @@
foutr = fou.lazy_import("fiftyone.utils.transformers")
fouu = fou.lazy_import("fiftyone.utils.ultralytics")

foray = fou.lazy_import("fiftyone.core.ray.base")
foray_writers = fou.lazy_import("fiftyone.core.ray.writers")

logger = logging.getLogger(__name__)

@@ -459,7 +461,18 @@ def _apply_image_model_data_loader(

with contextlib.ExitStack() as context:
pb = context.enter_context(fou.ProgressBar(samples, progress=progress))
ctx = context.enter_context(foc.SaveContext(samples))
output_processor = model._output_processor
ctx = context.enter_context(
foray.ActorPoolContext(
samples,
foray_writers.LabelWriter,
num_workers=16,
label_field=label_field,
confidence_thresh=confidence_thresh,
post_processor=output_processor,
)
)
context.enter_context(fou.SetAttributes(model, _output_processor=None))
Contributor

⚠️ Potential issue

Clearing _output_processor may cause issues

Setting _output_processor=None on the model modifies its internal state, which could cause problems if the model is used elsewhere or if an error occurs before it's restored.

The current approach modifies the model's internal state, which could lead to:

  1. Thread safety issues if the model is used concurrently
  2. Incorrect behavior if an exception occurs before restoration
  3. Unexpected side effects for other code using the same model instance

Consider passing the output processor directly to the actor pool without modifying the model.
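
One possible shape for that, sketched under the assumption that these model classes tolerate a shallow copy and that _output_processor is a plain instance attribute (not verified against the FiftyOne codebase):

import copy

# Hypothetical sketch: work on a shallow copy so the shared `model` instance is
# never mutated; the copy yields raw outputs for the actor pool, which already
# receives post_processor=output_processor above
raw_model = copy.copy(model)        # shares the underlying weights
raw_model._output_processor = None  # only the copy skips post-processing

# ... then use raw_model.predict_all(...) in the batch loop below instead of model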


for sample_batch, imgs in zip(
fou.iter_batches(samples, batch_size),
@@ -470,22 +483,13 @@
raise imgs

if needs_samples:
labels_batch = model.predict_all(
ids, labels_batch = model.predict_all(
imgs, samples=sample_batch
)
else:
labels_batch = model.predict_all(imgs)

for sample, labels in zip(sample_batch, labels_batch):
if filename_maker is not None:
_export_arrays(labels, sample.filepath, filename_maker)
ids, labels_batch = model.predict_all(imgs)

sample.add_labels(
labels,
label_field=label_field,
confidence_thresh=confidence_thresh,
)
ctx.save(sample)
ctx.submit(ids, labels_batch)
Comment on lines +486 to +492
Contributor

⚠️ Potential issue

Breaking change in predict_all return signature

The change from labels_batch = model.predict_all(...) to ids, labels_batch = model.predict_all(...) is a breaking change that affects the model interface contract.

This needs careful coordination:

  1. All model implementations must be updated to return the tuple format
  2. Consider backward compatibility for existing models
  3. Update documentation and examples
# Consider adding compatibility wrapper:
def _predict_all_compat(model, imgs, samples=None):
    """Wrapper to handle both old and new predict_all signatures."""
    result = model.predict_all(imgs, samples=samples) if samples else model.predict_all(imgs)
    if isinstance(result, tuple) and len(result) == 2:
        return result  # New format: (ids, predictions)
    else:
        # Old format: just predictions, extract IDs from imgs if available
        ids = imgs.get("_id", None)
        return ids, result
🤖 Prompt for AI Agents
In fiftyone/core/models.py around lines 486-492, the code now expects
model.predict_all to return (ids, labels_batch) which is a breaking change for
existing model implementations; add a compatibility wrapper function (e.g.,
_predict_all_compat) that calls model.predict_all with the same args, detects
whether the result is a two-tuple or a single value, and if single value
constructs/derives ids (from provided samples or imgs metadata) and returns
(ids, labels); replace direct calls to model.predict_all at these lines with the
wrapper; update the predict_all callers' docstrings to state the new tuple
return format and run tests to ensure existing models continue to work, updating
any model implementations that must explicitly return the new tuple.


except Exception as e:
if not skip_failures:
4 changes: 4 additions & 0 deletions fiftyone/core/ray/__init__.py
@@ -0,0 +1,4 @@
import ray

if not ray.is_initialized():
ray.init()
Comment on lines +3 to +4
Contributor

⚠️ Potential issue

Critical: Ray initialization at import time can cause issues

Initializing Ray at module import time is problematic for several reasons:

  1. It prevents users from configuring Ray parameters before initialization
  2. It can interfere with testing and development environments
  3. It forces Ray initialization even when Ray features aren't being used
  4. No error handling means import failures will crash the entire module

Consider lazy initialization or providing an explicit initialization function:

 import ray
 
-if not ray.is_initialized():
-    ray.init()
+def ensure_ray_initialized(**kwargs):
+    """Initialize Ray if not already initialized.
+    
+    Args:
+        **kwargs: Optional Ray initialization parameters
+    """
+    if not ray.is_initialized():
+        ray.init(**kwargs)

Then call this function only when Ray features are actually needed, such as in ActorPoolContext.__init__.
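
A minimal sketch of that wiring, assuming the ensure_ray_initialized() helper proposed above is added to fiftyone/core/ray/__init__.py:

import ray

from fiftyone.core.ray import ensure_ray_initialized  # assumed helper from above


class ActorPoolContext:
    def __init__(self, samples, actor_type, *args, num_workers=4, **kwargs):
        # Ray is only touched when a pool is actually constructed, so callers
        # can configure (or call ray.init() themselves) beforehand
        ensure_ray_initialized()
        self.serialized_samples_ref = ray.put(serialize_samples(samples))
        self.num_workers = num_workers
        # ... remainder unchanged from the diff above (serialize_samples lives
        # in this same module, fiftyone/core/ray/base.py)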

🤖 Prompt for AI Agents
In fiftyone/core/ray/__init__.py around lines 3-4, don't call ray.init() during
module import; instead remove the top-level initialization and add a
lazy/explicit initializer function (e.g., ensure_ray_initialized(**opts) or
init_ray(config)) that checks ray.is_initialized(), calls ray.init(...) with
passed parameters, and catches/logs exceptions; update callers (notably
ActorPoolContext.__init__) to call this initializer when Ray features are
actually required, allowing callers to pass configuration and avoiding
import-time side effects.

72 changes: 72 additions & 0 deletions fiftyone/core/ray/base.py
@@ -0,0 +1,72 @@
import ray

import fiftyone.core.view as fov


def serialize_samples(samples):
Contributor

This is not a good pattern because you're guaranteeing that the process will require a database connection just to resolve the file path. It would be much better to just resolve the file path directly.

Contributor Author

What file paths are we talking about?

Generally speaking, we want these workers to have a database connection. One of the goals is for them to interact with FO datasets in parallel to the main process.

Contributor

No, you don't want to give workers database connections. There is zero benefit because the data (media) is not even in the database and cannot be retrieved using the database connection.

Contributor Author

These are writer workers. They must each hold some connection or share access to a pool of connections for us to write multiple things in parallel. Frankly we don't even need a ton of them because most of what they do is sit around waiting for I/O, so a single one that's multithreaded would probably be just fine.

Unrelated grievances with our multi-worker read system can be discussed elsewhere.
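
For illustration only, a rough sketch of the single multithreaded-writer alternative mentioned above, using a plain thread pool instead of a Ray actor pool (the class name is hypothetical, and using one SaveContext per write like this being thread-safe is an assumption, not something verified here):

from concurrent.futures import ThreadPoolExecutor

import fiftyone.core.collections as foc


class ThreadedLabelWriter:
    """Hypothetical single writer; saves overlap because each batch mostly waits on I/O."""

    def __init__(self, samples, label_field, num_threads=8):
        self.samples = samples
        self.label_field = label_field
        self._executor = ThreadPoolExecutor(max_workers=num_threads)

    def submit(self, ids, labels_batch):
        # schedule the write without blocking the inference loop
        self._executor.submit(self._write, ids, labels_batch)

    def _write(self, ids, labels_batch):
        # one SaveContext per write to avoid sharing a context across threads
        with foc.SaveContext(self.samples) as ctx:
            for sample, labels in zip(self.samples.select(ids), labels_batch):
                sample.add_labels(labels, label_field=self.label_field)
                ctx.save(sample)

    def close(self):
        self._executor.shutdown(wait=True)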

dataset_name = samples._root_dataset.name
stages = (
samples._serialize() if isinstance(samples, fov.DatasetView) else None
)
return dataset_name, stages


def deserialize_samples(serialized_samples):
import fiftyone as fo

dataset_name, stages = serialized_samples

dataset = fo.load_dataset(dataset_name)
if stages is not None:
return fov.DatasetView._build(dataset, stages)
return dataset


class FiftyOneActor:
"""Class for FiftyOne Ray actors.
Args:
serialized_samples: a serialized representation of a
:class:`fiftyone.core.collections.SampleCollection`
"""

def __init__(self, serialized_samples, **kwargs):
super().__init__(**kwargs)
self.samples = deserialize_samples(serialized_samples)
Comment on lines +33 to +35
Contributor

⚠️ Potential issue

Do not forward arbitrary kwargs to object.__init__ (will raise TypeError).

super().__init__(**kwargs) on a plain Python class will error because object.__init__ accepts no keyword args. Keep kwargs for subclasses but don't pass them to super().

Apply this diff:

 def __init__(self, serialized_samples, **kwargs):
-        super().__init__(**kwargs)
-        self.samples = deserialize_samples(serialized_samples)
+        # Don't pass kwargs to object.__init__ (TypeError). Store for subclasses.
+        super().__init__()
+        self.samples = deserialize_samples(serialized_samples)
+        self._init_kwargs = kwargs
🤖 Prompt for AI Agents
In fiftyone/core/ray/base.py around lines 33 to 35, the constructor forwards
arbitrary kwargs to super().__init__(**kwargs) which will call object.__init__
and raise TypeError; change the call to super().__init__() (no kwargs) and, if
subclasses need kwargs, accept and store them or pass them selectively to
subclass super calls rather than to object; ensure
deserialize_samples(serialized_samples) remains assigned to self.samples and do
not forward kwargs to object.__init__.



class ActorPoolContext:
"""Context manager for a pool of Ray actors.
Args:
samples: a :class:`fiftyone.core.collections.SampleCollection`
actor_type: the :class:`FiftyOneActor` subclass to instantiate
for each worker
num_workers (int): the number of workers in the pool
"""

def __init__(self, samples, actor_type, *args, num_workers=4, **kwargs):
super().__init__()
self.serialized_samples_ref = ray.put(serialize_samples(samples))
self.num_workers = num_workers
self.actor_type = actor_type
self.actors = [
self.actor_type.remote(
self.serialized_samples_ref, *args, **kwargs
)
for _ in range(self.num_workers)
]
self.pool = ray.util.ActorPool(self.actors)

def __enter__(self):
return self

def __exit__(self, *args):
# Clean up refs
for actor in self.actors:
del actor

del self.serialized_samples_ref

Comment on lines +64 to +70
Contributor

🛠️ Refactor suggestion

Ensure graceful teardown: drain pending results and terminate Ray actors.

Deleting Python references does not stop Ray actors; processes continue until the cluster GC kills them. Drain pending results to avoid backpressure, then explicitly terminate actors.

Apply this diff:

 def __exit__(self, *args):
-        # Clean up refs
-        for actor in self.actors:
-            del actor
-
-        del self.serialized_samples_ref
+        # Drain any pending results so ActorPool marks actors idle
+        try:
+            while hasattr(self.pool, "has_next") and self.pool.has_next():
+                self.pool.get_next_unordered()
+        except Exception:
+            pass
+        # Explicitly terminate remote actors
+        for actor in getattr(self, "actors", []):
+            try:
+                ray.kill(actor)
+            except Exception:
+                pass
+        # Release references
+        try:
+            self.actors.clear()
+        except Exception:
+            pass
+        self.pool = None
+        try:
+            del self.serialized_samples_ref
+        except Exception:
+            pass
🤖 Prompt for AI Agents
In fiftyone/core/ray/base.py around lines 64-70, the __exit__ currently just
deletes Python refs which doesn't stop Ray actors; update it to first drain any
pending results (e.g., call ray.get on self.pending_results or otherwise collect
outstanding ObjectRefs) to avoid backpressure, then explicitly terminate each
actor (use ray.kill(actor) or actor.__ray_terminate__() inside a try/except),
and only after successful termination delete serialized refs and local
references; ensure exceptions from get/kill are caught and logged but do not
prevent cleanup.

def submit(self, ids, payloads):
self.pool.submit(lambda a, v: a.run.remote(*v), (ids, payloads))
Comment on lines +71 to +72
Contributor

⚠️ Potential issue

ActorPool will stall without consuming results; opportunistically drain to keep progress.

ActorPool.submit() requires periodically calling get_next(_unordered) to mark actors idle. Without consuming results, only the first num_workers tasks will run, and further submissions will queue indefinitely.

Apply this diff to opportunistically release one completed task per submit:

 def submit(self, ids, payloads):
-        self.pool.submit(lambda a, v: a.run.remote(*v), (ids, payloads))
+        self.pool.submit(lambda a, v: a.run.remote(*v), (ids, payloads))
+        # Opportunistically free one finished task so actors re-enter the idle pool
+        if hasattr(self.pool, "has_next") and self.pool.has_next():
+            self.pool.get_next_unordered()

Additionally, add helper methods so callers can drain explicitly (outside this hunk):

# Add to ActorPoolContext class (e.g., after submit)
def get_next(self):
    return self.pool.get_next_unordered()

def drain(self):
    while self.pool.has_next():
        self.pool.get_next_unordered()
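
And a sketch of the caller side in _apply_image_model_data_loader, condensed from the diff above (the name of the second iterable in the zip is assumed, since it is elided in this hunk), draining after the last batch so every queued write lands before the context exits:

for sample_batch, imgs in zip(
    fou.iter_batches(samples, batch_size),
    data_loader,  # assumed name; elided in the hunk above
):
    ids, labels_batch = model.predict_all(imgs)
    ctx.submit(ids, labels_batch)

# block until every outstanding label write has completed
ctx.drain()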
🤖 Prompt for AI Agents
In fiftyone/core/ray/base.py around lines 71-72, ActorPool.submit currently only
enqueues tasks which will stall once the number of outstanding tasks exceeds
num_workers; modify submit to opportunistically release one completed task by
calling the pool's get_next_unordered when has_next() is true immediately after
submitting so an idle actor is marked free, and add two helper methods on
ActorPoolContext (get_next returning pool.get_next_unordered, and drain looping
while pool.has_next() to call pool.get_next_unordered) so callers can explicitly
drain results when needed.

40 changes: 40 additions & 0 deletions fiftyone/core/ray/writers.py
@@ -0,0 +1,40 @@
import ray
import torch

import fiftyone.core.ray.base as foray
import fiftyone.core.collections as foc
from fiftyone.core.ray.base import FiftyOneActor


@ray.remote
class LabelWriter(FiftyOneActor):
def __init__(
self,
serialized_samples,
label_field,
confidence_thresh=None,
post_processor=None,
**kwargs
):
super().__init__(serialized_samples, **kwargs)
self.label_field = label_field
self.confidence_thresh = confidence_thresh
self.post_processor = post_processor
self.ctx = foc.SaveContext(self.samples)

def run(self, ids, payloads):
samples_batch = self.samples.select(ids)

if self.post_processor is not None:
payloads = self.post_processor(
*payloads, confidence_thresh=self.confidence_thresh
)

with self.ctx:
for sample, payload in zip(samples_batch, payloads):
sample.add_labels(
payload,
Contributor

This is what actually needs to be refactored.

Contributor Author

Any optimizations to the base code are welcome. That said, notice that we are offloading multiple things here from the main process, namely:

  1. model output post processing
  2. add_labels
  3. writing to mongo (with ctx.save())

All of these parts have some impact on the final apply_model time.

label_field=self.label_field,
confidence_thresh=self.confidence_thresh,
)
self.ctx.save(sample)
Comment on lines +25 to +40
Contributor

⚠️ Potential issue

Add error handling and validation

The run method lacks error handling, which could cause silent failures in the Ray actor pool. Additionally, there's no validation that the number of IDs matches the number of payloads.

     def run(self, ids, payloads):
+        if ids is None or not ids:
+            return  # Nothing to process
+            
         samples_batch = self.samples.select(ids)
+        
+        # Validate that we got the expected number of samples
+        if len(samples_batch) != len(ids):
+            raise ValueError(
+                f"Expected {len(ids)} samples but got {len(samples_batch)}"
+            )
 
         if self.post_processor is not None:
-            payloads = self.post_processor(
-                *payloads, confidence_thresh=self.confidence_thresh
-            )
+            try:
+                payloads = self.post_processor(
+                    *payloads, confidence_thresh=self.confidence_thresh
+                )
+            except Exception as e:
+                raise RuntimeError(f"Post-processor failed: {e}") from e
+        
+        # Ensure payloads matches samples count after processing
+        if len(payloads) != len(samples_batch):
+            raise ValueError(
+                f"Payload count {len(payloads)} doesn't match sample count {len(samples_batch)}"
+            )
 
         with self.ctx:
             for sample, payload in zip(samples_batch, payloads):
-                sample.add_labels(
-                    payload,
-                    label_field=self.label_field,
-                    confidence_thresh=self.confidence_thresh,
-                )
-                self.ctx.save(sample)
+                try:
+                    sample.add_labels(
+                        payload,
+                        label_field=self.label_field,
+                        confidence_thresh=self.confidence_thresh,
+                    )
+                    self.ctx.save(sample)
+                except Exception as e:
+                    # Log error but continue processing other samples
+                    print(f"Failed to save labels for sample {sample.id}: {e}")
+                    # Or re-raise if you want to fail the entire batch
+                    # raise

14 changes: 9 additions & 5 deletions fiftyone/utils/torch.py
@@ -883,7 +883,8 @@ def predict_all(self, imgs):
of dicts of :class:`fiftyone.core.labels.Label` instances
containing the predictions
"""
return self._predict_all(imgs)
ids = imgs.pop("_id", None)
return ids, self._predict_all(imgs)

def _predict_all(self, imgs):
if self._preprocess and self._transforms is not None:
@@ -921,9 +922,9 @@ def _predict_all(self, imgs):

if self._output_processor is None:
if isinstance(output, torch.Tensor):
output = output.detach().cpu().numpy()
output = output.detach().cpu()

return output
return output, (width, height)
Comment on lines +925 to +927
Contributor

⚠️ Potential issue

Inconsistent return type when no output processor

When there's no output processor, the method returns an (output, (width, height)) tuple instead of the expected label format. This creates an inconsistency in return types that could cause issues for callers expecting uniform behavior.

Consider one of the following (option 2 is sketched after this list):

  1. Documenting this behavior clearly in the method docstring
  2. Ensuring consistent return types regardless of output processor presence
  3. Raising an exception if no output processor is configured when one is expected
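
A minimal sketch of option 2 inside _predict_all, assuming the output processor is invoked as processor(output, image_sizes, ...) as the writer code in this PR suggests:

# hypothetical: make both branches return the same (payload, image_sizes) shape,
# so callers never special-case the "no output processor" configuration
if self._output_processor is None:
    if isinstance(output, torch.Tensor):
        output = output.detach().cpu()
    return output, (width, height)  # raw payload, same tuple shape

labels = self._output_processor(output, (width, height))
return labels, (width, height)  # processed payload, same tuple shape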


if self.has_logits:
self._output_processor.store_logits = self.store_logits
@@ -1913,17 +1914,20 @@ def __getitem__(self, idx):
return self.__getitems__([idx])[0]

def __getitems__(self, indices):
_ids = [self.ids[idx] for idx in indices]
if self.vectorize:
batch = self._prepare_batch_vectorized(indices)
else:
batch = self._prepare_batch_db(indices)

res = []
for d in batch:
for i, d in enumerate(batch):
if isinstance(d, Exception):
res.append(d)
else:
res.append(self._get_item(d))
_processed = self._get_item(d)
_processed.update({"_id": _ids[i]})
res.append(_processed)

return res

16 changes: 15 additions & 1 deletion fiftyone/utils/transformers.py
@@ -681,7 +681,17 @@ def _predict_all(self, args):
)

else:
return output
for k, v in output.items():
Contributor Author

this

if isinstance(v, torch.Tensor):
output[k] = v.detach().cpu()
elif isinstance(v, (tuple, list)):
output[k] = [
i.detach().cpu()
for i in v
if isinstance(i, torch.Tensor)
]

return output, image_sizes

def _forward_pass(self, args):
return self._model(
@@ -709,6 +719,10 @@ def collate_fn(batch):
keys = batch[0].keys()
res = {}
for k in keys:
if not isinstance(batch[0][k], (torch.Tensor, np.ndarray)):
# not a tensor, just return the list
res[k] = [b[k] for b in batch]
continue
# Gather shapes for dimension analysis
shapes = [b[k].shape for b in batch]
# Find the max size in each dimension