
feat: ROOT-11: Support reading JSONL from source cloud storages #7555


Open · wants to merge 67 commits into develop

Conversation

@matt-bernstein (Contributor) commented May 16, 2025

[x] Factor out JSON parsing and validation logic from import storages into a function that can be hot-swapped in LSO/LSE
[x] add it to settings as an app-specific variable
[x] replace stdlib parsing with pyarrow parsing to handle JSONL as well as JSON
[x] test coverage for JSONL
[x] feature flag
[x] unskip multitask import tests in LSO, to prepare for testing different behavior in LSE and LSO

Implementation notes:

  • had to move computation of row_index and row_group into get_data so that load_tasks_json, which will be swapped in LSO/LSE, returns the correct type in all cases; this makes for a pretty large diff
  • simplified error messages significantly, since the relevant logic is now shared/deduplicated
  • had to keep stdlib json parsing instead of removing it entirely (a rough sketch of the resulting fallback appears after these notes):
    • for a single top-level dict, pyarrow can't distinguish it from a one-line JSONL file, so stdlib json is still needed to set row_index=None
    • for a top-level list, pyarrow can't parse it at all, since it assumes each line is a row in a table; there is an open pyarrow issue for this
    • consider peeking at file extensions to avoid double-parsing (for clarity, not speed) - see the TODO comment
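
A rough sketch of that fallback (illustrative only: parse_tasks_blob and its return shape are placeholders, not the PR's actual load_tasks_json):

import io
import json

import pyarrow.json as paj


def parse_tasks_blob(blob: bytes) -> list[tuple[dict, int | None]]:
    # Single dict or top-level list: stdlib json handles both, and lets us
    # distinguish a lone object (row_index=None) from a one-line JSONL file.
    try:
        value = json.loads(blob)
    except ValueError:
        value = None

    if isinstance(value, dict):
        return [(value, None)]
    if isinstance(value, list):
        return [(task, idx) for idx, task in enumerate(value)]

    # Otherwise treat the blob as JSONL and let pyarrow build a table,
    # one row per line, inferring the schema from the data.
    table = paj.read_json(io.BytesIO(blob))
    return [(task, idx) for idx, task in enumerate(table.to_pylist())]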

matt-bernstein and others added 25 commits May 14, 2025 12:15
…k to task within file of tasks on cloud storage
Co-authored-by: Jo Booth <[email protected]>
@matt-bernstein requested a review from a team as a code owner May 16, 2025 17:23
netlify bot commented May 16, 2025

Deploy Preview for label-studio-docs-new-theme canceled.
🔨 Latest commit: 1e82dd6
🔍 Latest deploy log: https://app.netlify.com/projects/label-studio-docs-new-theme/deploys/682e426c52d79f0008aa6095

netlify bot commented May 16, 2025

Deploy Preview for label-studio-storybook canceled.
🔨 Latest commit: 1e82dd6
🔍 Latest deploy log: https://app.netlify.com/projects/label-studio-storybook/deploys/682e426c693fe4000848ba03

@github-actions bot added the feat label May 16, 2025
netlify bot commented May 16, 2025

Deploy Preview for heartex-docs canceled.
🔨 Latest commit: 1e82dd6
🔍 Latest deploy log: https://app.netlify.com/projects/heartex-docs/deploys/682e426c5f22180008b6e49d

@jombooth (Contributor) commented May 20, 2025

/fm sync

Workflow run

@matt-bernstein (Contributor, Author) commented May 20, 2025

/fm sync

Workflow run

@@ -598,6 +598,7 @@
MEMBER_PERM = 'core.api_permissions.MemberHasOwnerPermission'
RECALCULATE_ALL_STATS = None
GET_STORAGE_LIST = 'io_storages.functions.get_storage_list'
STORAGE_LOAD_TASKS_JSON = 'io_storages.utils._load_tasks_json'
Contributor
Usually the meaning of a function or method that starts with a _ is "this should not be used outside of this class/module/whatever" - seems to me that we are treating this function as more of a public one, since it's being referred to in this other file. Would it be a pain to rename?
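
(For context on how a dotted-path setting like this gets consumed: it would typically be resolved at the call site along the lines below. Whether the PR uses Django's import_string directly or an existing Label Studio helper is an assumption here.)

from django.conf import settings
from django.utils.module_loading import import_string

# Resolve the hot-swappable parser from its dotted path, so that LSE/LSO can
# point STORAGE_LOAD_TASKS_JSON at a different implementation via settings.
load_tasks_json = import_string(settings.STORAGE_LOAD_TASKS_JSON)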



@dataclass
class StorageObjectParams:
Contributor

I think StorageObject would be a better name for this, since it actually contains the task now


@classmethod
def bulk_create(
cls, task_datas: list[dict], key, row_idxs: list[int] | None = None, row_groups: list[int] | None = None
Contributor

I would prefer row_indexes - generally in favor of avoiding abbreviations in variable names unless they're truly ubiquitous.

@matt-bernstein (Contributor, Author)

Note for posterity: pa.Table.from_pylist has a footgun in that it only constructs the schema from the first row; use pyarrow.json.read_json with a BytesIO next time, since it constructs the schema from all rows.
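
A quick illustration of the difference (behavior observed with recent pyarrow releases; the exact inference rules may vary by version):

import io
import json

import pyarrow as pa
import pyarrow.json as paj

rows = [{'a': 1}, {'a': 2, 'b': 'only appears in the second row'}]

# from_pylist takes its columns from the first row, so 'b' is silently dropped.
print(pa.Table.from_pylist(rows).column_names)   # ['a']

# read_json infers the schema from the data, so 'b' is kept.
buf = io.BytesIO('\n'.join(json.dumps(r) for r in rows).encode())
print(paj.read_json(buf).column_names)           # ['a', 'b']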

Co-authored-by: Jo Booth <[email protected]>
@@ -49,6 +49,7 @@ dependencies = [
"ordered-set (==4.0.2)",
"pandas (>=2.2.3)",
"psycopg2-binary (==2.9.10)",
"pyarrow (>=18.0.0,<19.0.0)",
@jombooth (Contributor) May 21, 2025

Suggested change
"pyarrow (>=18.0.0,<19.0.0)",
"pyarrow (>=18.1.0,<19.0.0)",

Would recommend we don't allow people to go older than the version in poetry.lock; note this will require a relock.

_error_wrapper()


def load_tasks_json(blob_str: str, key: str) -> tuple[list[dict], list[StorageObjectParams]]:
Contributor

Suggested change
def load_tasks_json(blob_str: str, key: str) -> tuple[list[dict], list[StorageObjectParams]]:
def load_tasks_json(blob_str: str, key: str) -> list[StorageObjectParams]:

@@ -515,6 +523,7 @@ def sync(self):
self.info_set_queued()
import_sync_background(self.__class__, self.id)
except Exception:
logger.debug(f'Storage {self} failed', exc_info=True)
Contributor

Suggested change
logger.debug(f'Storage {self} failed', exc_info=True)
# needed to facilitate debugging storage-related testcases, since otherwise no exception is logged
logger.debug(f'Storage {self} failed', exc_info=True)

Contributor

This is a rare case where I'm in favor of a comment - the reason it's here is fairly subtle and not at all obvious from the context

@@ -431,7 +438,8 @@ def _scan_and_create_links(self, link_class):

logger.debug(f'{self}: found new key {key}')
try:
tasks_data = self.get_data(key)
# list of (task data + ImportStorageLink details)
links_params = self.get_data(key)
Contributor

with the change to StorageObject, this can be storage_objects, which I think will be a lot more readable than links_params vs link_params (although, valiant effort at making the plurals situation readable enough!)

@@ -431,7 +438,8 @@ def _scan_and_create_links(self, link_class):

logger.debug(f'{self}: found new key {key}')
try:
tasks_data = self.get_data(key)
# list of (task data + ImportStorageLink details)
Contributor

Suggested change
# list of (task data + ImportStorageLink details)
