Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
2ae16c8
Add AsyncSaveContext that saves batches in background thread
exupero Oct 7, 2025
b77fda7
Capture futures and join on them before exiting async context manager
exupero Oct 7, 2025
1534133
Lock save and _save_batch execution to avoid dropping samples
exupero Oct 8, 2025
dd407ef
Wait on final batch save from __exit__ and clear list of futures
exupero Oct 8, 2025
021fbb5
Make locks more fine-grained to restore performance improvement
exupero Oct 8, 2025
fd2d307
Copy data and clear inside lock, don't lock slow I/O
exupero Oct 8, 2025
a68ba84
Loop-drain futures on AsyncSaveContext#__exit__()
exupero Oct 8, 2025
6a7c668
Ensure executor is exited
exupero Oct 8, 2025
6acf531
Drain all futures rather than short-circuiting on error
exupero Oct 13, 2025
a451178
Don't mask error from 'with' body
exupero Oct 13, 2025
f9389c6
Merge AsyncSaveContext logic into SaveContext, behind async_writes=Tr…
exupero Oct 14, 2025
ed897c4
Fix logic to throw error when 'with' body didn't
exupero Oct 14, 2025
c5de0cc
Add 'async_writes' option to docstring
exupero Oct 23, 2025
bbf17de
Denote DummyFuture and DummyExecutor are private with leading _
exupero Oct 23, 2025
9a1ef56
Remove unused exception var
exupero Dec 8, 2025
2248a13
Fix comment
exupero Dec 8, 2025
39c6da7
Add comment about not using more than one background thread
exupero Dec 9, 2025
61b2028
Generalize comment
exupero Dec 12, 2025
544d845
Make dummy future fail synchronously
exupero Dec 12, 2025
21f2a91
Don't re-queue failed ops to avoid race conditions
exupero Dec 18, 2025
df54fa5
Clarify comment
exupero Dec 22, 2025
2715936
Simplify comment so it doesn't assume how SaveContext is used
exupero Dec 22, 2025
455ca3b
Clean out finished futures whenever a batch is saved
exupero Jan 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 103 additions & 13 deletions fiftyone/core/collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"""

from collections import defaultdict, deque
from concurrent.futures import ThreadPoolExecutor
from copy import copy
from datetime import datetime
from operator import itemgetter
Expand All @@ -18,6 +19,7 @@
from packaging.version import Version
import random
import string
import threading
import timeit
import warnings

Expand Down Expand Up @@ -79,6 +81,28 @@ def registrar(func):
aggregation = _make_registrar()


class _DummyFuture:
def __init__(self, *, value=None):
self.value = value

def result(self):
return self.value

def done(self):
return True


class _DummyExecutor:
def __enter__(self):
return self

def __exit__(self, *args):
pass

def submit(self, fn, *args, **kwargs):
return _DummyFuture(value=fn(*args, **kwargs))

Comment thread
coderabbitai[bot] marked this conversation as resolved.

class SaveContext(object):
"""Context that saves samples from a collection according to a configurable
batching strategy.
Expand All @@ -101,13 +125,16 @@ class SaveContext(object):
- ``"latency"``: a target latency, in seconds, between saves

By default, ``fo.config.default_batcher`` is used
async_writes (False): whether to perform batch writes asynchronously in
a background thread
"""

def __init__(
self,
sample_collection,
batch_size=None,
batching_strategy=None,
async_writes=False,
Comment thread
exupero marked this conversation as resolved.
):
batch_size, batching_strategy = fou.parse_batching_strategy(
batch_size=batch_size, batching_strategy=batching_strategy
Expand All @@ -132,6 +159,19 @@ def __init__(
self._encoding_ratio = 1.0
self._last_time = None

self.samples_lock = threading.Lock()
self.frames_lock = threading.Lock()
self.batch_ids_lock = threading.Lock()
self.reloading_lock = threading.Lock()

self.executor = (
# Using more than one worker will introduce race conditions in the state preserved between DB writes
ThreadPoolExecutor(max_workers=1)
if async_writes
else _DummyExecutor()
)
self.futures = []

def __enter__(self):
if self._batching_strategy == "static":
self._curr_batch_size = 0
Expand All @@ -140,11 +180,32 @@ def __enter__(self):
elif self._batching_strategy == "latency":
self._last_time = timeit.default_timer()

self.executor.__enter__()
return self

def __exit__(self, *args):
self._save_batch()

error = None
try:
# Loop-drain self.futures so any submissions triggered by
# self._save_batch() are awaited.
while self.futures:
futures = self.futures
self.futures = []
for future in futures:
try:
future.result()
except Exception as e:
if error is None:
Comment thread
exupero marked this conversation as resolved.
error = e
self.futures.clear()
finally:
self.executor.__exit__(*args)

if error and (not args or args[0] is None):
raise error

Comment thread
exupero marked this conversation as resolved.
def save(self, sample):
"""Registers the sample for saving in the next batch.

