Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions fiftyone/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def apply_model(
store_logits=False,
batch_size=None,
num_workers=None,
num_writer_workers=None,
skip_failures=True,
output_dir=None,
rel_dir=None,
Expand Down Expand Up @@ -105,6 +106,8 @@ def apply_model(
batching
num_workers (None): the number of workers to use when loading images.
Only applicable for Torch-based models
num_writer_workers (None): the number of thread workers to use when
writing predictions. Only applicable for Torch-based models.
skip_failures (True): whether to gracefully continue without raising an
error if predictions cannot be generated for a sample. Only
applicable to :class:`Model` instances
Expand Down Expand Up @@ -284,6 +287,7 @@ def apply_model(
filename_maker,
progress,
field_mapping,
num_writer_workers,
)

if batch_size is not None:
Expand Down Expand Up @@ -457,6 +461,7 @@ def _apply_image_model_data_loader(
filename_maker,
progress,
field_mapping,
num_writer_workers,
):
needs_samples = isinstance(model, SamplesMixin)

Expand All @@ -480,7 +485,7 @@ def _apply_image_model_data_loader(
ctx = context.enter_context(foc.SaveContext(samples))
submit = context.enter_context(
fou.async_executor(
max_workers=1,
max_workers=num_writer_workers,
skip_failures=skip_failures,
warning="Async failure labeling batches",
)
Expand Down Expand Up @@ -904,6 +909,7 @@ def compute_embeddings(
embeddings_field=None,
batch_size=None,
num_workers=None,
num_writer_workers=None,
skip_failures=True,
progress=None,
**kwargs,
Expand Down Expand Up @@ -934,6 +940,8 @@ def compute_embeddings(
batching
num_workers (None): the number of workers to use when loading images.
Only applicable for Torch-based models
num_writer_workers (None): the number of thread workers to use when
writing embeddings. Only applicable for Torch-based models.
skip_failures (True): whether to gracefully continue without raising an
error if embeddings cannot be generated for a sample. Only
applicable to :class:`Model` instances
Expand Down Expand Up @@ -1078,6 +1086,7 @@ def compute_embeddings(
skip_failures,
progress,
field_mapping,
num_writer_workers,
)

if batch_size is not None:
Expand Down Expand Up @@ -1200,6 +1209,7 @@ def _compute_image_embeddings_data_loader(
skip_failures,
progress,
field_mapping,
num_writer_workers,
):
data_loader = _make_data_loader(
samples,
Expand All @@ -1224,7 +1234,7 @@ def _compute_image_embeddings_data_loader(

submit = context.enter_context(
fou.async_executor(
max_workers=1,
max_workers=num_writer_workers,
skip_failures=skip_failures,
warning="Async failure saving embeddings",
)
Expand Down
18 changes: 15 additions & 3 deletions fiftyone/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2859,14 +2859,19 @@ def recommend_thread_pool_workers(num_workers=None):
If a ``fo.config.max_thread_pool_workers`` is set, this limit is applied.

Args:
num_workers (None): a suggested number of workers
num_workers (None): a suggested number of workers. If ``num_workers <= 0``, this
function returns 1.

Returns:
a number of workers
"""

if num_workers is None:
num_workers = multiprocessing.cpu_count()

if num_workers <= 0:
num_workers = 1

if fo.config.max_thread_pool_workers is not None:
num_workers = min(num_workers, fo.config.max_thread_pool_workers)

Expand Down Expand Up @@ -3147,7 +3152,7 @@ def validate_hex_color(value):

@contextmanager
def async_executor(
*, max_workers, skip_failures=False, warning="Async failure"
*, max_workers=None, skip_failures=False, warning="Async failure"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

❓ Verification inconclusive

Keyword‑only args may break existing positional calls; keep first arg positional or verify all call sites

Switching to keyword‑only can break user code calling with async_executor(4). Either keep the first parameter positional, or confirm all call sites are keyworded.

Option A (backward compatible):

-@contextmanager
-def async_executor(
-    *, max_workers=None, skip_failures=False, warning="Async failure"
-):
+@contextmanager
+def async_executor(
+    max_workers=None, *, skip_failures=False, warning="Async failure"
+):

Option B (keep as-is): verify no positional usages in the repo:


🏁 Script executed:

#!/bin/bash
# Find async_executor calls that don't use keywords for the first arg
rg -nP -C1 '\basync_executor\s*\(\s*(?!\)|max_workers=|skip_failures=|warning=)\S' || true
# Common context manager form
rg -nP -C1 '\bwith\s+async_executor\s*\(\s*(?!\)|max_workers=|skip_failures=|warning=)\S' || true

Length of output: 167


async_executor signature change is a breaking change
No internal calls rely on a positional first argument, but as a public API making max_workers keyword-only will break external callers. Either revert to a backward-compatible signature:

@contextmanager
-def async_executor(
-    *, max_workers=None, skip_failures=False, warning="Async failure"
-):
+def async_executor(
+    max_workers=None, *, skip_failures=False, warning="Async failure"
+):

or keep the keyword-only signature but document the breaking change and bump the major version.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
*, max_workers=None, skip_failures=False, warning="Async failure"
@contextmanager
def async_executor(
max_workers=None, *, skip_failures=False, warning="Async failure"
):
🤖 Prompt for AI Agents
In fiftyone/core/utils.py around line 3155, the change making max_workers a
keyword-only parameter is a breaking public-API change; restore backward
compatibility by accepting max_workers as a positional parameter (move
max_workers before the * or remove the keyword-only marker) so existing callers
continue to work, or if you intend to keep it keyword-only then update the
public docs/changelog to note the breaking change and bump the package major
version accordingly.

):
"""
Context manager that provides a function for submitting tasks to a thread
Expand All @@ -3160,14 +3165,21 @@ def async_executor(
submit(process_item, item)

Args:
max_workers: the maximum number of workers to use
max_workers (None): the maximum number of workers to use. By default,
this is determined by :func:`fiftyone.core.utils.recommend_thread_pool_workers`.
skip_failures (False): whether to skip exceptions raised by tasks
warning ("Async failure"): the warning message to log if a task
raises an exception and ``skip_failures == True``

Raises:
Exception: if a task raises an exception and ``skip_failures == False``
"""
if max_workers is None:
max_workers = (
fo.config.default_thread_pool_workers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the data, it looks like there is no benefit after 8 (most of the benefits are seen with 4), so it seems like we should cap it rather than use the default (cpu_count()). For high-CPU machines, this could have deleterious effects.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The data attached is for computing dense embeddings with a (relatively) large batch size. It's of interest because we will use this workload in det RERs. It's also likely reflective of other very heavy mongo I/O workloads. Conversely, something like a regular apply_model on detections with a low batch size will have many small payloads and may benefit from more threads.

Given that the user can already configure the number of threads via any of:

  • the argument num_writer_workers
  • fo.config.default_thread_pool_workers
  • fo.config.max_thread_pool_workers

I would rather not put a hard cap here.

What concerns do you have for very many threads being used? Maybe they can be separately addressed.

or recommend_thread_pool_workers(max_workers)
)
Comment on lines +3177 to +3181
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Clamp defaults to config max and guard against zero workers

When fo.config.default_thread_pool_workers is set, the current logic bypasses recommend_thread_pool_workers(...), so it may exceed fo.config.max_thread_pool_workers. It also allows 0, which will crash ThreadPoolExecutor. Clamp via recommend_thread_pool_workers and enforce a minimum of 1.

-    if max_workers is None:
-        max_workers = (
-            fo.config.default_thread_pool_workers
-            or recommend_thread_pool_workers(max_workers)
-        )
+    if max_workers is None:
+        cfg_default = getattr(fo.config, "default_thread_pool_workers", None)
+        # Clamp to fo.config.max_thread_pool_workers inside helper
+        max_workers = recommend_thread_pool_workers(cfg_default)
+    if max_workers is None or max_workers < 1:
+        # ThreadPoolExecutor rejects 0; ensure a sensible minimum
+        max_workers = 1
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if max_workers is None:
max_workers = (
fo.config.default_thread_pool_workers
or recommend_thread_pool_workers(max_workers)
)
if max_workers is None:
cfg_default = getattr(fo.config, "default_thread_pool_workers", None)
# Clamp to fo.config.max_thread_pool_workers inside helper
max_workers = recommend_thread_pool_workers(cfg_default)
if max_workers is None or max_workers < 1:
# ThreadPoolExecutor rejects 0; ensure a sensible minimum
max_workers = 1
🤖 Prompt for AI Agents
In fiftyone/core/utils.py around lines 3171-3175, the current default-selection
bypasses recommend_thread_pool_workers and can exceed
fo.config.max_thread_pool_workers or be zero; change the logic to compute a
tentative default (use fo.config.default_thread_pool_workers if set else
recommend_thread_pool_workers(max_workers)), then pass that tentative value
through recommend_thread_pool_workers to clamp it within configured bounds, and
finally enforce a minimum of 1 (i.e., if the clamped result is < 1 set
max_workers = 1); assign that final value to max_workers when max_workers is
None.


with ThreadPoolExecutor(max_workers=max_workers) as executor:
_futures = []

Expand Down
Loading