Commit 4849a5e

Optimize task creation from CS without manifest (#9827)
Related: #9757

When a raw images task is created from a CS without a manifest attached, CVAT downloads image headers to get the image resolution. This operation can be quite time-consuming for big tasks, but it can be optimized quite simply.

- Improved performance of CS image header downloading and manifest creation ~2-6x

The default chunk size used in the downloader was 64 KB. For most image formats the required information is available in the first 1 KB, while 64 KB (the previous value) can be the size of the whole file. It is tempting to change it to a lower value, e.g. 1500 bytes (the default Ethernet v2 MTU size), and that works fine, except for JPEGs that include an embedded thumbnail (preview) image in the header, which can be of essentially any size. <s>Probably, this could be handled with a more advanced JPEG parser. It doesn't seem reasonable to use the reduced chunk size and then download the whole image for such files, as such JPEGs seem to be quite common, but maybe it can be implemented as an exception just for the jpg format.</s> Now, multiple header sizes are attempted per file.

AWS connection limits are floating and depend on the data filenames. In the worst case, we can expect about 100 connections per prefix; in the best case (random prefixes), the limit is effectively unbounded. It's also possible to get throttled by AWS (e.g. 503 Slow Down); this should be handled by the boto library itself.

- https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance.html
- https://stackoverflow.com/questions/37432285/maximum-no-of-connections-that-can-be-held-by-s3
- https://repost.aws/knowledge-center/http-5xx-errors-s3
- https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html

Details (test dataset: 26822 .jpg images, ~10 GB):

- baseline: 550s
- with the queue: 320-350s
- with the reduced chunk size (1 MTU): 220-280s
- with improved connection reuse for AWS: up to 82s (64 connections for 16 cores, up from 10 kept alive with the default config)

Sample script for testing:

```python
from time import perf_counter
from tempfile import TemporaryDirectory

from tqdm import tqdm

from cvat.apps.engine import models, cloud_provider
from utils.dataset_manifest.core import ImageManifestManager

cloud_storage = models.CloudStorage.objects.get(id=yourcs)
storage_client = cloud_provider.db_storage_to_storage_instance(cloud_storage)
media = [v["name"] for v in storage_client.list_files(prefix="images/", _use_flat_listing=True)]

header_downloader = cloud_provider.HeaderFirstMediaDownloader.create(
    models.DimensionType.DIM_2D, client=storage_client
)
content_generator = (
    v
    for v in tqdm(
        storage_client.bulk_download_to_memory(media, object_downloader=header_downloader.download),
        total=len(media),
    )
)

with TemporaryDirectory() as tempdir:
    start_time = perf_counter()

    manifest = ImageManifestManager(tempdir, upload_dir=tempdir, create_index=False)
    manifest.link(
        sources=content_generator,
        stop=len(media) - 1,
        DIM_3D=False,
    )
    manifest.create()

    duration = perf_counter() - start_time
    print(
        f"Manifest for {len(media)} files created in",
        duration,
        "seconds",
        f"avg. {duration / (len(media) or 1)}s.",
    )

# run with:
# cat test_cs_downloading.py | python manage.py shell
```
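For illustration, here is a minimal sketch of the header-first idea using Pillow's incremental parser (the same `ImageFile.Parser` the patch relies on). The `resolution_from_prefix` helper and the local `sample.jpg` path are invented for this example:

```python
# Sketch: check whether a file prefix is enough for Pillow to recover the
# image resolution, escalating through the same header sizes the patch tries.
from PIL import ImageFile


def resolution_from_prefix(path: str, prefix_size: int):
    """Return (width, height) if prefix_size bytes suffice, else None."""
    with open(path, "rb") as f:
        prefix = f.read(prefix_size)

    parser = ImageFile.Parser()
    parser.feed(prefix)
    # parser.image is set once the format header has been recognized
    return parser.image.size if parser.image else None


for size in (2048, 16384, 65536):
    if res := resolution_from_prefix("sample.jpg", size):  # hypothetical local file
        print(f"{size} bytes were enough, resolution: {res}")
        break
else:
    print("No prefix was enough; the whole file would be downloaded")
```

A plain JPEG usually succeeds on the first attempt; one with a large embedded EXIF thumbnail typically needs one of the bigger prefixes, which is the case that motivated the escalation.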
1 parent 4f5be6c commit 4849a5e

File tree

2 files changed: +106 -42 lines changed
Lines changed: 4 additions & 0 deletions

```diff
@@ -0,0 +1,4 @@
+### Fixed
+
+- Improved performance of task creation from cloud without manifest
+  (<https://github.com/cvat-ai/cvat/pull/9827>)
```

cvat/apps/engine/cloud_provider.py

Lines changed: 102 additions & 42 deletions
```diff
@@ -6,14 +6,16 @@
 from __future__ import annotations
 
 import functools
+import io
 import json
 import os
 from abc import ABC, abstractmethod
-from collections.abc import Iterator
-from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait
+from collections.abc import Iterator, Sequence
+from concurrent.futures import FIRST_EXCEPTION, Future, ThreadPoolExecutor, wait
 from enum import Enum
 from io import BytesIO
 from pathlib import Path
+from queue import Queue
 from typing import Any, BinaryIO, Callable, Optional, TypeVar
 
 import boto3
@@ -35,7 +37,7 @@
 from cvat.apps.engine.log import ServerLogManager
 from cvat.apps.engine.models import CloudProviderChoice, CredentialsTypeChoice, DimensionType
 from cvat.apps.engine.rq import ExportRQMeta
-from cvat.apps.engine.utils import get_cpu_number, take_by
+from cvat.apps.engine.utils import get_cpu_number
 from cvat.utils.http import PROXIES_FOR_UNTRUSTED_URLS
 from utils.dataset_manifest.utils import InvalidPcdError, PcdReader
 
```
```diff
@@ -214,9 +216,26 @@ def bulk_download_to_memory(
         func = object_downloader or self.download_fileobj
         threads_number = get_max_threads_number(len(files))
 
+        # We're using a custom queue to limit the maximum number of downloaded unprocessed
+        # files stored in the memory.
+        # For example, the builtin executor.map() could also be used here, but it
+        # would enqueue all the file list in one go, and the downloaded files
+        # would all be stored in memory until processed.
+        queue: Queue[Future] = Queue(maxsize=threads_number)
+        input_iter = iter(files)
         with ThreadPoolExecutor(max_workers=threads_number) as executor:
-            for batch_links in take_by(files, chunk_size=threads_number):
-                yield from executor.map(func, batch_links)
+            while not queue.empty() or input_iter is not None:
+                while not queue.full() and input_iter is not None:
+                    next_job_params = next(input_iter, None)
+                    if next_job_params is None:
+                        input_iter = None
+                        break
+
+                    next_job = executor.submit(func, next_job_params)
+                    queue.put(next_job)
+
+                top_job = queue.get()
+                yield top_job.result()
 
     def bulk_download_to_dir(
         self,
```
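As a standalone illustration of the bounded-queue pattern above, here is a runnable sketch; `fake_download` and `bounded_map` are invented names, and the simulated delay stands in for a real network call:

```python
# Sketch of the bounded-prefetch pattern: at most max_workers results are
# buffered in memory at any time, unlike executor.map() over the full list.
from collections.abc import Callable, Iterable, Iterator
from concurrent.futures import Future, ThreadPoolExecutor
from queue import Queue
from time import sleep


def fake_download(key: str) -> bytes:
    sleep(0.01)  # stand-in for a real network request
    return key.encode()


def bounded_map(
    func: Callable[[str], bytes], items: Iterable[str], max_workers: int = 4
) -> Iterator[bytes]:
    queue: Queue[Future] = Queue(maxsize=max_workers)
    input_iter = iter(items)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        while not queue.empty() or input_iter is not None:
            # Top up the queue while input remains and there are free slots
            while not queue.full() and input_iter is not None:
                item = next(input_iter, None)
                if item is None:
                    input_iter = None
                    break
                queue.put(executor.submit(func, item))

            if queue.empty():
                break  # no input left and nothing in flight
            # Yield results in submission order, freeing a buffer slot
            yield queue.get().result()


for data in bounded_map(fake_download, (f"file{i}.jpg" for i in range(10))):
    print(data)
```

Compared with `executor.map()`, which enqueues the whole input up front, at most `max_workers` downloaded results are ever held in memory at once.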
```diff
@@ -354,26 +373,38 @@ def write_access(self):
 
 
 class HeaderFirstDownloader(ABC):
-    def __init__(
-        self,
-        *,
-        client: _CloudStorage,
-        chunk_size: int = 65536,
-    ):
+    def __init__(self, *, client: _CloudStorage):
         self.client = client
-        self.chunk_size = chunk_size
 
     @abstractmethod
-    def try_parse_header(self, header: bytes, *, key: str) -> Any | None: ...
+    def try_parse_header(self, header: NamedBytesIO) -> Any | None: ...
 
-    def log_header_miss(self, file: NamedBytesIO, *, key: str):
-        full_object_size = len(file.getvalue())
+    def log_header_miss(self, key: str, header_size: int, *, full_file: NamedBytesIO | None = None):
+        message = (
+            f'The first {header_size} bytes were not enough to parse the "{key}" object header. '
+        )
+
+        if full_file:
+            full_object_size = len(full_file.getvalue())
+
+            message += (
+                f'Object size was {full_object_size} bytes. '
+                f'Downloaded percentage was '
+                f'{min(header_size, full_object_size) / full_object_size:.0%}'
+            )
 
-        slogger.glob.warning(
-            f'The {self.chunk_size} bytes were not enough to parse the "{key}" object header. '
-            f'Object size was {full_object_size} bytes. '
-            f'Downloaded percent was '
-            f'{round(min(self.chunk_size, full_object_size) / full_object_size):%}'
+        slogger.glob.warning(message)
+
+    def get_header_sizes_to_try(self) -> Sequence[int]:
+        return (
+            # The first 1-2Kb are typically enough for most formats with the static header size.
+            # Unfortunately, it's not enough for some popular formats, such as jpeg,
+            # which can optionally include a preview image embedded in the header, so we try
+            # other bigger sizes, but less than the whole file.
+            # For comparison, the standard Ethernet v2 MTU size is 1500 bytes.
+            2048,
+            16384,
+            65536,
         )
 
     def download(self, key: str) -> NamedBytesIO:
```
```diff
@@ -388,34 +419,54 @@ def download(self, key: str) -> NamedBytesIO:
         Returns:
             buffer with the image
         """
-        chunk = self.client.download_range_of_bytes(key, stop_byte=self.chunk_size - 1)
 
-        if self.try_parse_header(chunk, key=key):
-            buff = NamedBytesIO(chunk)
-            buff.filename = key
-        else:
-            buff = self.client.download_fileobj(key)
-            self.log_header_miss(file=buff, key=key)
+        buff = NamedBytesIO()
+        buff.filename = key
 
+        headers_to_try = self.get_header_sizes_to_try()
+        for i, header_size in enumerate(headers_to_try):
+            buff.seek(0, io.SEEK_END)
+            cur_pos = buff.tell()
+            chunk = self.client.download_range_of_bytes(
+                key, start_byte=cur_pos, stop_byte=header_size - 1
+            )
+            buff.write(chunk)
+            buff.seek(0)
+
+            if self.try_parse_header(buff):
+                buff.seek(0)
+                return buff
+
+            if i + 1 < len(headers_to_try):
+                self.log_header_miss(key=key, header_size=header_size)
+
+        buff = self.client.download_fileobj(key)
+        self.log_header_miss(key=key, header_size=header_size, full_file=buff)
         return buff
 
 class _HeaderFirstImageDownloader(HeaderFirstDownloader):
-    def try_parse_header(self, header, *, key):
+    def try_parse_header(self, header):
         image_parser = ImageFile.Parser()
-        image_parser.feed(header)
+        image_parser.feed(header.getvalue())
         return image_parser.image
 
-    def log_header_miss(self, file, *, key):
-        full_object_size = len(file.getvalue())
-
-        slogger.glob.warning(
-            f'The {self.chunk_size} bytes were not enough to parse the "{key}" object header. '
-            f'Object size was {full_object_size} bytes. '
-            f'Image resolution was {Image.open(file).size}. '
-            f'Downloaded percent was '
-            f'{round(min(self.chunk_size, full_object_size) / full_object_size):.0%}'
+    def log_header_miss(self, key, header_size, *, full_file = None):
+        message = (
+            f'The first {header_size} bytes were not enough to parse the "{key}" object header. '
         )
 
+        if full_file:
+            full_object_size = len(full_file.getvalue())
+
+            message += (
+                f'Object size was {full_object_size} bytes. '
+                f'Image resolution was {Image.open(full_file).size}. '
+                f'Downloaded percentage was '
+                f'{min(header_size, full_object_size) / full_object_size:.0%}'
+            )
+
+        slogger.glob.warning(message)
+
     def download(self, key):
         try:
             return super().download(key)
```
```diff
@@ -426,10 +477,10 @@ def download(self, key):
 
 
 class _HeaderFirstPcdDownloader(HeaderFirstDownloader):
-    def try_parse_header(self, header, *, key):
+    def try_parse_header(self, header):
         pcd_parser = PcdReader()
-        file = NamedBytesIO(header)
-        file_ext = os.path.splitext(key)[1].lower()
+        file = header
+        file_ext = os.path.splitext(file.filename)[1].lower()
 
         if file_ext == ".bin":
             # We need to ensure the file is a valid .bin file
```
```diff
@@ -551,7 +602,16 @@ def __init__(self,
 
         session = boto3.Session(**kwargs)
         self._s3 = session.resource("s3", endpoint_url=endpoint_url,
-            config=Config(proxies=PROXIES_FOR_UNTRUSTED_URLS or {}),
+            config=Config(
+                proxies=PROXIES_FOR_UNTRUSTED_URLS or {},
+                max_pool_connections=(
+                    # AWS can throttle the requests if there are too many of them,
+                    # the SDK handles it with the retry policy:
+                    # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html
+                    # 10 is the default value
+                    max(10, CPU_NUMBER * settings.CLOUD_DATA_DOWNLOADING_MAX_THREADS_NUMBER_PER_CPU)
+                )
+            ),
         )
 
         # anonymous access
```
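For context, here is a minimal sketch of this client configuration in isolation; the pool sizing and the retry settings below are illustrative values, not CVAT's exact defaults:

```python
# Sketch: an S3 resource with a larger keep-alive connection pool and the
# SDK's built-in retry handling for throttling errors such as 503 Slow Down.
import os

import boto3
from botocore.config import Config

# Illustrative sizing: a few download threads per CPU, never below the default of 10
pool_size = max(10, (os.cpu_count() or 1) * 4)

s3 = boto3.Session().resource(
    "s3",
    config=Config(
        max_pool_connections=pool_size,
        # "adaptive" is one retry mode option; it adds client-side rate limiting
        retries={"max_attempts": 10, "mode": "adaptive"},
    ),
)
```

Connections beyond `max_pool_connections` are discarded after use instead of being kept alive, which is why raising the pool size above the default of 10 helps multi-threaded downloads.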