Expand All @@ -162,16 +223,20 @@ def save(self, sample):
updated = sample_ops or frame_ops

if sample_ops:
self._sample_ops.extend(sample_ops)
with self.samples_lock:
self._sample_ops.extend(sample_ops)

if frame_ops:
self._frame_ops.extend(frame_ops)
with self.frames_lock:
self._frame_ops.extend(frame_ops)

if updated and self._is_generated:
self._batch_ids.append(sample.id)
with self.batch_ids_lock:
self._batch_ids.append(sample.id)

if updated and isinstance(sample, fosa.SampleView):
self._reload_parents.append(sample)
with self.reloading_lock:
self._reload_parents.append(sample)

if self._batching_strategy == "static":
self._curr_batch_size += 1
Expand Down Expand Up @@ -200,24 +265,31 @@ def save(self, sample):
self._save_batch()
self._last_time = timeit.default_timer()

def _save_batch(self):
def _do_save_batch(self):
encoded_size = -1
if self._sample_ops:
with self.samples_lock:
sample_ops = self._sample_ops.copy()
self._sample_ops.clear()
res = foo.bulk_write(
self._sample_ops,
sample_ops,
self._sample_coll,
ordered=False,
batcher=False,
)[0]
encoded_size += res.bulk_api_result.get("nBytes", 0)
self._sample_ops.clear()

if self._frame_ops:
with self.frames_lock:
frame_ops = self._frame_ops.copy()
self._frame_ops.clear()
res = foo.bulk_write(
self._frame_ops, self._frame_coll, ordered=False, batcher=False
frame_ops,
self._frame_coll,
ordered=False,
batcher=False,
)[0]
encoded_size += res.bulk_api_result.get("nBytes", 0)
self._frame_ops.clear()

Comment thread
coderabbitai[bot] marked this conversation as resolved.
self._encoding_ratio = (
self._curr_batch_size_bytes / encoded_size
Expand All @@ -226,14 +298,32 @@ def _save_batch(self):
)

if self._batch_ids and self._is_generated:
self.sample_collection._sync_source(ids=self._batch_ids)
self._batch_ids.clear()
with self.batch_ids_lock:
batch_ids = self._batch_ids.copy()
self._batch_ids.clear()
self.sample_collection._sync_source(ids=batch_ids)

if self._reload_parents:
for sample in self._reload_parents:
with self.reloading_lock:
reload_parents = self._reload_parents.copy()
self._reload_parents.clear()
for sample in reload_parents:
sample._reload_parents()

self._reload_parents.clear()
    def _save_batch(self):
        # Submits the current batch to the executor and prunes finished
        # futures so self.futures does not grow without bound.
        #
        # Futures that completed with an exception are deliberately kept in
        # the pending list so their error surfaces later (in __exit__).
        pending = []
        for future in self.futures:
            if not future.done():
                pending.append(future)
            else:
                try:
                    future.result()
                except Exception:
                    pending.append(future)  # re-raise in __exit__
        self.futures = pending

        # Queue the actual write; with async_writes this runs on the single
        # background thread, otherwise it executes synchronously
        future = self.executor.submit(self._do_save_batch)
        self.futures.append(future)


class SampleCollection(object):
Expand Down
14 changes: 12 additions & 2 deletions fiftyone/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,12 @@ def _apply_image_model_data_loader(

with contextlib.ExitStack() as context:
pb = context.enter_context(fou.ProgressBar(samples, progress=progress))
ctx = context.enter_context(foc.SaveContext(samples))
ctx = context.enter_context(
foc.SaveContext(
samples,
async_writes=True,
)
)

for sample_batch, imgs in zip(
fou.iter_batches(samples, batch_size),
Expand Down Expand Up @@ -1207,7 +1212,12 @@ def _compute_image_embeddings_data_loader(
with contextlib.ExitStack() as context:
pb = context.enter_context(fou.ProgressBar(samples, progress=progress))
if embeddings_field is not None:
ctx = context.enter_context(foc.SaveContext(samples))
ctx = context.enter_context(
foc.SaveContext(
samples,
async_writes=True,
)
)
else:
ctx = None

Expand Down
Loading