libs/libapi/src/libapi/response.py (6 changes: 0 additions & 6 deletions)

@@ -20,16 +20,11 @@ async def create_response(
     pa_table: pa.Table,
     offset: int,
     features: Features,
-    unsupported_columns: list[str],
     num_rows_total: int,
     partial: bool,
     use_row_idx_column: bool = False,
     truncated_columns: Optional[list[str]] = None,
 ) -> PaginatedResponse:
-    if set(pa_table.column_names).intersection(set(unsupported_columns)):
-        raise RuntimeError(
-            "The pyarrow table contains unsupported columns. They should have been ignored in the row group reader."
-        )
     logging.debug(f"create response for {dataset=} {config=} {split=}")
     return {
         "features": [
@@ -46,7 +41,6 @@ async def create_response(
             storage_client=storage_client,
             offset=offset,
             features=features,
-            unsupported_columns=unsupported_columns,
             row_idx_column=ROW_IDX_COLUMN if use_row_idx_column else None,
             truncated_columns=truncated_columns,
         ),
libs/libapi/src/libapi/utils.py (5 changes: 0 additions & 5 deletions)

@@ -205,15 +205,10 @@ async def to_rows_list(
     split: str,
     offset: int,
     features: Features,
-    unsupported_columns: list[str],
     storage_client: StorageClient,
     row_idx_column: Optional[str] = None,
     truncated_columns: Optional[list[str]] = None,
 ) -> list[RowItem]:
-    num_rows = pa_table.num_rows
-    for idx, (column, feature) in enumerate(features.items()):
-        if column in unsupported_columns:
-            pa_table = pa_table.add_column(idx, column, pa.array([None] * num_rows))
     # transform the rows, if needed (e.g. save the images or audio to the assets, and return their URL)
     try:
         transformed_rows = await transform_rows(
libs/libapi/tests/test_response.py (4 changes: 0 additions & 4 deletions)

@@ -42,7 +42,6 @@ async def test_create_response(storage_client: StorageClient) -> None:
         pa_table=ds.data,
         offset=0,
         features=ds.features,
-        unsupported_columns=[],
         num_rows_total=10,
         partial=False,
     )
@@ -67,7 +66,6 @@ async def test_create_response_with_row_idx_column(storage_client: StorageClient
         pa_table=ds.data,
         offset=0,
         features=ds.features,
-        unsupported_columns=[],
         num_rows_total=10,
         partial=False,
         use_row_idx_column=True,
@@ -96,7 +94,6 @@ async def test_create_response_with_image(image_path: str, storage_client: Stora
         pa_table=ds_image.data,
         offset=0,
         features=ds_image.features,
-        unsupported_columns=[],
         num_rows_total=10,
         partial=False,
     )
@@ -137,7 +134,6 @@ async def test_create_response_with_document(document_path: str, storage_client:
         pa_table=ds_document.data,
         offset=0,
         features=ds_document.features,
-        unsupported_columns=[],
         num_rows_total=10,
         partial=False,
     )
libs/libcommon/src/libcommon/parquet_utils.py (20 changes: 5 additions & 15 deletions)

@@ -22,7 +22,6 @@
 from libcommon.prometheus import StepProfiler
 from libcommon.simple_cache import get_previous_step_or_raise
 from libcommon.storage import StrPath
-from libcommon.viewer_utils.features import get_supported_unsupported_columns

 # For partial Parquet export we have paths like "en/partial-train/0000.parquet".
 # "-" is not allowed is split names so we use it in the prefix to avoid collisions.
@@ -181,8 +180,6 @@ def read_size(self, columns: Optional[Iterable[str]] = None) -> int:
 @dataclass
 class ParquetIndexWithMetadata:
     features: Features
-    supported_columns: list[str]
-    unsupported_columns: list[str]
     parquet_files_urls: list[str]
     metadata_paths: list[str]
     num_bytes: list[int]
@@ -329,10 +326,11 @@ def query_truncated_binary(self, offset: int, length: int) -> tuple[pa.Table, li
         )  # we use a minimum length to not end up with too empty cells
         try:
             pa_tables: list[pa.Table] = []
+            columns = list(self.features.keys())
             truncated_columns: set[str] = set()
             for i in range(first_row_group_id, last_row_group_id + 1):
                 rg_pa_table, rg_truncated_columns = row_group_readers[i].read_truncated_binary(
-                    self.supported_columns, max_binary_length=max_binary_length
+                    columns, max_binary_length=max_binary_length
                 )
                 pa_tables.append(rg_pa_table)
                 truncated_columns |= set(rg_truncated_columns)
@@ -438,12 +436,10 @@ def query(self, offset: int, length: int) -> pa.Table:
             )

         with StepProfiler(method="parquet_index_with_metadata.query", step="read the row groups"):
+            columns = list(self.features.keys())
             try:
                 pa_table = pa.concat_tables(
-                    [
-                        row_group_readers[i].read(self.supported_columns)
-                        for i in range(first_row_group_id, last_row_group_id + 1)
-                    ]
+                    [row_group_readers[i].read(columns) for i in range(first_row_group_id, last_row_group_id + 1)]
                 )
             except ArrowInvalid as err:
                 raise SchemaMismatchError("Parquet files have different schema.", err)
@@ -486,15 +482,9 @@ def from_parquet_metadata_items(
         ):
             if features is None:  # config-parquet version<6 didn't have features
                 features = Features.from_arrow_schema(pq.read_schema(metadata_paths[0]))
-            # TODO(kszucs): since unsupported_features is always empty list we may omit the call below
-            supported_columns, unsupported_columns = get_supported_unsupported_columns(
-                features,
-                unsupported_features=[],
-            )
+
             return ParquetIndexWithMetadata(
                 features=features,
-                supported_columns=supported_columns,
-                unsupported_columns=unsupported_columns,
                 parquet_files_urls=parquet_files_urls,
                 metadata_paths=metadata_paths,
                 num_bytes=num_bytes,
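Context for the change in query() and query_truncated_binary(): since from_parquet_metadata_items always passed unsupported_features=[], supported_columns was always the full feature list, so switching to columns = list(self.features.keys()) preserves behavior. Below is a minimal runnable sketch of the per-row-group read pattern the index relies on; the file name "example.parquet" and the toy schema are assumptions for illustration, and the PR's RowGroupReader wraps similar pyarrow calls rather than being shown here.

import pyarrow as pa
import pyarrow.parquet as pq

# Toy parquet file standing in for one shard of a dataset export.
pq.write_table(pa.table({"text": ["a", "b", "c"], "label": [0, 1, 0]}), "example.parquet")

parquet_file = pq.ParquetFile("example.parquet")
columns = ["text", "label"]  # previously self.supported_columns; now list(self.features.keys())

# Read each row group with an explicit column list and concatenate the results,
# mirroring the updated ParquetIndexWithMetadata.query() loop.
table = pa.concat_tables(
    [parquet_file.read_row_group(i, columns=columns) for i in range(parquet_file.num_row_groups)]
)
assert table.num_rows == 3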
libs/libcommon/src/libcommon/viewer_utils/features.py (28 changes: 0 additions & 28 deletions)

@@ -29,7 +29,6 @@
     Value,
     Video,
 )
-from datasets.features.features import FeatureType, _visit
 from PIL import Image as PILImage

 from libcommon.dtos import FeatureItem
@@ -529,30 +528,3 @@ def to_features_list(features: Features) -> list[FeatureItem]:
         }
         for idx, name in enumerate(features)
     ]
-
-
-def get_supported_unsupported_columns(
-    features: Features,
-    unsupported_features: list[FeatureType] = [],
-) -> tuple[list[str], list[str]]:
-    supported_columns, unsupported_columns = [], []
-
-    for column, feature in features.items():
-        str_column = str(column)
-        supported = True
-
-        def classify(feature: FeatureType) -> None:
-            nonlocal supported
-            for unsupported_feature in unsupported_features:
-                if type(unsupported_feature) is type(feature) is Value:
-                    if unsupported_feature.dtype == feature.dtype:
-                        supported = False
-                elif type(unsupported_feature) is type(feature):
-                    supported = False
-
-        _visit(feature, classify)
-        if supported:
-            supported_columns.append(str_column)
-        else:
-            unsupported_columns.append(str_column)
-    return supported_columns, unsupported_columns
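The deleted helper only ever filtered a column when unsupported_features was non-empty, and every production call site passed []. A short standalone sketch (illustration only, not code from the PR; the example feature names are assumptions) of why the call was a no-op in practice:

from datasets import Features, Image, Value

features = Features({"image": Image(), "text": Value("string"), "binary": Value("binary")})

# With unsupported_features=[], the classify() visitor in the deleted helper
# never set supported = False, so the result was always (all columns, []).
supported_columns = [str(column) for column in features]
unsupported_columns: list[str] = []

# This is the equivalence the PR exploits: callers can use list(features.keys()).
assert supported_columns == list(features.keys())
assert unsupported_columns == []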
libs/libcommon/tests/viewer_utils/test_features.py (23 changes: 0 additions & 23 deletions)

@@ -10,7 +10,6 @@
 import boto3
 import pytest
 from aiobotocore.response import StreamingBody
-from datasets import Audio, Features, Image, List, Pdf, Value
 from moto import mock_s3
 from urllib3._collections import HTTPHeaderDict

@@ -19,7 +18,6 @@
 from libcommon.url_preparator import URLPreparator
 from libcommon.viewer_utils.features import (
     get_cell_value,
-    get_supported_unsupported_columns,
     infer_audio_file_extension,
     to_features_list,
 )
@@ -102,27 +100,6 @@ def test_to_features_list(
     assert first_feature["type"] == datasets_fixture.expected_feature_type


-def test_get_supported_unsupported_columns() -> None:
-    features = Features(
-        {
-            "audio1": Audio(),
-            "audio2": Audio(sampling_rate=16_000),
-            "audio3": List(Audio()),
-            "image1": Image(),
-            "image2": Image(decode=False),
-            "image3": List(Image()),
-            "string": Value("string"),
-            "binary": Value("binary"),
-            "pdf": Pdf(),
-            "pdf2": [Pdf()],
-        }
-    )
-    unsupported_features = [Value("binary"), Audio()]
-    supported_columns, unsupported_columns = get_supported_unsupported_columns(features, unsupported_features)
-    assert supported_columns == ["image1", "image2", "image3", "string", "pdf", "pdf2"]
-    assert unsupported_columns == ["audio1", "audio2", "audio3", "binary"]
-
-
 # specific test created for https://github.com/huggingface/dataset-viewer/issues/2045
 # which is reproduced only when using s3 for fsspec
 def test_ogg_audio_with_s3(
services/rows/src/rows/routes/rows.py (1 change: 0 additions & 1 deletion)

@@ -115,7 +115,6 @@ async def rows_endpoint(request: Request) -> Response:
                 pa_table=pa_table,
                 offset=offset,
                 features=rows_index.parquet_index.features,
-                unsupported_columns=rows_index.parquet_index.unsupported_columns,
                 partial=rows_index.parquet_index.partial,
                 num_rows_total=rows_index.parquet_index.num_rows_total,
                 truncated_columns=truncated_columns,
services/search/src/search/routes/filter.py (10 changes: 3 additions & 7 deletions)

@@ -33,7 +33,6 @@
 from libcommon.prometheus import StepProfiler
 from libcommon.storage import StrPath, clean_dir
 from libcommon.storage_client import StorageClient
-from libcommon.viewer_utils.features import get_supported_unsupported_columns
 from starlette.requests import Request
 from starlette.responses import Response

@@ -147,15 +146,13 @@ async def filter_endpoint(request: Request) -> Response:
                 # features must contain the row idx column for full_text_search
                 features = Features.from_dict(content_parquet_metadata["features"])
                 features[ROW_IDX_COLUMN] = Value("int64")
-            with StepProfiler(method="filter_endpoint", step="get supported and unsupported columns"):
-                supported_columns, unsupported_columns = get_supported_unsupported_columns(
-                    features,
-                )
+                columns = list(features.keys())
+
             with StepProfiler(method="filter_endpoint", step="execute filter query"):
                 num_rows_total, pa_table = await anyio.to_thread.run_sync(
                     execute_filter_query,
                     index_file_location,
-                    supported_columns,
+                    columns,
                     where,
                     orderby,
                     length,
@@ -180,7 +177,6 @@ async def filter_endpoint(request: Request) -> Response:
                 pa_table=pa_table,
                 offset=offset,
                 features=features or Features.from_arrow_schema(pa_table.schema),
-                unsupported_columns=unsupported_columns,
                 num_rows_total=num_rows_total,
                 partial=partial,
                 use_row_idx_column=True,
services/search/src/search/routes/search.py (18 changes: 4 additions & 14 deletions)

@@ -35,10 +35,7 @@
 from libcommon.prometheus import StepProfiler
 from libcommon.storage import StrPath, clean_dir
 from libcommon.storage_client import StorageClient
-from libcommon.viewer_utils.features import (
-    get_supported_unsupported_columns,
-    to_features_list,
-)
+from libcommon.viewer_utils.features import to_features_list
 from starlette.requests import Request
 from starlette.responses import Response

@@ -86,14 +83,11 @@ async def create_response(
     storage_client: StorageClient,
     offset: int,
     features: Features,
-    unsupported_columns: list[str],
     num_rows_total: int,
     partial: bool,
 ) -> PaginatedResponse:
     features_without_key = features.copy()
     features_without_key.pop(ROW_IDX_COLUMN, None)
-    if len(pa_table) > 0:
-        pa_table = pa_table.drop(unsupported_columns)
     logging.info(f"create response for {dataset=} {config=} {split=}")

     return PaginatedResponse(
@@ -107,7 +101,6 @@ async def create_response(
             storage_client=storage_client,
             offset=offset,
             features=features,
-            unsupported_columns=unsupported_columns,
             row_idx_column=ROW_IDX_COLUMN,
         ),
         num_rows_total=num_rows_total,
@@ -202,16 +195,14 @@ async def search_endpoint(request: Request) -> Response:
                 # features must contain the row idx column for full_text_search
                 features = Features.from_dict(content_parquet_metadata["features"])
                 features[ROW_IDX_COLUMN] = Value("int64")
-            with StepProfiler(method="search_endpoint", step="get supported and unsupported columns"):
-                supported_columns, unsupported_columns = get_supported_unsupported_columns(
-                    features,
-                )
+                columns = list(features.keys())
+
             with StepProfiler(method="search_endpoint", step="perform FTS command"):
                 logging.debug(f"connect to index file {index_file_location}")
                 num_rows_total, pa_table = await anyio.to_thread.run_sync(
                     full_text_search,
                     index_file_location,
-                    supported_columns,
+                    columns,
                     query,
                     offset,
                     length,
@@ -235,7 +226,6 @@
                     storage_client=cached_assets_storage_client,
                     offset=offset,
                     features=features or Features.from_arrow_schema(pa_table.schema),
-                    unsupported_columns=unsupported_columns,
                     num_rows_total=num_rows_total,
                     partial=partial,
                 )
services/search/tests/routes/test_filter.py (1 change: 0 additions & 1 deletion)

@@ -153,7 +153,6 @@ async def test_create_response(ds: Dataset, app_config: AppConfig, storage_clien
         pa_table=pa_table,
         offset=0,
         features=ds.features,
-        unsupported_columns=[],
         num_rows_total=4,
         partial=False,
         use_row_idx_column=True,
@@ -388,8 +388,6 @@ def test_ParquetIndexWithMetadata_query(
     pf.metadata.write_metadata_file(metadata_path)
     index = ParquetIndexWithMetadata(
         features=features,
-        supported_columns=list(features),
-        unsupported_columns=[],
         parquet_files_urls=[url],
         metadata_paths=[metadata_path],
         num_rows=[num_rows],