
Conversation

Contributor

@exupero exupero commented Oct 7, 2025

What changes are proposed in this pull request?

#6288 performed DB writes on a background thread but showed increasing memory usage as batches were accumulated, so DB writes were moved back to the main thread in #6384. Testing on #6361 showed that limiting the number of tasks in the async queue did not reduce memory usage when computing embeddings with a large model.

This PR takes a different approach to moving DB writes onto an async thread: it uses a subclass of SaveContext that submits saves to an async executor. Profiling shows that memory usage remains flat and execution finishes sooner, even with only 1 async worker thread.
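In rough terms, the approach looks like the sketch below (illustrative, not the exact implementation; the real class also guards the shared op lists with locks):

from concurrent.futures import ThreadPoolExecutor

from fiftyone.core.collections import SaveContext


class AsyncSaveContextSketch(SaveContext):
    """Illustrative subclass: batch flushes run on a worker thread."""

    def __enter__(self):
        self._executor = ThreadPoolExecutor(max_workers=1)
        self._futures = []
        return super().__enter__()

    def _save_batch(self):
        # submit the DB write to the worker instead of blocking the
        # main (inference) thread
        self._futures.append(self._executor.submit(super()._save_batch))

    def __exit__(self, *args):
        try:
            # the base __exit__ flushes the final partial batch, which the
            # override above routes through the executor
            super().__exit__(*args)
            for future in self._futures:
                future.result()  # propagate any worker-thread errors
        finally:
            self._executor.shutdown(wait=True)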

How is this patch tested? If it is not, please explain why.

Ran a memory profiling script that computes embeddings with a large model, using 500, 1000, and 2000 samples. Using async_executor to write to the DB resulted in increasing memory usage; using AsyncSaveContext kept memory flat and was faster; and using more async workers changed neither memory usage nor total execution time:
[attached profiling chart: visualization(15)]
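The profiling script itself isn't included in this PR; a rough sketch of that kind of harness (hypothetical dataset and model choices) might look like:

import resource

import fiftyone.zoo as foz

# hypothetical setup: any dataset/model pair that stresses embeddings works
dataset = foz.load_zoo_dataset("quickstart")
model = foz.load_zoo_model("clip-vit-base32-torch")

dataset.limit(500).compute_embeddings(model, embeddings_field="embedding")

# peak resident set size (kB on Linux, bytes on macOS)
peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
print(f"peak RSS: {peak}")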

Release Notes

Is this a user-facing change that should be mentioned in the release notes?

  • No. You can skip the rest of this section.
  • Yes. Give a description of this change to be included in the release
    notes for FiftyOne users.

What areas of FiftyOne does this PR affect?

  • App: FiftyOne application changes
  • Build: Build and test infrastructure changes
  • Core: Core fiftyone Python library changes
  • Documentation: FiftyOne documentation changes
  • Other

Summary by CodeRabbit

  • Refactor
    • Per-sample saving during image-model processing now runs via an asynchronous background worker, improving throughput and keeping the UI responsive during large or long-running jobs.
    • Thread-safety added around save operations to prevent corruption under concurrent workloads.
  • Bug Fixes
    • Background save tasks now wait for completion on job exit, surface errors reliably, and automatically retry failed batch components to reduce lost writes and improve reliability.

@exupero exupero requested a review from a team as a code owner October 7, 2025 15:29
Contributor

coderabbitai bot commented Oct 7, 2025

Walkthrough

Adds an executor-driven asynchronous batch-write pathway to SaveContext via a new async_writes flag, with DummyExecutor/DummyFuture fallback, per-instance locks and futures tracking; image-model data loaders now enable async_writes when creating the SaveContext.

Changes

  • Async batch write support in SaveContext (fiftyone/core/collections.py):
    Added async_writes boolean to SaveContext.__init__; introduced DummyFuture and DummyExecutor; added per-instance locks (samples_lock, frames_lock, batch_ids_lock, reloading_lock), executor, and futures list; __enter__ activates the chosen executor; __exit__ drains/waits on futures and propagates errors; save(self, sample) now uses locks to queue sample/frame ops and batch ids; added _do_save_batch to perform the actual batch write and _save_batch to submit work to self.executor. Also added threading and ThreadPoolExecutor usage.
  • Enable async writes in image model loaders (fiftyone/core/models.py):
    Updated calls to SaveContext to pass async_writes=True in _apply_image_model_data_loader and _compute_image_embeddings_data_loader, enabling the executor-driven async batch write path. No new separate AsyncSaveContext introduced.
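For reference, a minimal sketch of what the synchronous fallback could look like (assumed shape; the actual definitions live in fiftyone/core/collections.py):

class DummyFuture:
    """Future-like wrapper for work that already ran synchronously."""

    def __init__(self, result=None, exception=None):
        self._result = result
        self._exception = exception

    def result(self):
        if self._exception is not None:
            raise self._exception
        return self._result

    def done(self):
        return True


class DummyExecutor:
    """Executor-like object that runs submitted work inline, so callers
    can treat the sync and async paths uniformly."""

    def __enter__(self):
        return self

    def __exit__(self, *args):
        pass

    def submit(self, fn, *args, **kwargs):
        try:
            return DummyFuture(result=fn(*args, **kwargs))
        except Exception as e:
            return DummyFuture(exception=e)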

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Loader
  participant SaveCtx as SaveContext
  participant Exec as Executor (ThreadPoolExecutor or DummyExecutor)
  participant Worker as _do_save_batch

  Loader->>SaveCtx: with SaveContext(samples, async_writes=True)
  activate SaveCtx
  loop per sample
    Loader->>SaveCtx: ctx.save(sample)
    SaveCtx->>SaveCtx: lock, enqueue ops & batch ids
    alt batch threshold reached
      SaveCtx->>Exec: submit(_do_save_batch) -> Future
      Exec->>Worker: run _do_save_batch()
      Worker-->>Exec: result / exception
      Exec-->>SaveCtx: future completes
    end
  end
  SaveCtx->>SaveCtx: __exit__ waits on collected futures, propagate errors
  deactivate SaveCtx

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~30 minutes

Poem

I hop through queues and nibble threads,
Packing batches on my tiny sleds.
Futures tucked in a neat little line,
One worker hums — the writes align.
Hooray, the dataset sleeps just fine. 🐇✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage ⚠️ Warning: Docstring coverage is 7.14%, which is below the required threshold of 80.00%. Resolution: run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
  • Title Check ✅ Passed: The title clearly describes the primary change by indicating that SaveContext will now write to the database on a background thread, matching the core async executor functionality introduced in this PR.
  • Description Check ✅ Passed: The PR description includes all required template sections: it details the proposed changes, explains the testing procedure with profiling results, fills out the release notes checkboxes, and specifies the affected areas, aligning fully with the repository's description template.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch task/ess/async-save-context-FOEPD-2119

Warning

Review ran into problems

🔥 Problems

Git: Failed to clone repository. Please run the @coderabbitai full review command to re-trigger a full review. If the issue persists, set path_filters to include or exclude specific files.


Comment @coderabbitai help to get the list of available commands and usage tips.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 01cd890 and e263d1b.

📒 Files selected for processing (2)
  • fiftyone/core/collections.py (1 hunks)
  • fiftyone/core/models.py (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
fiftyone/core/models.py (1)
fiftyone/core/collections.py (1)
  • AsyncSaveContext (237-254)
fiftyone/core/collections.py (1)
fiftyone/core/utils.py (1)
  • submit (3224-3227)
🪛 Ruff (0.13.3)
fiftyone/core/collections.py

240-240: Avoid specifying long messages outside the exception class

(TRY003)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
  • GitHub Check: test-windows / test-python (windows-latest, 3.12)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.11)
  • GitHub Check: test / test-app
  • GitHub Check: test-windows / test-python (windows-latest, 3.11)
  • GitHub Check: test-windows / test-python (windows-latest, 3.10)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.10)
  • GitHub Check: test-windows / test-python (windows-latest, 3.9)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.12)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.9)
  • GitHub Check: lint / eslint
  • GitHub Check: build / build
  • GitHub Check: e2e / test-e2e
  • GitHub Check: build

Contributor

@brimoor brimoor left a comment


Makes intuitive sense to me that offloading DB writes to a worker thread would be faster! Yay!

I'd like to get this behavior into the public interface:

for sample in dataset.iter_samples(..., autosave=True, **kwargs):
    ...

for sample in dataset.iter_groups(..., autosave=True, **kwargs):
    ...

for sample in dataset.save_context(**kwargs):
    ...

# note that this is imported into the public `fiftyone` namespace
with fo.SaveContext(dataset, **kwargs) as ctx:
    for sample in dataset:
        ...

not just used internally in the fiftyone.core.models module.

Moreover (assuming this is fully tested and stable), given that it is more performant, I think we should consider making it the default behavior across the library.

Today, the **kwargs in the public interface let users configure save batching if desired via batch_size and batching_strategy on a per-call basis. And there are environment variables that allow users to configure the default autosave batching behavior across an entire session:

FIFTYONE_DEFAULT_BATCHER
FIFTYONE_BATCHER_STATIC_SIZE
FIFTYONE_BATCHER_TARGET_SIZE_BYTES
FIFTYONE_BATCHER_TARGET_LATENCY

So, some options for this new behavior:

  1. Always use async writes, because it is strictly better
  2. Make async writes configurable via the above patterns by adding:
kwarg: async_writes=True/False
config: FIFTYONE_SAVE_CONTEXT_ASYNC_WRITES=True/False

In 2, assuming async is strictly better, I'd advocate for the default being async_writes=True.
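For illustration, option 2 could look like the following (hypothetical usage; async_writes is not part of the public API at this point):

import fiftyone as fo

dataset = fo.load_dataset("my-dataset")  # hypothetical dataset name

# explicit per-call opt-in (or set FIFTYONE_SAVE_CONTEXT_ASYNC_WRITES=true
# to change the session-wide default)
with fo.SaveContext(dataset, async_writes=True) as ctx:
    for sample in dataset:
        sample["processed"] = True  # hypothetical field
        ctx.save(sample)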

@exupero
Contributor Author

exupero commented Oct 7, 2025

Since async_executor turned out to be a bust, should I remove it? It's no longer used internally. It was released in v1.8.1, so if we decide to remove it, is there a deprecation procedure?

@brimoor
Contributor

brimoor commented Oct 8, 2025

Since async_executor turned out to be a bust, should I remove it? It's no longer used internally. It was released in v1.8.1, so if we decide to remove it, is there a deprecation procedure?

I don't think we need a deprecation procedure for removing async_executor(), as it's highly unlikely that anyone found a direct use case for it.

I'd be in favor of reverting #6288 so that the model inference code in fiftyone.core.models is back to being more standard across each of the _compute_XXX_for_YYY() methods in that module.

@exupero
Contributor Author

exupero commented Oct 8, 2025

Will do in a separate PR.

@exupero
Contributor Author

exupero commented Oct 8, 2025

See #6391

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
fiftyone/core/collections.py (1)

258-269: Consider reducing lock granularity to avoid blocking producers

Holding the lock during the entire DB write serializes producers with the bulk write, which can reduce throughput. If needed later, snapshot and swap the op lists under the lock and perform bulk_write on the snapshot outside the lock to maximize concurrency (while keeping correctness).
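A standalone sketch of that snapshot-and-swap pattern (illustrative names; write_fn stands in for the real bulk-write call):

import threading


class OpBuffer:
    """Producers append under the lock; flush() swaps the list out and
    performs I/O with the lock released."""

    def __init__(self, write_fn):
        self._lock = threading.Lock()
        self._ops = []
        self._write_fn = write_fn

    def add(self, op):
        with self._lock:
            self._ops.append(op)

    def flush(self):
        with self._lock:
            ops, self._ops = self._ops, []  # swap under the lock
        if ops:
            self._write_fn(ops)  # bulk write happens outside the lock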

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4ab57db and 6b06585.

📒 Files selected for processing (1)
  • fiftyone/core/collections.py (2 hunks)
🧰 Additional context used
🪛 Ruff (0.13.3)
fiftyone/core/collections.py

241-241: Avoid specifying long messages outside the exception class

(TRY003)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
  • GitHub Check: test-windows / test-python (windows-latest, 3.10)
  • GitHub Check: test-windows / test-python (windows-latest, 3.12)
  • GitHub Check: test-windows / test-python (windows-latest, 3.11)
  • GitHub Check: test / test-app
  • GitHub Check: test-windows / test-python (windows-latest, 3.9)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.9)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.11)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.12)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.10)
  • GitHub Check: build / build
  • GitHub Check: e2e / test-e2e
  • GitHub Check: build
🔇 Additional comments (2)
fiftyone/core/collections.py (2)

20-20: Import looks correct

Needed for the new lock usage.


238-246: Good fix for prior race; lock placement is appropriate

Using a single lock around save() and the flush path (calling the base _save_batch) prevents dropped ops discussed in the earlier review thread. Please confirm any callers pass an executor that supports context management, since this class enters/exits it.

@exupero
Contributor Author

exupero commented Oct 8, 2025

Run time wasn't any better with a single lock, so I switched to 4 locks, which does show a performance improvement:
[attached profiling chart: visualization(16)]

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6b06585 and 55eaa13.

📒 Files selected for processing (1)
  • fiftyone/core/collections.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
fiftyone/core/collections.py (2)
fiftyone/core/odm/database.py (1)
  • bulk_write (945-982)
fiftyone/core/utils.py (1)
  • submit (3224-3227)
🪛 Ruff (0.13.3)
fiftyone/core/collections.py

241-241: Avoid specifying long messages outside the exception class

(TRY003)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: test-windows / test-python (windows-latest, 3.11)
  • GitHub Check: test-windows / test-python (windows-latest, 3.12)
  • GitHub Check: test-windows / test-python (windows-latest, 3.9)
  • GitHub Check: test-windows / test-python (windows-latest, 3.10)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.12)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.11)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.9)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.10)
  • GitHub Check: e2e / test-e2e
  • GitHub Check: build

@exupero
Contributor Author

exupero commented Oct 8, 2025

I'd like to get this behavior into the public interface...not just used internally in the fiftyone.core.models module.

Moreover (assuming this is fully tested and stable), given that it is more performant, I think we should consider making it the default behavior across the library.

So, some options for this new behavior:
1. Always use async writes, because it is strictly better
2. Make async writes configurable via the above patterns by adding:

In 2, assuming async is strictly better, I'd advocate for the default being async_writes=True.

@brimoor I captured your full comment in FOEPD-2242 so we don't lose it while this experimental approach is battle-tested via internal usage, but it's out of scope for this PR.

@brimoor
Contributor

brimoor commented Oct 8, 2025

Using async writes in apply_model() is not really "internal" usage; it's a core workflow that a majority of FO users are likely using. So the confidence required to ship this PR and the confidence required to implement FOEPD-2242 are one and the same from my seat.

@exupero
Contributor Author

exupero commented Oct 8, 2025

Noted, I'll keep validating it before merging but will let @jleven and @emfrouin decide on priority of making its usage global.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (2)
fiftyone/core/collections.py (2)

322-366: Backpressure vs memory: current copy‑and‑clear releases locks during I/O

_do_save_batch() copies and clears under locks, then performs bulk_write() and parent sync without locks. This removes intentional backpressure and can increase peak memory while a flush is in progress. Profiling notes indicated holding locks across bulk_write()/_sync_source() kept memory flat.

If the goal is bounded memory, consider holding the locks across I/O:

-        if self._sample_ops:
-            with self.samples_lock:
-                sample_ops = self._sample_ops.copy()
-                self._sample_ops.clear()
-            res = foo.bulk_write(
-                sample_ops,
-                self._sample_coll,
-                ordered=False,
-                batcher=False,
-            )[0]
+        if self._sample_ops:
+            with self.samples_lock:
+                res = foo.bulk_write(
+                    self._sample_ops,
+                    self._sample_coll,
+                    ordered=False,
+                    batcher=False,
+                )[0]
+                self._sample_ops.clear()
             encoded_size += res.bulk_api_result.get("nBytes", 0)
 
-        if self._frame_ops:
-            with self.frames_lock:
-                frame_ops = self._frame_ops.copy()
-                self._frame_ops.clear()
-            res = foo.bulk_write(
-                frame_ops,
-                self._frame_coll,
-                ordered=False,
-                batcher=False,
-            )[0]
+        if self._frame_ops:
+            with self.frames_lock:
+                res = foo.bulk_write(
+                    self._frame_ops,
+                    self._frame_coll,
+                    ordered=False,
+                    batcher=False,
+                )[0]
+                self._frame_ops.clear()
             encoded_size += res.bulk_api_result.get("nBytes", 0)
 
-        if self._batch_ids and self._is_generated:
-            with self.batch_ids_lock:
-                batch_ids = self._batch_ids.copy()
-                self._batch_ids.clear()
-            self.sample_collection._sync_source(ids=batch_ids)
+        if self._batch_ids and self._is_generated:
+            with self.batch_ids_lock:
+                # hold lock to bound growth while syncing
+                self.sample_collection._sync_source(ids=self._batch_ids)
+                self._batch_ids.clear()
 
-        if self._reload_parents:
-            with self.reloading_lock:
-                reload_parents = self._reload_parents.copy()
-                self._reload_parents.clear()
-            for sample in reload_parents:
-                sample._reload_parents()
+        if self._reload_parents:
+            with self.reloading_lock:
+                for sample in self._reload_parents:
+                    sample._reload_parents()
+                self._reload_parents.clear()

If you prefer the unlocked I/O for throughput, add a brief comment explaining the trade‑off and verify profiling still shows flat memory. Based on learnings


368-370: Bound self.futures growth

During long runs, the list can grow with completed futures until exit. Opportunistically prune done futures before appending new ones.

     def _save_batch(self):
-        future = self.executor.submit(self._do_save_batch)
-        self.futures.append(future)
+        # drop completed futures to bound list size
+        self.futures = [f for f in self.futures if not f.done()]
+        future = self.executor.submit(self._do_save_batch)
+        self.futures.append(future)
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 55eaa13 and 1fd7aaf.

📒 Files selected for processing (1)
  • fiftyone/core/collections.py (2 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-10-08T17:43:55.970Z
Learnt from: exupero
PR: voxel51/fiftyone#6389
File: fiftyone/core/collections.py:323-361
Timestamp: 2025-10-08T17:43:55.970Z
Learning: In fiftyone/core/collections.py, AsyncSaveContext intentionally holds per-category locks (samples_lock, frames_lock, batch_ids_lock, reloading_lock) across bulk_write() and _sync_source() to provide backpressure and keep memory usage flat. Swapping buffers to release locks during I/O was considered but increased peak memory in profiling; current design improved overall runtime while bounding memory.

Applied to files:

  • fiftyone/core/collections.py
🧬 Code graph analysis (1)
fiftyone/core/collections.py (3)
fiftyone/core/frame.py (6)
  • save (490-492)
  • save (1047-1049)
  • save (1097-1104)
  • _in_db (125-126)
  • _save (494-509)
  • _reload_parents (1051-1056)
fiftyone/core/odm/database.py (2)
  • save (89-92)
  • bulk_write (945-982)
fiftyone/core/sample.py (5)
  • save (550-552)
  • save (741-749)
  • _save (554-570)
  • _save (751-762)
  • SampleView (637-762)
🪛 Ruff (0.13.3)
fiftyone/core/collections.py

241-241: Avoid specifying long messages outside the exception class

(TRY003)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: test-windows / test-python (windows-latest, 3.12)
  • GitHub Check: test-windows / test-python (windows-latest, 3.10)
  • GitHub Check: test-windows / test-python (windows-latest, 3.11)
  • GitHub Check: test-windows / test-python (windows-latest, 3.9)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.11)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.12)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.9)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.10)
  • GitHub Check: e2e / test-e2e
  • GitHub Check: build

@exupero
Contributor Author

exupero commented Oct 8, 2025

With latest changes:

Samples | SaveContext | AsyncSaveContext | % of original time
------- | ----------- | ---------------- | ------------------
500     | 180 s       | 105 s            | 58%
1000    | 335 s       | 215 s            | 64%
2000    | 630 s       | 420 s            | 67%

I'm open to suggestions on other tests to run to better validate these changes.

[attached profiling chart: visualization(17)]

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (4)
fiftyone/core/collections.py (4)

271-329: Clarify concurrency expectations for batching counters

_curr_batch_size, _curr_batch_size_bytes, and _last_time are updated without locks. If save() can be called from multiple producer threads, increments may race and skew flush thresholds. Either document single-producer usage or guard these fields with a lightweight lock.
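A lightweight sketch of the latter option (illustrative; field names mirror the ones above):

import threading


class BatchCounters:
    """Guard the flush-threshold counters with a single small lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self.curr_batch_size = 0
        self.curr_batch_size_bytes = 0

    def record(self, nbytes):
        with self._lock:
            self.curr_batch_size += 1
            self.curr_batch_size_bytes += nbytes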


356-360: Reduce timing sensitivity computing _encoding_ratio

Snapshot _curr_batch_size_bytes before writes to avoid races with concurrent producers.

Apply:

-        self._encoding_ratio = (
-            self._curr_batch_size_bytes / encoded_size
+        bytes_snapshot = self._curr_batch_size_bytes
+        self._encoding_ratio = (
+            bytes_snapshot / encoded_size
-            if encoded_size > 0 and self._curr_batch_size_bytes
+            if encoded_size > 0 and bytes_snapshot
             else 1.0
         )

362-374: Guard batch_ids/reload_parents checks under locks

Move truthiness checks inside the locks to avoid racy empty snapshots and unnecessary calls.

Apply:

-        if self._batch_ids and self._is_generated:
-            with self.batch_ids_lock:
-                batch_ids = self._batch_ids.copy()
-                self._batch_ids.clear()
-            self.sample_collection._sync_source(ids=batch_ids)
+        if self._is_generated:
+            with self.batch_ids_lock:
+                batch_ids, self._batch_ids = self._batch_ids, []
+            if batch_ids:
+                self.sample_collection._sync_source(ids=batch_ids)
@@
-        if self._reload_parents:
-            with self.reloading_lock:
-                reload_parents = self._reload_parents.copy()
-                self._reload_parents.clear()
-            for sample in reload_parents:
-                sample._reload_parents()
+        with self.reloading_lock:
+            reload_parents, self._reload_parents = self._reload_parents, []
+        if reload_parents:
+            for sample in reload_parents:
+                sample._reload_parents()

375-378: Make futures list thread-safe (if multiple producers can call _save_batch)

If save() can run from multiple threads, appending to self.futures can race with exit drains. Use a small lock.

Apply:

-        future = self.executor.submit(self._do_save_batch)
-        self.futures.append(future)
+        future = self.executor.submit(self._do_save_batch)
+        # optional: protect if multiple producers exist
+        # (add `self.futures_lock = threading.Lock()` in __init__)
+        try:
+            lock = self.futures_lock
+        except AttributeError:
+            self.futures.append(future)
+        else:
+            with lock:
+                self.futures.append(future)

And in init:

         self.futures = []
+        self.futures_lock = threading.Lock()
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1fd7aaf and 5e824f6.

📒 Files selected for processing (1)
  • fiftyone/core/collections.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
fiftyone/core/collections.py (3)
fiftyone/core/frame.py (5)
  • save (490-492)
  • save (1047-1049)
  • save (1097-1104)
  • _in_db (125-126)
  • _save (494-509)
fiftyone/core/odm/database.py (2)
  • save (89-92)
  • bulk_write (945-982)
fiftyone/core/utils.py (1)
  • submit (3224-3227)
🪛 Ruff (0.13.3)
fiftyone/core/collections.py

241-241: Avoid specifying long messages outside the exception class

(TRY003)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
  • GitHub Check: test-windows / test-python (windows-latest, 3.10)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.11)
  • GitHub Check: test-windows / test-python (windows-latest, 3.11)
  • GitHub Check: test-windows / test-python (windows-latest, 3.9)
  • GitHub Check: test-windows / test-python (windows-latest, 3.12)
  • GitHub Check: test / test-app
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.9)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.10)
  • GitHub Check: build / build
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.12)
  • GitHub Check: e2e / test-e2e
  • GitHub Check: build
🔇 Additional comments (1)
fiftyone/core/collections.py (1)

20-20: LGTM: required for locks

Importing threading is appropriate for the new lock usage.

@kaixi-wang
Contributor

The finding that more workers doesn't improve runtime aligns with previous performance investigations and our understanding of pymongo.

Echoing Brian's comments from earlier, it feels like the save context should just be an async save context when save(_deferred=True).

I'm also wondering about reload_parents(): how much it contributes to the overall time, and whether reloading once upon exit, after all ops are done, could help, if the reload is even necessary.

Also, have you tested with the different batching strategies? There's nothing I'm concerned about off the top of my head, but it would be good to test now and not be surprised later.

@exupero
Copy link
Contributor Author

exupero commented Oct 10, 2025

Tested the different batching strategies. The tests so far used the latency batcher; size shows very similar behavior, and static consumes more memory, though its consumption doesn't appear to be unbounded:
[attached profiling chart: visualization(18)]
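For anyone reproducing this, the batching strategy can be selected via the environment variables mentioned earlier in this thread (a sketch; set before fiftyone is imported):

import os

# values assume the standard strategies referenced above
os.environ["FIFTYONE_DEFAULT_BATCHER"] = "static"  # or "latency" / "size"
os.environ["FIFTYONE_BATCHER_STATIC_SIZE"] = "100"

import fiftyone as fo  # noqa: E402  (picks up the config at import time)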

sample: a :class:`fiftyone.core.sample.Sample` or
:class:`fiftyone.core.sample.SampleView`
"""
if sample._in_db and sample._dataset is not self._dataset:
Contributor


is all of this just copied from SaveContext but with the locks? Feels like a lot of shared logic/similar code

Contributor Author


Yes. What are your thoughts on resolving that? Put the locks in SaveContext?

Contributor


Put the locks in SaveContext?

Yes that seems like like the best path. Both to reduce code duplication and also because per my comments here, I see this either as (1) the way that SaveContext always works, or (2) a behavior I can get via fo.SaveContext(..., async_writes=True) and similar.

Note that I am making some small tweaks to SaveContext in #4773, so it would be great to get that merged first 😄

^ A reminder about this as well: more reason not to duplicate the SaveContext implementation, and to get that PR merged first.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (1)
fiftyone/core/collections.py (1)

338-363: Fix race + prevent data loss in _do_save_batch (drain under locks, write only when non-empty, requeue on failure)

Currently checks outside locks and unconditional bulk_write can cause bulk_write([])[0] and drop ops if I/O fails. Drain by swap under locks, then write on the snapshot; on exception, requeue to preserve data. Also compute encoding_ratio from a snapshot and drain ids/reloads after locks.

Apply:

     def _do_save_batch(self):
-        encoded_size = -1
-        if self._sample_ops:
-            with self.samples_lock:
-                sample_ops = self._sample_ops.copy()
-                self._sample_ops.clear()
-            res = foo.bulk_write(
-                sample_ops,
-                self._sample_coll,
-                ordered=False,
-                batcher=False,
-            )[0]
-            encoded_size += res.bulk_api_result.get("nBytes", 0)
+        encoded_size = -1
+        # drain samples under lock
+        with self.samples_lock:
+            sample_ops, self._sample_ops = self._sample_ops, []
+        if sample_ops:
+            try:
+                res = foo.bulk_write(
+                    sample_ops,
+                    self._sample_coll,
+                    ordered=False,
+                    batcher=False,
+                )[0]
+                encoded_size += res.bulk_api_result.get("nBytes", 0)
+            except Exception:
+                # requeue to avoid data loss
+                with self.samples_lock:
+                    self._sample_ops[:0] = sample_ops
+                raise
 
-        if self._frame_ops:
-            with self.frames_lock:
-                frame_ops = self._frame_ops.copy()
-                self._frame_ops.clear()
-            res = foo.bulk_write(
-                frame_ops,
-                self._frame_coll,
-                ordered=False,
-                batcher=False,
-            )[0]
-            encoded_size += res.bulk_api_result.get("nBytes", 0)
+        # drain frames under lock
+        with self.frames_lock:
+            frame_ops, self._frame_ops = self._frame_ops, []
+        if frame_ops:
+            try:
+                res = foo.bulk_write(
+                    frame_ops,
+                    self._frame_coll,
+                    ordered=False,
+                    batcher=False,
+                )[0]
+                encoded_size += res.bulk_api_result.get("nBytes", 0)
+            except Exception:
+                with self.frames_lock:
+                    self._frame_ops[:0] = frame_ops
+                raise
 
-        self._encoding_ratio = (
-            self._curr_batch_size_bytes / encoded_size
+        # snapshot to reduce timing sensitivity
+        bytes_snapshot = self._curr_batch_size_bytes
+        self._encoding_ratio = (
+            bytes_snapshot / encoded_size
             if encoded_size > 0 and self._curr_batch_size_bytes
             else 1.0
         )
 
-        if self._batch_ids and self._is_generated:
-            with self.batch_ids_lock:
-                batch_ids = self._batch_ids.copy()
-                self._batch_ids.clear()
-            self.sample_collection._sync_source(ids=batch_ids)
+        if self._is_generated:
+            with self.batch_ids_lock:
+                batch_ids, self._batch_ids = self._batch_ids, []
+            if batch_ids:
+                self.sample_collection._sync_source(ids=batch_ids)
 
-        if self._reload_parents:
-            with self.reloading_lock:
-                reload_parents = self._reload_parents.copy()
-                self._reload_parents.clear()
-            for sample in reload_parents:
-                sample._reload_parents()
+        with self.reloading_lock:
+            reload_parents, self._reload_parents = self._reload_parents, []
+        for sample in reload_parents:
+            sample._reload_parents()

Based on learnings

Also applies to: 370-382

🧹 Nitpick comments (3)
fiftyone/core/collections.py (3)

256-278: Avoid masking exceptions from the with-body in __exit__

If the with-body raised, re-raising a Future error here will mask it. Gate the re-raise on no prior exception.

     def __exit__(self, *args):
         super().__exit__(*args)
 
         error = None
         try:
@@
         finally:
             self.executor.__exit__(*args)
 
-        if error:
-            raise error
+        # Only raise background error if the with-body didn't raise
+        if error and (not args or args[0] is None):
+            raise error

279-337: Minor: reduce duplication with SaveContext.save()

Large overlap with SaveContext.save; consider extracting shared batching/threshold logic into a helper to prevent drift.


383-386: Record submitted futures safely (optional)

If save() may be called from multiple threads, consider guarding futures append/drain with a small lock to avoid races with exit’s drain.

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5e824f6 and 9a11f94.

📒 Files selected for processing (1)
  • fiftyone/core/collections.py (2 hunks)
🔇 Additional comments (2)
fiftyone/core/collections.py (2)

20-20: Import looks good

threading is required for the added locks.


238-249: Constructor and lock setup look good; confirm executor concurrency assumptions

Requiring an executor and adding per-list locks is sound. If this context may be used with executors having >1 worker, ensure _do_save_batch is race-safe (see my fix below) or constrain max_workers=1 at call sites.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9a11f94 and 65afd86.

📒 Files selected for processing (2)
  • fiftyone/core/collections.py (9 hunks)
  • fiftyone/core/models.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • fiftyone/core/models.py

