make num writer threads configurable #6343
base: develop
Conversation
Walkthrough
Adds an optional num_writer_workers parameter to apply_model and compute_embeddings, threads it through to internal data loaders to configure the async writer executor's max_workers, and makes async_executor accept max_workers=None, deriving a default from configuration or a CPU-based recommendation.
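As a usage sketch (num_writer_workers is the keyword named in this walkthrough; the review below also discusses a num_writer_thread_workers spelling, and pass-through via the SampleCollection methods relies on their **kwargs):

import fiftyone.zoo as foz

dataset = foz.load_zoo_dataset("quickstart")
model = foz.load_zoo_model("clip-vit-base32-torch")

# With None (the default), the writer executor derives max_workers from
# config or a CPU-based recommendation; pass an int to override
dataset.compute_embeddings(
    model, embeddings_field="embeddings", num_writer_workers=8
)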
Sequence Diagram(s)
sequenceDiagram
autonumber
actor U as User
participant API as apply_model(...)
participant DL as _apply_image_model_data_loader
participant M as Model
participant AE as async_executor (writer)
U->>API: apply_model(samples, model, num_writer_workers=?)
API->>DL: start processing (passes num_writer_workers)
DL->>AE: create writer executor\nmax_workers = num_writer_workers or default
loop For each batch
DL->>M: infer on batch
DL->>AE: submit async label/save
end
AE-->>DL: completes tasks
DL-->>API: done
API-->>U: return
note over AE: If None, max_workers derived from config or recommendation
sequenceDiagram
autonumber
actor U as User
participant API as compute_embeddings(...)
participant DL as _compute_image_embeddings_data_loader
participant M as Model
participant AE as async_executor (writer)
U->>API: compute_embeddings(samples, model, num_writer_workers=?)
API->>DL: start processing (passes num_writer_workers)
DL->>AE: create writer executor\nmax_workers = num_writer_workers or default
loop For each batch
DL->>M: compute embeddings
DL->>AE: submit async write
end
AE-->>DL: completes writes
DL-->>API: embeddings ready
API-->>U: return
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
Pre-merge checks and finishing touches: ✅ Passed checks (3 passed)
@PeterOttoMoeller This may be a relevant argument for the embeddings rework. If the throughput isn't rising with more threads, we have a different bottleneck somewhere in the stack. It could very well be mongo write throughput. Not urgent, but worth investigating IMO.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
fiftyone/core/models.py (1)
78-84
: Add docs for num_writer_thread_workers and warn when ignored
The new parameter is missing from both docstrings. Also consider warning when it’s provided but a DataLoader path isn’t used (same pattern as num_workers).
Additional docstring lines to add under Args in both functions:
num_writer_thread_workers (None): number of background writer threads used to save results when using DataLoader-backed execution. If None, defaults to fo.config.default_thread_pool_workers clamped by fo.config.max_thread_pool_workers. Ignored when a DataLoader is not used (e.g., video-frame paths or non-Torch models).
Optional warning (near the existing num_workers warning):

if num_writer_thread_workers is not None and not use_data_loader:
    logger.warning(
        "Ignoring unsupported `num_writer_thread_workers` parameter"
    )

Also applies to: 904-914
🧹 Nitpick comments (2)
fiftyone/core/models.py (2)
462-463
: Throttle in-flight async writes to cap memory growth
Submitting one future per batch without bounding the in-flight queue can retain large batches/labels in memory if compute outpaces writes. Introduce a small bounded window tied to writer threads to add backpressure.
def _apply_image_model_data_loader(
@@
-    submit = context.enter_context(
+    submit = context.enter_context(
         fou.async_executor(
             max_workers=num_writer_thread_workers,
             skip_failures=skip_failures,
             warning="Async failure labeling batches",
         )
     )
+    # Backpressure window: ~2x the writer parallelism
+    from collections import deque
+    _pending = deque()
+    _window = (num_writer_thread_workers or fou.recommend_thread_pool_workers()) * 2
@@
-    submit(save_batch, sample_batch, labels_batch)
+    fut = submit(save_batch, sample_batch, labels_batch)
+    _pending.append(fut)
+    if len(_pending) >= _window:
+        # Drain oldest to keep memory bounded
+        _pending.popleft().result()
Note: async_executor will still join remaining futures on exit; calling result() early here just helps bound peak memory. Consider exposing a first-class max_pending in async_executor in a follow-up for reuse across sites.
Also applies to: 485-490, 505-523
1208-1210
: Apply the same bounded in-flight writes for embeddings saves
Mirror the backpressure window to keep embeddings batches from piling up in memory when saving to the DB.

def _compute_image_embeddings_data_loader(
@@
-    submit = context.enter_context(
+    submit = context.enter_context(
         fou.async_executor(
             max_workers=num_writer_thread_workers,
             skip_failures=skip_failures,
             warning="Async failure saving embeddings",
         )
     )
+    from collections import deque
+    _pending = deque()
+    _window = (num_writer_thread_workers or fou.recommend_thread_pool_workers()) * 2
@@
-    if embeddings_field is not None:
-        submit(save_batch, sample_batch, embeddings_batch)
+    if embeddings_field is not None:
+        fut = submit(save_batch, sample_batch, embeddings_batch)
+        _pending.append(fut)
+        if len(_pending) >= _window:
+            _pending.popleft().result()
     else:
         embeddings.extend(embeddings_batch)

Also applies to: 1232-1237, 1246-1274
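For reference, a self-contained sketch of the backpressure pattern proposed in both comments, using a plain ThreadPoolExecutor in place of fou.async_executor (function and variable names here are illustrative):

from collections import deque
from concurrent.futures import ThreadPoolExecutor

def write_with_backpressure(batches, write, max_workers=4):
    # Cap in-flight futures at ~2x the writer parallelism so batches
    # cannot pile up in memory when compute outpaces writes
    window = max_workers * 2
    pending = deque()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for batch in batches:
            pending.append(executor.submit(write, batch))
            if len(pending) >= window:
                # Block on the oldest future to bound peak memory
                pending.popleft().result()
        # Drain the remainder before exiting
        while pending:
            pending.popleft().result()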
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
fiftyone/core/models.py (8 hunks)
fiftyone/core/utils.py (2 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-05-06T01:47:36.969Z
Learnt from: minhtuev
PR: voxel51/fiftyone#5807
File: fiftyone/utils/eval/detection.py:261-269
Timestamp: 2025-05-06T01:47:36.969Z
Learning: The SampleCollection.map_samples() method in FiftyOne uses the parameter name `num_workers` (not `workers`) to specify the number of workers for parallel processing.
Applied to files:
fiftyone/core/models.py
📚 Learning: 2025-05-06T01:47:36.969Z
Learnt from: minhtuev
PR: voxel51/fiftyone#5807
File: fiftyone/utils/eval/detection.py:261-269
Timestamp: 2025-05-06T01:47:36.969Z
Learning: The SampleCollection.map_samples() method in FiftyOne takes `num_workers` as a parameter name, not `workers`, for specifying the number of workers for parallel processing.
Applied to files:
fiftyone/core/models.py
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
- GitHub Check: test / test-python (ubuntu-latest-m, 3.9)
- GitHub Check: test-windows / test-python (windows-latest, 3.11)
- GitHub Check: test-windows / test-python (windows-latest, 3.9)
- GitHub Check: test / test-python (ubuntu-latest-m, 3.12)
- GitHub Check: test / test-python (ubuntu-latest-m, 3.10)
- GitHub Check: test / test-python (ubuntu-latest-m, 3.11)
- GitHub Check: test-windows / test-python (windows-latest, 3.12)
- GitHub Check: test-windows / test-python (windows-latest, 3.10)
- GitHub Check: lint / eslint
- GitHub Check: e2e / test-e2e
- GitHub Check: build / build
- GitHub Check: test / test-app
- GitHub Check: build
🔇 Additional comments (1)
fiftyone/core/utils.py (1)
3150-3151
: Preserve positional max_workers for back-compat — accept a single positional int
Repo search found no internal positional calls, but this is a public API break. Implement the suggested shim in fiftyone/core/utils.py (accept *args; coerce a single positional int to max_workers; raise on multiple/duplicate), add a unit test that calls async_executor(4) and verifies behavior, and add a changelog/doc note documenting the accepted positional argument.
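A sketch of that shim under the reviewer's assumptions (the executor body is simplified, and skip_failures/warning handling is elided; only the positional-coercion logic mirrors the suggestion):

from contextlib import contextmanager
from concurrent.futures import ThreadPoolExecutor

@contextmanager
def async_executor(
    *args, max_workers=None, skip_failures=False, warning="Async failure"
):
    # Back-compat: tolerate a single positional int, e.g. async_executor(4)
    if args:
        if len(args) > 1 or max_workers is not None:
            raise TypeError(
                "async_executor() accepts at most one positional "
                "argument (max_workers)"
            )
        max_workers = args[0]
    if max_workers is None or max_workers < 1:
        max_workers = 1  # ThreadPoolExecutor rejects 0
    executor = ThreadPoolExecutor(max_workers=max_workers)
    try:
        yield executor.submit
    finally:
        executor.shutdown(wait=True)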
if max_workers is None:
    max_workers = (
        fo.config.default_thread_pool_workers
        or recommend_thread_pool_workers(max_workers)
    )
🛠️ Refactor suggestion
Clamp defaults to config max and guard against zero workers
When fo.config.default_thread_pool_workers is set, the current logic bypasses recommend_thread_pool_workers(...), so it may exceed fo.config.max_thread_pool_workers. It also allows 0, which will crash ThreadPoolExecutor. Clamp via recommend_thread_pool_workers and enforce a minimum of 1.
- if max_workers is None:
- max_workers = (
- fo.config.default_thread_pool_workers
- or recommend_thread_pool_workers(max_workers)
- )
+ if max_workers is None:
+ cfg_default = getattr(fo.config, "default_thread_pool_workers", None)
+ # Clamp to fo.config.max_thread_pool_workers inside helper
+ max_workers = recommend_thread_pool_workers(cfg_default)
+ if max_workers is None or max_workers < 1:
+ # ThreadPoolExecutor rejects 0; ensure a sensible minimum
+ max_workers = 1
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
-if max_workers is None:
-    max_workers = (
-        fo.config.default_thread_pool_workers
-        or recommend_thread_pool_workers(max_workers)
-    )
+if max_workers is None:
+    cfg_default = getattr(fo.config, "default_thread_pool_workers", None)
+    # Clamp to fo.config.max_thread_pool_workers inside helper
+    max_workers = recommend_thread_pool_workers(cfg_default)
+    if max_workers is None or max_workers < 1:
+        # ThreadPoolExecutor rejects 0; ensure a sensible minimum
+        max_workers = 1
🤖 Prompt for AI Agents
In fiftyone/core/utils.py around lines 3171-3175, the current default-selection
bypasses recommend_thread_pool_workers and can exceed
fo.config.max_thread_pool_workers or be zero; change the logic to compute a
tentative default (use fo.config.default_thread_pool_workers if set else
recommend_thread_pool_workers(max_workers)), then pass that tentative value
through recommend_thread_pool_workers to clamp it within configured bounds, and
finally enforce a minimum of 1 (i.e., if the clamped result is < 1 set
max_workers = 1); assign that final value to max_workers when max_workers is
None.
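For illustration, a simplified stand-in for the clamp-and-floor behavior being requested; the real helper lives in fiftyone/core/utils.py and reads fo.config, and max_cap here is a hypothetical substitute for fo.config.max_thread_pool_workers:

import os

def recommend_workers(num_workers=None, max_cap=None):
    # Derive a CPU-based default, clamp to the configured max, floor at 1
    if num_workers is None:
        num_workers = os.cpu_count() or 1
    if max_cap is not None:
        num_workers = min(num_workers, max_cap)
    return max(num_workers, 1)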
Thinking out loud: should we document this parameter in the apply_model() and compute_embeddings() implementations of the main SampleCollection interface? Or do we consider this a strictly internal thing that we don't want end users to think about?
Observation 1: this parameter is only applicable when model is a Torch model that supports data loaders. Ideally all parameters in the public interface support all models, which is one piece of evidence to suggest that this is perhaps not worthy of inclusion in the SampleCollection interface.
Observation 2: the SampleCollection methods have **kwargs in their signatures, so it would still be possible to utilize this functionality, albeit undocumented.
What say you? 🤔
""" | ||
if max_workers is None: | ||
max_workers = ( | ||
fo.config.default_thread_pool_workers |
From the data, it looks like there is no benefit after 8 (most of the benefits are seen with 4), so it seems like we should cap it rather than use the default (cpu_count()). For high-CPU machines, this could have deleterious effects.
The data attached is for computing dense embeddings with a (relatively) large batch size. It's of interest because we will use this workload in det RERs. It's also likely reflective of other very heavy mongo I/O workloads. Conversely, something like a regular apply_model on detections with a low batch size will have many small payloads and may benefit from more threads.
Given that the user can already configure the number of threads via any of:
- the argument num_writer_workers
- fo.config.default_thread_pool_workers
- fo.config.max_thread_pool_workers
I would rather not put a hard cap here. (A sketch of these routes follows below.)
What concerns do you have for very many threads being used? Maybe they can be separately addressed.
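As a sketch of those three routes (attribute names are taken from this thread; runtime assignment to fo.config is assumed here):

import fiftyone as fo

# Global default used when no explicit value is passed
fo.config.default_thread_pool_workers = 8

# Upper bound applied when a value is derived automatically
fo.config.max_thread_pool_workers = 16

# Per-call override (the argument introduced in this PR):
# dataset.compute_embeddings(model, num_writer_workers=8)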
@brimoor Agree that this shouldn't be public facing. The main reason (besides the points you mentioned) being that I think most users don't and shouldn't care about these details. It should "just work". This also strengthens the argument for better memory management in this script IMO. Semi-related: continuing to maintain multiple inference loops is a mistake. With the
@jacobsela happy to chat sometime about how a single inference loop would look. Note that
Actionable comments posted: 1
🧹 Nitpick comments (1)
fiftyone/core/utils.py (1)
3168-3170
: Docstring misstates defaulting path
Behavior prefers fo.config.default_thread_pool_workers when set. Update the docstring to reflect that, or change the code to always defer to recommend_thread_pool_workers.
Suggested tweak:
-    max_workers (None): the maximum number of workers to use. By default,
-        this is determined by
-        :func:`fiftyone.core.utils.recommend_thread_pool_workers`.
+    max_workers (None): the maximum number of workers to use. By default,
+        this is chosen via
+        :func:`fiftyone.core.utils.recommend_thread_pool_workers`,
+        seeded by ``fo.config.default_thread_pool_workers`` when set.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
fiftyone/core/utils.py (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
fiftyone/core/utils.py (1)
fiftyone/core/map/mapper.py (1)
num_workers (105-107)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
- GitHub Check: lint / eslint
- GitHub Check: test-windows / test-python (windows-latest, 3.10)
- GitHub Check: test-windows / test-python (windows-latest, 3.11)
- GitHub Check: test-windows / test-python (windows-latest, 3.12)
- GitHub Check: test-windows / test-python (windows-latest, 3.9)
- GitHub Check: test / test-python (ubuntu-latest-m, 3.11)
- GitHub Check: test / test-python (ubuntu-latest-m, 3.12)
- GitHub Check: test / test-app
- GitHub Check: test / test-python (ubuntu-latest-m, 3.10)
- GitHub Check: test / test-python (ubuntu-latest-m, 3.9)
- GitHub Check: build / build
- GitHub Check: e2e / test-e2e
- GitHub Check: build
🔇 Additional comments (2)
fiftyone/core/utils.py (2)
3177-3182
: Clamp to config max and prevent zero-worker executors (duplicate of prior feedback)
This still bypasses recommend_thread_pool_workers when fo.config.default_thread_pool_workers is set, so values can exceed fo.config.max_thread_pool_workers or be 0, which raises ValueError in ThreadPoolExecutor. Clamp via the helper and enforce a floor of 1.
Apply:
-    if max_workers is None:
-        max_workers = (
-            fo.config.default_thread_pool_workers
-            or recommend_thread_pool_workers(max_workers)
-        )
+    if max_workers is None:
+        cfg_default = getattr(fo.config, "default_thread_pool_workers", None)
+        # Clamp within configured bounds and derive a sane default
+        max_workers = recommend_thread_pool_workers(cfg_default)
+        if max_workers is None or max_workers < 1:
+            # ThreadPoolExecutor rejects 0; ensure a sensible minimum
+            max_workers = 1

Nit: call recommend_thread_pool_workers() with no arg when you mean "None" for readability.
2862-2864
: Guarantee floor after applying fo.config.max_thread_pool_workers + fix docstring
The doc says "<= 0 returns 1", but after applying the max cap, a misconfigured cap of 0 yields 0. Ensure a final floor and document it.
Apply:
@@
-    num_workers (None): a suggested number of workers. If ``num_workers <= 0``, this
-        function returns 1.
+    num_workers (None): a suggested number of workers. Values ``<= 0`` are
+        treated as ``1``. The result is clamped by
+        ``fo.config.max_thread_pool_workers`` and floored at ``1``.
@@
     if fo.config.max_thread_pool_workers is not None:
         num_workers = min(num_workers, fo.config.max_thread_pool_workers)
-    return num_workers
+    if num_workers < 1:
+        num_workers = 1
+    return num_workers

Also applies to: 2872-2874, 2875-2878
 @contextmanager
 def async_executor(
-    *, max_workers, skip_failures=False, warning="Async failure"
+    *, max_workers=None, skip_failures=False, warning="Async failure"
❓ Verification inconclusive
Keyword-only args may break existing positional calls; keep first arg positional or verify all call sites
Switching to keyword-only can break user code calling with async_executor(4). Either keep the first parameter positional, or confirm all call sites are keyworded.
Option A (backward compatible):
-@contextmanager
-def async_executor(
- *, max_workers=None, skip_failures=False, warning="Async failure"
-):
+@contextmanager
+def async_executor(
+ max_workers=None, *, skip_failures=False, warning="Async failure"
+):
Option B (keep as-is): verify no positional usages in the repo:
🏁 Script executed:
#!/bin/bash
# Find async_executor calls that don't use keywords for the first arg
rg -nP -C1 '\basync_executor\s*\(\s*(?!\)|max_workers=|skip_failures=|warning=)\S' || true
# Common context manager form
rg -nP -C1 '\bwith\s+async_executor\s*\(\s*(?!\)|max_workers=|skip_failures=|warning=)\S' || true
Length of output: 167
async_executor signature change is a breaking change
No internal calls rely on a positional first argument, but as a public API, making max_workers keyword-only will break external callers. Either revert to a backward-compatible signature:
@contextmanager
-def async_executor(
- *, max_workers=None, skip_failures=False, warning="Async failure"
-):
+def async_executor(
+ max_workers=None, *, skip_failures=False, warning="Async failure"
+):
or keep the keyword-only signature but document the breaking change and bump the major version.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
 @contextmanager
 def async_executor(
-    *, max_workers=None, skip_failures=False, warning="Async failure"
+    max_workers=None, *, skip_failures=False, warning="Async failure"
 ):
🤖 Prompt for AI Agents
In fiftyone/core/utils.py around line 3155, the change making max_workers a
keyword-only parameter is a breaking public-API change; restore backward
compatibility by accepting max_workers as a positional parameter (move
max_workers before the * or remove the keyword-only marker) so existing callers
continue to work, or if you intend to keep it keyword-only then update the
public docs/changelog to note the breaking change and bump the package major
version accordingly.
@jacobsela the memory characteristics shown in the PR description are on this branch, correct? Can you provide a couple points of comparison: 1) the memory behavior on develop, which uses 1 async thread, and 2) the memory behavior prior to #6288?
What changes are proposed in this pull request?
Make the number of writer threads used in apply_model and compute_embeddings configurable. This has a pretty big effect on runtime (>2.5x in reasonable cases). Also, something weird may be happening with memory usage.
How is this patch tested? If it is not, please explain why.
Code and data will be shared privately; I don't want to dump it here.
Performance impact of num threads on I/O heavy jobs
Task: Computing dense embeddings with DINO v3
Variables:
compute_embeddings
benchmark time vs num threads
analysis
Without a large enough number of threads, the writing jobs just continue getting queued and pile up. As we exit the async execution, we have to do a blocking wait for them all to finish to complete the compute_embeddings operation. This effectively means that rather than saving time in compute_embeddings, we are just deferring the wait for I/O till later.
By increasing the number of writer threads, we can avoid this issue. Interestingly, this effect seems to be more pronounced for larger image sizes and model sizes.
I'm not sure why the speed flatlines so quickly with num threads. The writing still continues after the GPU workload ends even for 16 threads. We may be hitting the limits of mongo throughput. Something to consider for an embeddings rework @brimoor .
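For reference, a hypothetical harness along the lines of the benchmark described above (field names are illustrative; the blocking join on pending writes is included in each measurement):

import time

def benchmark_writer_threads(dataset, model, thread_counts=(1, 2, 4, 8, 16)):
    # Time compute_embeddings end-to-end for each writer-thread setting
    results = {}
    for n in thread_counts:
        start = time.perf_counter()
        dataset.compute_embeddings(
            model, embeddings_field=f"emb_{n}", num_writer_workers=n
        )
        results[n] = time.perf_counter() - start
    return results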
Memory consumption vs num samples
Task: Computing dense embeddings with DINO v3
Variables:
compute_embeddings
memory usage over time
analysis
As the writing jobs pile up, we use more and more memory. It is critical that tasks like this don't have memory requirements that scale this quickly with the number of samples. In this case, some backpressure and garbage collection will likely suffice.
Adding more threads should in theory help, because we would clear the job queue faster, thus not allowing it to pile up in the first place. Given that the write throughput scaling we get with threads isn't good, it doesn't resolve the problem. Regardless, we should have the guardrails mentioned above.
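One minimal way to observe this growth (tracemalloc only tracks Python-level allocations, so it understates native memory; the commented call is a placeholder for the workload above):

import tracemalloc

tracemalloc.start()

# Run the workload under inspection, e.g.:
# dataset.compute_embeddings(model, embeddings_field="emb", num_writer_workers=8)

current, peak = tracemalloc.get_traced_memory()
print(f"current={current / 1e6:.1f} MB, peak={peak / 1e6:.1f} MB")
tracemalloc.stop()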
action items
- We need some limit on the number of embeddings that can be floating in memory at once.
- async_executor
Release Notes
Is this a user-facing change that should be mentioned in the release notes?
Yes. Give a description of this change to be included in the release notes for FiftyOne users.
(Details in 1-2 sentences. You can just refer to another PR with a description if this PR is part of a larger change.)
What areas of FiftyOne does this PR affect?
fiftyone Python library changes

Summary by CodeRabbit
New Features
Bug Fixes / Behavior
Documentation