feat: ROOT-11: Support reading JSONL from source cloud storages #7555

Merged: 90 commits, May 23, 2025

Commits
3d23bf8
feat: ROOT-1: Add offset to storage link model to support storage lin…
matt-bernstein May 14, 2025
9e2a7bb
review comments
matt-bernstein May 14, 2025
2cbcb41
remove whitespace
matt-bernstein May 14, 2025
3de9ffb
update comment
matt-bernstein May 14, 2025
dc423d0
Sync Follow Merge dependencies
matt-bernstein May 14, 2025
ad6a221
Merge branch 'develop' into 'fb-ROOT-1'
matt-bernstein May 14, 2025
411940c
feat: ROOT-9: Allow reading multiple tasks from a JSON file in source…
matt-bernstein May 14, 2025
f37b8af
update comment
matt-bernstein May 14, 2025
cfbb076
update storagelink creation
matt-bernstein May 14, 2025
d3c02d9
fix DM action
matt-bernstein May 14, 2025
1033d26
blue
matt-bernstein May 14, 2025
c18d792
Merge remote-tracking branch 'origin/develop' into fb-ROOT-9
matt-bernstein May 14, 2025
4e75268
update test
matt-bernstein May 14, 2025
b629817
futureproof create method
matt-bernstein May 14, 2025
8f9d6e8
Sync Follow Merge dependencies
matt-bernstein May 14, 2025
b0c25a7
Merge branch 'develop' into 'fb-ROOT-9'
matt-bernstein May 15, 2025
5714e98
Sync Follow Merge dependencies
matt-bernstein May 15, 2025
027ad73
Merge remote-tracking branch 'origin/develop' into fb-ROOT-9
matt-bernstein May 15, 2025
85e7b0a
Sync Follow Merge dependencies
matt-bernstein May 15, 2025
f6841e3
make row_index=none default
matt-bernstein May 15, 2025
810ba62
get_data return type
matt-bernstein May 15, 2025
8b41a3d
blue
matt-bernstein May 15, 2025
3ae1230
test new model fields
matt-bernstein May 15, 2025
7cb8e9c
feat: ROOT-11: Support reading JSONL from source cloud storages
matt-bernstein May 16, 2025
36e181e
split out repeated json parsing logic from import storages
matt-bernstein May 16, 2025
015a2f7
Apply pre-commit linters
robot-ci-heartex May 16, 2025
aa51131
add to settings
matt-bernstein May 16, 2025
8c45515
comment
matt-bernstein May 16, 2025
6948285
Merge remote-tracking branch 'origin/develop' into fb-ROOT-11
matt-bernstein May 16, 2025
fbe800f
add pyarrow lib
matt-bernstein May 16, 2025
e89639d
cleanup GCS utils
matt-bernstein May 16, 2025
364ccfb
Sync Follow Merge dependencies
matt-bernstein May 16, 2025
e0364b4
Merge branch 'develop' into 'fb-ROOT-11'
matt-bernstein May 16, 2025
2dcbca3
fix gcs
matt-bernstein May 16, 2025
c45fbfd
organize import
matt-bernstein May 19, 2025
515228e
Merge remote-tracking branch 'origin/develop' into fb-ROOT-11
matt-bernstein May 19, 2025
64ac978
handle localfiles
matt-bernstein May 19, 2025
fecf18e
remove unused TaskValidationError
matt-bernstein May 19, 2025
f219f21
add jsonl; compute row_idx and row_group in get_data instead of after…
matt-bernstein May 19, 2025
606fa1f
don't pass storage class name
matt-bernstein May 19, 2025
e62cbc6
more permissive parsing
matt-bernstein May 19, 2025
1d4cc49
fix recursion
matt-bernstein May 19, 2025
bb42c08
structured return type for get_data
matt-bernstein May 19, 2025
2d9bafe
ruff
matt-bernstein May 19, 2025
b79b886
string fmt
matt-bernstein May 19, 2025
5cb81f6
string fmt
matt-bernstein May 19, 2025
93104db
remove comment
matt-bernstein May 19, 2025
773413d
remove option
matt-bernstein May 19, 2025
f604b81
bugfix
matt-bernstein May 20, 2025
f4c7d38
change structured type, add tests
matt-bernstein May 20, 2025
92e14fc
wip fix ff mocking
matt-bernstein May 20, 2025
384bc3f
Merge remote-tracking branch 'origin/develop' into fb-ROOT-11
matt-bernstein May 20, 2025
065b6a3
wip convert to pytest
matt-bernstein May 20, 2025
561c446
regen lockfile
jombooth May 20, 2025
2000629
fix tests
matt-bernstein May 20, 2025
78fbdd4
place django_db mark at file level
jombooth May 20, 2025
882c08a
handle None
matt-bernstein May 20, 2025
cf0c0d3
wip mock ffs
matt-bernstein May 21, 2025
bae6ae1
ensure blob_str enters as bytes
matt-bernstein May 21, 2025
9001423
add debug log, fix azure test
matt-bernstein May 21, 2025
10e9725
fix imputed field in task from other tasks in file
matt-bernstein May 21, 2025
39e4178
Merge remote-tracking branch 'origin/develop' into fb-ROOT-11
matt-bernstein May 21, 2025
2109679
fix redis
matt-bernstein May 21, 2025
3f065f5
fix mixed task formats and tests
matt-bernstein May 21, 2025
2806f0c
Apply pre-commit linters
robot-ci-heartex May 21, 2025
2aef19e
fix docstring
matt-bernstein May 21, 2025
1e82dd6
rename symbols
matt-bernstein May 21, 2025
c15d15e
add comment
matt-bernstein May 21, 2025
d58650b
remove comment
matt-bernstein May 21, 2025
a50ffe9
update min version
matt-bernstein May 21, 2025
9bd9639
rename
matt-bernstein May 21, 2025
b1aae02
defensive fix
matt-bernstein May 21, 2025
bd189df
Apply pre-commit linters
robot-ci-heartex May 21, 2025
2a3c07f
Merge remote-tracking branch 'origin/develop' into fb-ROOT-11
matt-bernstein May 22, 2025
933b27e
lockfile
matt-bernstein May 22, 2025
0cb56ec
Sync Follow Merge dependencies
matt-bernstein May 22, 2025
50726af
Merge branch 'develop' into 'fb-ROOT-11'
matt-bernstein May 22, 2025
885da86
Sync Follow Merge dependencies
matt-bernstein May 22, 2025
966ba88
Merge branch 'develop' into 'fb-ROOT-11'
matt-bernstein May 22, 2025
6320d8a
add some debugging prints
jombooth May 22, 2025
8f9dee8
more logging, add close call
jombooth May 22, 2025
b2484af
more logging
jombooth May 22, 2025
f2665f9
Revert "more logging"
jombooth May 22, 2025
6a5b397
Revert "more logging, add close call"
jombooth May 22, 2025
a8a1c22
Revert "add some debugging prints"
jombooth May 22, 2025
3d3c8e4
manually toggle logging FF
matt-bernstein May 22, 2025
80b3492
conditionally skip test
matt-bernstein May 22, 2025
77c3f27
Revert "manually toggle logging FF"
matt-bernstein May 23, 2025
dff2f12
Merge remote-tracking branch 'origin/develop' into fb-ROOT-11
matt-bernstein May 23, 2025
046f897
Sync Follow Merge dependencies
matt-bernstein May 23, 2025
1 change: 1 addition & 0 deletions label_studio/core/settings/base.py
@@ -598,6 +598,7 @@
MEMBER_PERM = 'core.api_permissions.MemberHasOwnerPermission'
RECALCULATE_ALL_STATS = None
GET_STORAGE_LIST = 'io_storages.functions.get_storage_list'
STORAGE_LOAD_TASKS_JSON = 'io_storages.utils.load_tasks_json_lso'
STORAGE_ANNOTATION_SERIALIZER = 'io_storages.serializers.StorageAnnotationSerializer'
TASK_SERIALIZER_BULK = 'tasks.serializers.BaseTaskSerializerBulk'
PREPROCESS_FIELD_NAME = 'data_manager.functions.preprocess_field_name'
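Note on the new setting: STORAGE_LOAD_TASKS_JSON is a dotted path to the task-parsing callable, so downstream builds can swap in their own loader. A minimal sketch of how such a dotted-path setting is typically resolved at runtime, assuming Django's import_string utility; the helper name below is illustrative, not code from this PR:

# Sketch: resolving a dotted-path setting such as STORAGE_LOAD_TASKS_JSON.
# import_string is a standard Django utility; get_tasks_json_loader is a
# hypothetical helper, not part of this PR.
from django.conf import settings
from django.utils.module_loading import import_string


def get_tasks_json_loader():
    """Return the callable configured in settings.STORAGE_LOAD_TASKS_JSON."""
    return import_string(settings.STORAGE_LOAD_TASKS_JSON)


# Hypothetical usage inside an import storage:
# load_tasks = get_tasks_json_loader()
# storage_objects = load_tasks(blob_bytes, key)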
29 changes: 11 additions & 18 deletions label_studio/io_storages/azure_blob/models.py
@@ -25,7 +25,12 @@
ImportStorageLink,
ProjectStorageMixin,
)
from io_storages.utils import parse_range, storage_can_resolve_bucket_url
from io_storages.utils import (
StorageObject,
load_tasks_json,
parse_range,
storage_can_resolve_bucket_url,
)
from tasks.models import Annotation

from label_studio.io_storages.azure_blob.utils import AZURE
@@ -209,28 +214,16 @@ def iterkeys(self):
continue
yield file.name

def get_data(self, key) -> Union[dict, list[dict]]:
def get_data(self, key) -> list[StorageObject]:
if self.use_blob_urls:
data_key = settings.DATA_UNDEFINED_NAME
return {data_key: f'{self.url_scheme}://{self.container}/{key}'}
task = {data_key: f'{self.url_scheme}://{self.container}/{key}'}
return [StorageObject(key=key, task_data=task)]

container = self.get_container()
blob = container.download_blob(key)
blob_str = blob.content_as_text()
value = json.loads(blob_str)
if isinstance(value, dict):
return value
elif isinstance(value, list):
for idx, item in enumerate(value):
if not isinstance(item, dict):
raise ValueError(
f'Error on key {key} item {idx}: For {self.__class__.__name__} your JSON file must be a dictionary with one task, or a list of dictionaries with one task each'
)
return value
else:
raise ValueError(
f'Error on key {key}: For {self.__class__.__name__} your JSON file must be a dictionary with one task, or a list of dictionaries with one task each'
)
blob_str = blob.content_as_bytes()
return load_tasks_json(blob_str, key)

def scan_and_create_links(self):
return self._scan_and_create_links(AzureBlobImportStorageLink)
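The Azure hunk above (and the GCS, local-files, and Redis hunks below) delegate parsing to StorageObject and load_tasks_json from io_storages/utils.py, which this page does not show. A rough sketch of what they could look like, inferred only from the call sites in this diff; the row_index field and the JSONL fallback are assumptions, not the actual implementation:

# Sketch inferred from call sites in this PR; not the actual io_storages/utils.py.
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class StorageObject:
    # Shape inferred from calls like StorageObject(key=key, task_data=task)
    key: str                         # bucket key / file path the task came from
    task_data: dict                  # parsed payload for a single task
    row_index: Optional[int] = None  # position inside a multi-task JSON/JSONL file (assumed field)


def load_tasks_json(blob: bytes, key: str) -> list[StorageObject]:
    """Parse a JSON or JSONL blob into a list of StorageObject entries (sketch)."""
    text = blob.decode('utf-8') if isinstance(blob, bytes) else blob
    try:
        value = json.loads(text)
    except json.JSONDecodeError:
        # JSONL fallback: one JSON object per non-empty line
        value = [json.loads(line) for line in text.splitlines() if line.strip()]
    if isinstance(value, dict):
        return [StorageObject(key=key, task_data=value)]
    if not isinstance(value, list) or not all(isinstance(item, dict) for item in value):
        raise ValueError(
            f'Error on key {key}: your JSON file must be a dictionary with one task, '
            f'or a list of dictionaries with one task each'
        )
    return [StorageObject(key=key, task_data=item, row_index=idx) for idx, item in enumerate(value)]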
45 changes: 27 additions & 18 deletions label_studio/io_storages/base_models.py
@@ -8,6 +8,7 @@
import os
import traceback as tb
from concurrent.futures import ThreadPoolExecutor
from dataclasses import asdict
from datetime import datetime
from typing import Union
from urllib.parse import urljoin
@@ -27,7 +28,7 @@
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from django_rq import job
from io_storages.utils import get_uri_via_regex, parse_bucket_uri
from io_storages.utils import StorageObject, get_uri_via_regex, parse_bucket_uri
from rq.job import Job
from tasks.models import Annotation, Task
from tasks.serializers import AnnotationSerializer, PredictionSerializer
@@ -230,7 +231,7 @@
def iterkeys(self):
return iter(())

def get_data(self, key) -> list[dict]:
def get_data(self, key) -> list[StorageObject]:
raise NotImplementedError

def generate_http_url(self, url):
@@ -341,17 +342,20 @@
raise NotImplementedError

@classmethod
def add_task(cls, data, project, maximum_annotations, max_inner_id, storage, key, row_index, link_class):
def add_task(cls, project, maximum_annotations, max_inner_id, storage, link_object: StorageObject, link_class):
link_kwargs = asdict(link_object)
data = link_kwargs.pop('task_data', None)

# predictions
predictions = data.get('predictions', [])
predictions = data.get('predictions') or []
if predictions:
if 'data' not in data:
raise ValueError(
'If you use "predictions" field in the task, ' 'you must put "data" field in the task too'
)

# annotations
annotations = data.get('annotations', [])
annotations = data.get('annotations') or []
cancelled_annotations = 0
if annotations:
if 'data' not in data:
@@ -361,7 +365,10 @@
cancelled_annotations = len([a for a in annotations if a.get('was_cancelled', False)])

if 'data' in data and isinstance(data['data'], dict):
data = data['data']
if data['data'] is not None:
data = data['data']
else:
data.pop('data')

Codecov / codecov/patch warning: added line label_studio/io_storages/base_models.py#L371 was not covered by tests.

with transaction.atomic():
task = Task.objects.create(
@@ -375,8 +382,8 @@
inner_id=max_inner_id,
)

link_class.create(task, key, storage, row_index=row_index)
logger.debug(f'Create {storage.__class__.__name__} link with {key=} and {row_index=} for {task=}')
link_class.create(task, storage=storage, **link_kwargs)
logger.debug(f'Create {storage.__class__.__name__} link with {link_kwargs} for {task=}')

raise_exception = not flag_set(
'ff_fix_back_dev_3342_storage_scan_with_invalid_annotations', user=AnonymousUser()
@@ -431,7 +438,7 @@

logger.debug(f'{self}: found new key {key}')
try:
tasks_data = self.get_data(key)
link_objects = self.get_data(key)
except (UnicodeDecodeError, json.decoder.JSONDecodeError) as exc:
logger.debug(exc, exc_info=True)
raise ValueError(
@@ -440,19 +447,19 @@
f'"Treat every bucket object as a source file"'
)

if isinstance(tasks_data, dict):
tasks_data = [tasks_data]
row_indices = [None]
else:
if not flag_set('fflag_feat_dia_2092_multitasks_per_storage_link'):
tasks_data = tasks_data[:1]
row_indices = range(len(tasks_data))
if not flag_set('fflag_feat_dia_2092_multitasks_per_storage_link'):
link_objects = link_objects[:1]

Codecov / codecov/patch warning: added line label_studio/io_storages/base_models.py#L451 was not covered by tests.

for row_index, task_data in zip(row_indices, tasks_data):
for link_object in link_objects:
# TODO: batch this loop body with add_task -> add_tasks in a single bulk write.
# See DIA-2062 for prerequisites
task = self.add_task(
task_data, self.project, maximum_annotations, max_inner_id, self, key, row_index, link_class
self.project,
maximum_annotations,
max_inner_id,
self,
link_object,
link_class=link_class,
)
max_inner_id += 1

@@ -515,6 +522,8 @@
self.info_set_queued()
import_sync_background(self.__class__, self.id)
except Exception:
# needed to facilitate debugging storage-related testcases, since otherwise no exception is logged
logger.debug(f'Storage {self} failed', exc_info=True)
storage_background_failure(self)

class Meta:
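For readers following the add_task refactor above: a StorageObject is unpacked with dataclasses.asdict, task_data becomes the Task payload, and every remaining field is forwarded to link_class.create as a keyword argument. A self-contained toy illustration; the StorageObject shape repeats the assumed sketch above and nothing here is project code:

from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class StorageObject:  # same assumed shape as in the sketch above
    key: str
    task_data: dict
    row_index: Optional[int] = None


link_object = StorageObject(
    key='tasks/batch-01.jsonl',          # placeholder key
    task_data={'data': {'text': 'hello'}},
    row_index=3,
)

link_kwargs = asdict(link_object)           # {'key': ..., 'task_data': ..., 'row_index': 3}
data = link_kwargs.pop('task_data', None)   # payload used to create the Task
print(link_kwargs)                          # {'key': 'tasks/batch-01.jsonl', 'row_index': 3}
# In the PR this then becomes: link_class.create(task, storage=storage, **link_kwargs)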
29 changes: 11 additions & 18 deletions label_studio/io_storages/gcs/models.py
@@ -23,7 +23,12 @@
ProjectStorageMixin,
)
from io_storages.gcs.utils import GCS
from io_storages.utils import parse_range, storage_can_resolve_bucket_url
from io_storages.utils import (
StorageObject,
load_tasks_json,
parse_range,
storage_can_resolve_bucket_url,
)
from tasks.models import Annotation

logger = logging.getLogger(__name__)
@@ -180,28 +185,16 @@ def iterkeys(self):
return_key=True,
)

def get_data(self, key) -> Union[dict, list[dict]]:
def get_data(self, key) -> list[StorageObject]:
if self.use_blob_urls:
return {settings.DATA_UNDEFINED_NAME: GCS.get_uri(self.bucket, key)}
data = GCS.read_file(
task = {settings.DATA_UNDEFINED_NAME: GCS.get_uri(self.bucket, key)}
return [StorageObject(key=key, task_data=task)]
blob_str = GCS.read_file(
client=self.get_client(),
bucket_name=self.bucket,
key=key,
convert_to=GCS.ConvertBlobTo.JSON,
)
if isinstance(data, dict):
return data
elif isinstance(data, list):
for idx, item in enumerate(data):
if not isinstance(item, dict):
raise ValueError(
f'Error on key {key} item {idx}: For {self.__class__.__name__} your JSON file must be a dictionary with one task, or a list of dictionaries with one task each'
)
return data
else:
raise ValueError(
f'Error on key {key}: For {self.__class__.__name__} your JSON file must be a dictionary with one task, or a list of dictionaries with one task each'
)
return load_tasks_json(blob_str, key)

def generate_http_url(self, url):
return GCS.generate_http_url(
23 changes: 2 additions & 21 deletions label_studio/io_storages/gcs/utils.py
@@ -250,34 +250,15 @@ def iter_images_filename(cls, client, bucket_name, max_files):
def get_uri(cls, bucket_name, key):
return f'gs://{bucket_name}/{key}'

@classmethod
def _try_read_json(cls, blob_str):
try:
data = json.loads(blob_str)
except ValueError:
logger.error(f"Can't parse JSON from {blob_str}")
return
return data

@classmethod
def read_file(
cls, client: gcs.Client, bucket_name: str, key: str, convert_to: ConvertBlobTo = ConvertBlobTo.NOTHING
):
bucket = client.get_bucket(bucket_name)
blob = bucket.blob(key)
blob_str = blob.download_as_bytes()
if convert_to == cls.ConvertBlobTo.NOTHING:
return blob_str
elif convert_to == cls.ConvertBlobTo.JSON:
return cls._try_read_json(blob_str)
elif convert_to == cls.ConvertBlobTo.JSON_DICT:
json_data = cls._try_read_json(blob_str)
if not isinstance(json_data, dict):
raise ValueError(
f'Error on key {key}: For {cls.__name__} your JSON file must be a dictionary with one task.'
)
return json_data
elif convert_to == cls.ConvertBlobTo.BASE64:

if convert_to == cls.ConvertBlobTo.BASE64:
return base64.b64encode(blob_str)

return blob_str
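With the JSON conversion paths removed, GCS.read_file now hands back raw bytes (or base64) and leaves parsing to the caller, matching the pattern in gcs/models.py above. A hedged usage sketch; the bucket name and key are placeholders, and the imports assume the project layout shown in this diff plus default GCP credentials:

# Usage sketch, not project code.
from google.cloud import storage as google_storage

from io_storages.gcs.utils import GCS
from io_storages.utils import load_tasks_json

client = google_storage.Client()
blob_bytes = GCS.read_file(
    client=client,
    bucket_name='my-bucket',        # placeholder bucket
    key='tasks/batch-01.jsonl',     # placeholder key
)
storage_objects = load_tasks_json(blob_bytes, 'tasks/batch-01.jsonl')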
32 changes: 9 additions & 23 deletions label_studio/io_storages/localfiles/models.py
@@ -20,6 +20,7 @@
ImportStorageLink,
ProjectStorageMixin,
)
from io_storages.utils import StorageObject, load_tasks_json
from rest_framework.exceptions import ValidationError
from tasks.models import Annotation

@@ -78,39 +79,24 @@
continue
yield str(file)

def get_data(self, key) -> dict | list[dict]:
def get_data(self, key) -> list[StorageObject]:
path = Path(key)
if self.use_blob_urls:
# include self-hosted links pointed to local resources via
# {settings.HOSTNAME}/data/local-files?d=<path/to/local/dir>
document_root = Path(settings.LOCAL_FILES_DOCUMENT_ROOT)
relative_path = str(path.relative_to(document_root))
return {
task = {
settings.DATA_UNDEFINED_NAME: f'{settings.HOSTNAME}/data/local-files/?d={quote(str(relative_path))}'
}
return [StorageObject(key=key, task_data=task)]

try:
with open(path, encoding='utf8') as f:
value = json.load(f)
except (UnicodeDecodeError, json.decoder.JSONDecodeError):
raise ValueError(
f"Can't import JSON-formatted tasks from {key}. If you're trying to import binary objects, "
f'perhaps you\'ve forgot to enable "Treat every bucket object as a source file" option?'
)

if isinstance(value, dict):
return value
elif isinstance(value, list):
for idx, item in enumerate(value):
if not isinstance(item, dict):
raise ValueError(
f'Error on key {key} item {idx}: For {self.__class__.__name__} your JSON file must be a dictionary with one task, or a list of dictionaries with one task each'
)
return value
else:
raise ValueError(
f'Error on key {key}: For {self.__class__.__name__} your JSON file must be a dictionary with one task, or a list of dictionaries with one task each'
)
with open(path, 'rb') as f:
blob_str = f.read()
return load_tasks_json(blob_str, key)
except OSError as e:
raise ValueError(f'Failed to read file {path}: {str(e)}')

Codecov / codecov/patch warning: added lines label_studio/io_storages/localfiles/models.py#L95-L99 were not covered by tests.

def scan_and_create_links(self):
return self._scan_and_create_links(LocalFilesImportStorageLink)
26 changes: 3 additions & 23 deletions label_studio/io_storages/redis/models.py
@@ -3,7 +3,6 @@

import json
import logging
from typing import Union

import redis
from django.db import models
Expand All @@ -17,6 +16,7 @@
ImportStorageLink,
ProjectStorageMixin,
)
from io_storages.utils import StorageObject, load_tasks_json
from tasks.models import Annotation

logger = logging.getLogger(__name__)
@@ -90,32 +90,12 @@ def iterkeys(self):
for key in client.keys(path + '*'):
yield key

def get_data(self, key) -> Union[dict, list[dict]]:
def get_data(self, key) -> list[StorageObject]:
client = self.get_client()
value_str = client.get(key)
if not value_str:
return []
try:
value = json.loads(value_str)
# NOTE: this validation did not previously exist, we were accepting any JSON values
if isinstance(value, dict):
return value
elif isinstance(value, list):
for idx, item in enumerate(value):
if not isinstance(item, dict):
raise ValueError(
f'Error on key {key} item {idx}: For {self.__class__.__name__} your JSON file must be a dictionary with one task, or a list of dictionaries with one task each'
)
return value
else:
raise ValueError(
f'Error on key {key}: For {self.__class__.__name__} your JSON file must be a dictionary with one task, or a list of dictionaries with one task each'
)
except json.decoder.JSONDecodeError:
raise ValueError(
f"Can't import JSON-formatted tasks from {key}. If you're trying to import binary objects, "
f'perhaps you\'ve forgot to enable "Treat every bucket object as a source file" option?'
)
return load_tasks_json(value_str, key)

def scan_and_create_links(self):
return self._scan_and_create_links(RedisImportStorageLink)