diff --git a/libs/libapi/src/libapi/response.py b/libs/libapi/src/libapi/response.py
index dce8dc512c..3b01d5f2d2 100644
--- a/libs/libapi/src/libapi/response.py
+++ b/libs/libapi/src/libapi/response.py
@@ -20,16 +20,11 @@ async def create_response(
     pa_table: pa.Table,
     offset: int,
     features: Features,
-    unsupported_columns: list[str],
     num_rows_total: int,
     partial: bool,
     use_row_idx_column: bool = False,
     truncated_columns: Optional[list[str]] = None,
 ) -> PaginatedResponse:
-    if set(pa_table.column_names).intersection(set(unsupported_columns)):
-        raise RuntimeError(
-            "The pyarrow table contains unsupported columns. They should have been ignored in the row group reader."
-        )
     logging.debug(f"create response for {dataset=} {config=} {split=}")
     return {
         "features": [
@@ -46,7 +41,6 @@ async def create_response(
             storage_client=storage_client,
             offset=offset,
             features=features,
-            unsupported_columns=unsupported_columns,
             row_idx_column=ROW_IDX_COLUMN if use_row_idx_column else None,
             truncated_columns=truncated_columns,
         ),
diff --git a/libs/libapi/src/libapi/utils.py b/libs/libapi/src/libapi/utils.py
index 17fe6068c4..6b0c32ea26 100644
--- a/libs/libapi/src/libapi/utils.py
+++ b/libs/libapi/src/libapi/utils.py
@@ -205,15 +205,10 @@ async def to_rows_list(
     split: str,
     offset: int,
     features: Features,
-    unsupported_columns: list[str],
     storage_client: StorageClient,
     row_idx_column: Optional[str] = None,
     truncated_columns: Optional[list[str]] = None,
 ) -> list[RowItem]:
-    num_rows = pa_table.num_rows
-    for idx, (column, feature) in enumerate(features.items()):
-        if column in unsupported_columns:
-            pa_table = pa_table.add_column(idx, column, pa.array([None] * num_rows))
     # transform the rows, if needed (e.g. save the images or audio to the assets, and return their URL)
     try:
         transformed_rows = await transform_rows(
diff --git a/libs/libapi/tests/test_response.py b/libs/libapi/tests/test_response.py
index 6de88b5215..eb0c5e9c50 100644
--- a/libs/libapi/tests/test_response.py
+++ b/libs/libapi/tests/test_response.py
@@ -42,7 +42,6 @@ async def test_create_response(storage_client: StorageClient) -> None:
         pa_table=ds.data,
         offset=0,
         features=ds.features,
-        unsupported_columns=[],
         num_rows_total=10,
         partial=False,
     )
@@ -67,7 +66,6 @@ async def test_create_response_with_row_idx_column(storage_client: StorageClient
         pa_table=ds.data,
         offset=0,
         features=ds.features,
-        unsupported_columns=[],
         num_rows_total=10,
         partial=False,
         use_row_idx_column=True,
@@ -96,7 +94,6 @@ async def test_create_response_with_image(image_path: str, storage_client: Stora
         pa_table=ds_image.data,
         offset=0,
         features=ds_image.features,
-        unsupported_columns=[],
         num_rows_total=10,
         partial=False,
     )
@@ -137,7 +134,6 @@ async def test_create_response_with_document(document_path: str, storage_client:
         pa_table=ds_document.data,
         offset=0,
         features=ds_document.features,
-        unsupported_columns=[],
         num_rows_total=10,
         partial=False,
     )
diff --git a/libs/libcommon/src/libcommon/parquet_utils.py b/libs/libcommon/src/libcommon/parquet_utils.py
index 4a64c98c6d..1db081dd26 100644
--- a/libs/libcommon/src/libcommon/parquet_utils.py
+++ b/libs/libcommon/src/libcommon/parquet_utils.py
@@ -22,7 +22,6 @@
 from libcommon.prometheus import StepProfiler
 from libcommon.simple_cache import get_previous_step_or_raise
 from libcommon.storage import StrPath
-from libcommon.viewer_utils.features import get_supported_unsupported_columns
 
 # For partial Parquet export we have paths like "en/partial-train/0000.parquet".
# "-" is not allowed is split names so we use it in the prefix to avoid collisions. @@ -181,8 +180,6 @@ def read_size(self, columns: Optional[Iterable[str]] = None) -> int: @dataclass class ParquetIndexWithMetadata: features: Features - supported_columns: list[str] - unsupported_columns: list[str] parquet_files_urls: list[str] metadata_paths: list[str] num_bytes: list[int] @@ -329,10 +326,11 @@ def query_truncated_binary(self, offset: int, length: int) -> tuple[pa.Table, li ) # we use a minimum length to not end up with too empty cells try: pa_tables: list[pa.Table] = [] + columns = list(self.features.keys()) truncated_columns: set[str] = set() for i in range(first_row_group_id, last_row_group_id + 1): rg_pa_table, rg_truncated_columns = row_group_readers[i].read_truncated_binary( - self.supported_columns, max_binary_length=max_binary_length + columns, max_binary_length=max_binary_length ) pa_tables.append(rg_pa_table) truncated_columns |= set(rg_truncated_columns) @@ -438,12 +436,10 @@ def query(self, offset: int, length: int) -> pa.Table: ) with StepProfiler(method="parquet_index_with_metadata.query", step="read the row groups"): + columns = list(self.features.keys()) try: pa_table = pa.concat_tables( - [ - row_group_readers[i].read(self.supported_columns) - for i in range(first_row_group_id, last_row_group_id + 1) - ] + [row_group_readers[i].read(columns) for i in range(first_row_group_id, last_row_group_id + 1)] ) except ArrowInvalid as err: raise SchemaMismatchError("Parquet files have different schema.", err) @@ -486,15 +482,9 @@ def from_parquet_metadata_items( ): if features is None: # config-parquet version<6 didn't have features features = Features.from_arrow_schema(pq.read_schema(metadata_paths[0])) - # TODO(kszucs): since unsupported_features is always empty list we may omit the call below - supported_columns, unsupported_columns = get_supported_unsupported_columns( - features, - unsupported_features=[], - ) + return ParquetIndexWithMetadata( features=features, - supported_columns=supported_columns, - unsupported_columns=unsupported_columns, parquet_files_urls=parquet_files_urls, metadata_paths=metadata_paths, num_bytes=num_bytes, diff --git a/libs/libcommon/src/libcommon/viewer_utils/features.py b/libs/libcommon/src/libcommon/viewer_utils/features.py index df8bdd3172..394e1ecd91 100644 --- a/libs/libcommon/src/libcommon/viewer_utils/features.py +++ b/libs/libcommon/src/libcommon/viewer_utils/features.py @@ -29,7 +29,6 @@ Value, Video, ) -from datasets.features.features import FeatureType, _visit from PIL import Image as PILImage from libcommon.dtos import FeatureItem @@ -529,30 +528,3 @@ def to_features_list(features: Features) -> list[FeatureItem]: } for idx, name in enumerate(features) ] - - -def get_supported_unsupported_columns( - features: Features, - unsupported_features: list[FeatureType] = [], -) -> tuple[list[str], list[str]]: - supported_columns, unsupported_columns = [], [] - - for column, feature in features.items(): - str_column = str(column) - supported = True - - def classify(feature: FeatureType) -> None: - nonlocal supported - for unsupported_feature in unsupported_features: - if type(unsupported_feature) is type(feature) is Value: - if unsupported_feature.dtype == feature.dtype: - supported = False - elif type(unsupported_feature) is type(feature): - supported = False - - _visit(feature, classify) - if supported: - supported_columns.append(str_column) - else: - unsupported_columns.append(str_column) - return supported_columns, unsupported_columns diff --git 
diff --git a/libs/libcommon/tests/viewer_utils/test_features.py b/libs/libcommon/tests/viewer_utils/test_features.py
index 9aa2bf1448..b351a32670 100644
--- a/libs/libcommon/tests/viewer_utils/test_features.py
+++ b/libs/libcommon/tests/viewer_utils/test_features.py
@@ -10,7 +10,6 @@
 import boto3
 import pytest
 from aiobotocore.response import StreamingBody
-from datasets import Audio, Features, Image, List, Pdf, Value
 from moto import mock_s3
 from urllib3._collections import HTTPHeaderDict
 
@@ -19,7 +18,6 @@
 from libcommon.url_preparator import URLPreparator
 from libcommon.viewer_utils.features import (
     get_cell_value,
-    get_supported_unsupported_columns,
     infer_audio_file_extension,
     to_features_list,
 )
@@ -102,27 +100,6 @@ def test_to_features_list(
     assert first_feature["type"] == datasets_fixture.expected_feature_type
 
 
-def test_get_supported_unsupported_columns() -> None:
-    features = Features(
-        {
-            "audio1": Audio(),
-            "audio2": Audio(sampling_rate=16_000),
-            "audio3": List(Audio()),
-            "image1": Image(),
-            "image2": Image(decode=False),
-            "image3": List(Image()),
-            "string": Value("string"),
-            "binary": Value("binary"),
-            "pdf": Pdf(),
-            "pdf2": [Pdf()],
-        }
-    )
-    unsupported_features = [Value("binary"), Audio()]
-    supported_columns, unsupported_columns = get_supported_unsupported_columns(features, unsupported_features)
-    assert supported_columns == ["image1", "image2", "image3", "string", "pdf", "pdf2"]
-    assert unsupported_columns == ["audio1", "audio2", "audio3", "binary"]
-
-
 # specific test created for https://github.com/huggingface/dataset-viewer/issues/2045
 # which is reproduced only when using s3 for fsspec
 def test_ogg_audio_with_s3(
diff --git a/services/rows/src/rows/routes/rows.py b/services/rows/src/rows/routes/rows.py
index b84f03b4a8..d4de382faa 100644
--- a/services/rows/src/rows/routes/rows.py
+++ b/services/rows/src/rows/routes/rows.py
@@ -115,7 +115,6 @@ async def rows_endpoint(request: Request) -> Response:
                        pa_table=pa_table,
                        offset=offset,
                        features=rows_index.parquet_index.features,
-                       unsupported_columns=rows_index.parquet_index.unsupported_columns,
                        partial=rows_index.parquet_index.partial,
                        num_rows_total=rows_index.parquet_index.num_rows_total,
                        truncated_columns=truncated_columns,
diff --git a/services/search/src/search/routes/filter.py b/services/search/src/search/routes/filter.py
index 0102fe3bce..9c053338ed 100644
--- a/services/search/src/search/routes/filter.py
+++ b/services/search/src/search/routes/filter.py
@@ -33,7 +33,6 @@
 from libcommon.prometheus import StepProfiler
 from libcommon.storage import StrPath, clean_dir
 from libcommon.storage_client import StorageClient
-from libcommon.viewer_utils.features import get_supported_unsupported_columns
 from starlette.requests import Request
 from starlette.responses import Response
 
@@ -147,15 +146,13 @@ async def filter_endpoint(request: Request) -> Response:
            # features must contain the row idx column for full_text_search
            features = Features.from_dict(content_parquet_metadata["features"])
            features[ROW_IDX_COLUMN] = Value("int64")
-           with StepProfiler(method="filter_endpoint", step="get supported and unsupported columns"):
-               supported_columns, unsupported_columns = get_supported_unsupported_columns(
-                   features,
-               )
+           columns = list(features.keys())
+
            with StepProfiler(method="filter_endpoint", step="execute filter query"):
                num_rows_total, pa_table = await anyio.to_thread.run_sync(
                    execute_filter_query,
                    index_file_location,
-                   supported_columns,
+                   columns,
                    where,
                    orderby,
                    length,
@@ -180,7 +177,6 @@ async def filter_endpoint(request: Request) -> Response:
                pa_table=pa_table,
                offset=offset,
                features=features or Features.from_arrow_schema(pa_table.schema),
-               unsupported_columns=unsupported_columns,
                num_rows_total=num_rows_total,
                partial=partial,
                use_row_idx_column=True,
diff --git a/services/search/src/search/routes/search.py b/services/search/src/search/routes/search.py
index 6b8b4f47d3..079a4fc535 100644
--- a/services/search/src/search/routes/search.py
+++ b/services/search/src/search/routes/search.py
@@ -35,10 +35,7 @@
 from libcommon.prometheus import StepProfiler
 from libcommon.storage import StrPath, clean_dir
 from libcommon.storage_client import StorageClient
-from libcommon.viewer_utils.features import (
-    get_supported_unsupported_columns,
-    to_features_list,
-)
+from libcommon.viewer_utils.features import to_features_list
 from starlette.requests import Request
 from starlette.responses import Response
 
@@ -86,14 +83,11 @@ async def create_response(
     storage_client: StorageClient,
     offset: int,
     features: Features,
-    unsupported_columns: list[str],
     num_rows_total: int,
     partial: bool,
 ) -> PaginatedResponse:
     features_without_key = features.copy()
     features_without_key.pop(ROW_IDX_COLUMN, None)
-    if len(pa_table) > 0:
-        pa_table = pa_table.drop(unsupported_columns)
 
     logging.info(f"create response for {dataset=} {config=} {split=}")
     return PaginatedResponse(
@@ -107,7 +101,6 @@ async def create_response(
            storage_client=storage_client,
            offset=offset,
            features=features,
-           unsupported_columns=unsupported_columns,
            row_idx_column=ROW_IDX_COLUMN,
        ),
        num_rows_total=num_rows_total,
@@ -202,16 +195,14 @@ async def search_endpoint(request: Request) -> Response:
            # features must contain the row idx column for full_text_search
            features = Features.from_dict(content_parquet_metadata["features"])
            features[ROW_IDX_COLUMN] = Value("int64")
-           with StepProfiler(method="search_endpoint", step="get supported and unsupported columns"):
-               supported_columns, unsupported_columns = get_supported_unsupported_columns(
-                   features,
-               )
+           columns = list(features.keys())
+
            with StepProfiler(method="search_endpoint", step="perform FTS command"):
                logging.debug(f"connect to index file {index_file_location}")
                num_rows_total, pa_table = await anyio.to_thread.run_sync(
                    full_text_search,
                    index_file_location,
-                   supported_columns,
+                   columns,
                    query,
                    offset,
                    length,
@@ -235,7 +226,6 @@ async def search_endpoint(request: Request) -> Response:
                storage_client=cached_assets_storage_client,
                offset=offset,
                features=features or Features.from_arrow_schema(pa_table.schema),
-               unsupported_columns=unsupported_columns,
                num_rows_total=num_rows_total,
                partial=partial,
            )
diff --git a/services/search/tests/routes/test_filter.py b/services/search/tests/routes/test_filter.py
index a0eb678909..c7707f89d0 100644
--- a/services/search/tests/routes/test_filter.py
+++ b/services/search/tests/routes/test_filter.py
@@ -153,7 +153,6 @@ async def test_create_response(ds: Dataset, app_config: AppConfig, storage_clien
         pa_table=pa_table,
         offset=0,
         features=ds.features,
-        unsupported_columns=[],
         num_rows_total=4,
         partial=False,
         use_row_idx_column=True,
diff --git a/services/worker/tests/job_runners/config/test_parquet_metadata.py b/services/worker/tests/job_runners/config/test_parquet_metadata.py
index c810dcf80c..d119753fde 100644
--- a/services/worker/tests/job_runners/config/test_parquet_metadata.py
+++ b/services/worker/tests/job_runners/config/test_parquet_metadata.py
@@ -388,8 +388,6 @@ def test_ParquetIndexWithMetadata_query(
     pf.metadata.write_metadata_file(metadata_path)
     index = ParquetIndexWithMetadata(
         features=features,
-        supported_columns=list(features),
-        unsupported_columns=[],
         parquet_files_urls=[url],
         metadata_paths=[metadata_path],
         num_rows=[num_rows],
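
For readers skimming the patch: the pattern that replaces the removed helper is simply to read every column listed in the dataset's features. Below is a minimal sketch of the before/after call shape, assuming the `datasets` library's `Features` mapping; the feature names and types are hypothetical and not taken from the patch.

```python
from datasets import Features, Image, Value

# Hypothetical features for illustration only.
features = Features({"id": Value("int64"), "text": Value("string"), "img": Image()})

# Before: callers ran get_supported_unsupported_columns(features, unsupported_features=[]),
# which classified every column as "supported" because the unsupported list was always empty
# (see the removed TODO in parquet_utils.py).
# After: the column projection passed to the row-group readers and the search/filter queries
# is derived directly from the features mapping.
columns = list(features.keys())
assert columns == ["id", "text", "img"]
```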