From 254a7012c46cf95ae118b13248183cb80cad807d Mon Sep 17 00:00:00 2001
From: Gowthami B
Date: Thu, 4 Jul 2024 00:56:30 -0400
Subject: [PATCH] Add Files metadata table (#614)

---
 mkdocs/docs/api.md                      | 131 ++++++++++++++++++
 pyiceberg/table/__init__.py             | 103 ++++++++++++++
 tests/integration/test_inspect_table.py | 170 ++++++++++++++++++++++++
 3 files changed, 404 insertions(+)

diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 6da2fc3a8b..0e80b6eb5e 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -700,6 +700,137 @@ parent_id: [[null,4358109269873137077,null,4358109269873137077]]
 is_current_ancestor: [[true,false,true,true]]
 ```
 
+### Files
+
+Inspect the data files in the current snapshot of the table:
+
+```python
+table.inspect.files()
+```
+
+```
+pyarrow.Table
+content: int8 not null
+file_path: string not null
+file_format: dictionary<values=string, indices=int32, ordered=0> not null
+spec_id: int32 not null
+record_count: int64 not null
+file_size_in_bytes: int64 not null
+column_sizes: map<int32, int64>
+  child 0, entries: struct<key: int32 not null, value: int64> not null
+      child 0, key: int32 not null
+      child 1, value: int64
+value_counts: map<int32, int64>
+  child 0, entries: struct<key: int32 not null, value: int64> not null
+      child 0, key: int32 not null
+      child 1, value: int64
+null_value_counts: map<int32, int64>
+  child 0, entries: struct<key: int32 not null, value: int64> not null
+      child 0, key: int32 not null
+      child 1, value: int64
+nan_value_counts: map<int32, int64>
+  child 0, entries: struct<key: int32 not null, value: int64> not null
+      child 0, key: int32 not null
+      child 1, value: int64
+lower_bounds: map<int32, binary>
+  child 0, entries: struct<key: int32 not null, value: binary> not null
+      child 0, key: int32 not null
+      child 1, value: binary
+upper_bounds: map<int32, binary>
+  child 0, entries: struct<key: int32 not null, value: binary> not null
+      child 0, key: int32 not null
+      child 1, value: binary
+key_metadata: binary
+split_offsets: list<item: int64>
+  child 0, item: int64
+equality_ids: list<item: int32>
+  child 0, item: int32
+sort_order_id: int32
+readable_metrics: struct<city: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: large_string, upper_bound: large_string> not null, lat: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null, long: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null> not null
+  child 0, city: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: large_string, upper_bound: large_string> not null
+      child 0, column_size: int64
+      child 1, value_count: int64
+      child 2, null_value_count: int64
+      child 3, nan_value_count: int64
+      child 4, lower_bound: large_string
+      child 5, upper_bound: large_string
+  child 1, lat: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null
+      child 0, column_size: int64
+      child 1, value_count: int64
+      child 2, null_value_count: int64
+      child 3, nan_value_count: int64
+      child 4, lower_bound: double
+      child 5, upper_bound: double
+  child 2, long: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null
+      child 0, column_size: int64
+      child 1, value_count: int64
+      child 2, null_value_count: int64
+      child 3, nan_value_count: int64
+      child 4, lower_bound: double
+      child 5, upper_bound: double
+----
+content: [[0,0]]
+file_path: [["s3://warehouse/default/table_metadata_files/data/00000-0-9ea7d222-6457-467f-bad5-6fb125c9aa5f.parquet","s3://warehouse/default/table_metadata_files/data/00000-0-afa8893c-de71-4710-97c9-6b01590d0c44.parquet"]]
+file_format: [["PARQUET","PARQUET"]]
+spec_id: [[0,0]]
+record_count: [[3,3]]
+file_size_in_bytes: [[5459,5459]]
+column_sizes: [[keys:[1,2,3,4,5,...,8,9,10,11,12]values:[49,78,128,94,118,...,118,118,94,78,109],keys:[1,2,3,4,5,...,8,9,10,11,12]values:[49,78,128,94,118,...,118,118,94,78,109]]]
+value_counts: [[keys:[1,2,3,4,5,...,8,9,10,11,12]values:[3,3,3,3,3,...,3,3,3,3,3],keys:[1,2,3,4,5,...,8,9,10,11,12]values:[3,3,3,3,3,...,3,3,3,3,3]]]
+null_value_counts: [[keys:[1,2,3,4,5,...,8,9,10,11,12]values:[1,1,1,1,1,...,1,1,1,1,1],keys:[1,2,3,4,5,...,8,9,10,11,12]values:[1,1,1,1,1,...,1,1,1,1,1]]]
+nan_value_counts: [[keys:[]values:[],keys:[]values:[]]]
+lower_bounds: [[keys:[1,2,3,4,5,...,8,9,10,11,12]values:[00,61,61616161616161616161616161616161,01000000,0100000000000000,...,009B6ACA38F10500,009B6ACA38F10500,9E4B0000,01,00000000000000000000000000000000],keys:[1,2,3,4,5,...,8,9,10,11,12]values:[00,61,61616161616161616161616161616161,01000000,0100000000000000,...,009B6ACA38F10500,009B6ACA38F10500,9E4B0000,01,00000000000000000000000000000000]]]
+upper_bounds:[[keys:[1,2,3,4,5,...,8,9,10,11,12]values:[00,61,61616161616161616161616161616161,01000000,0100000000000000,...,009B6ACA38F10500,009B6ACA38F10500,9E4B0000,01,00000000000000000000000000000000],keys:[1,2,3,4,5,...,8,9,10,11,12]values:[00,61,61616161616161616161616161616161,01000000,0100000000000000,...,009B6ACA38F10500,009B6ACA38F10500,9E4B0000,01,00000000000000000000000000000000]]]
+key_metadata: [[0100,0100]]
+split_offsets:[[[],[]]]
+equality_ids:[[[],[]]]
+sort_order_id:[[[],[]]]
+readable_metrics: [
+  -- is_valid: all not null
+  -- child 0 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: large_string, upper_bound: large_string>
+    -- is_valid: all not null
+    -- child 0 type: int64
+[140]
+    -- child 1 type: int64
+[4]
+    -- child 2 type: int64
+[0]
+    -- child 3 type: int64
+[null]
+    -- child 4 type: large_string
+["Amsterdam"]
+    -- child 5 type: large_string
+["San Francisco"]
+  -- child 1 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double>
+    -- is_valid: all not null
+    -- child 0 type: int64
+[135]
+    -- child 1 type: int64
+[4]
+    -- child 2 type: int64
+[0]
+    -- child 3 type: int64
+[null]
+    -- child 4 type: double
+[37.773972]
+    -- child 5 type: double
+[53.11254]
+  -- child 2 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double>
+    -- is_valid: all not null
+    -- child 0 type: int64
+[135]
+    -- child 1 type: int64
+[4]
+    -- child 2 type: int64
+[0]
+    -- child 3 type: int64
+[null]
+    -- child 4 type: double
+[-122.431297]
+    -- child 5 type: double
+[6.0989]]
+```
+
 ## Add Files
 
 Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
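Because `inspect.files()` returns a plain `pyarrow.Table`, the output documented above can be filtered or aggregated directly with `pyarrow.compute`, or handed to pandas. A minimal sketch of that kind of post-processing (not part of this patch; the catalog and table names are illustrative placeholders):

```python
import pyarrow.compute as pc

from pyiceberg.catalog import load_catalog

# Placeholder catalog/table names -- substitute any existing Iceberg table.
catalog = load_catalog("default")
table = catalog.load_table("default.cities")

files = table.inspect.files()

# Aggregate over the data files in the current snapshot.
total_records = pc.sum(files["record_count"]).as_py()
total_bytes = pc.sum(files["file_size_in_bytes"]).as_py()
print(f"{files.num_rows} data files, {total_records} records, {total_bytes} bytes")

# Or switch to pandas for ad-hoc exploration of the per-file metrics.
print(files.to_pandas()[["file_path", "record_count", "file_size_in_bytes"]])
```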
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 8c1493974b..2eec4d3036 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -3907,6 +3907,109 @@ def history(self) -> "pa.Table":
 
         return pa.Table.from_pylist(history, schema=history_schema)
 
+
+    def files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
+        import pyarrow as pa
+
+        from pyiceberg.io.pyarrow import schema_to_pyarrow
+
+        schema = self.tbl.metadata.schema()
+        readable_metrics_struct = []
+
+        def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
+            pa_bound_type = schema_to_pyarrow(bound_type)
+            return pa.struct([
+                pa.field("column_size", pa.int64(), nullable=True),
+                pa.field("value_count", pa.int64(), nullable=True),
+                pa.field("null_value_count", pa.int64(), nullable=True),
+                pa.field("nan_value_count", pa.int64(), nullable=True),
+                pa.field("lower_bound", pa_bound_type, nullable=True),
+                pa.field("upper_bound", pa_bound_type, nullable=True),
+            ])
+
+        for field in self.tbl.metadata.schema().fields:
+            readable_metrics_struct.append(
+                pa.field(schema.find_column_name(field.field_id), _readable_metrics_struct(field.field_type), nullable=False)
+            )
+
+        files_schema = pa.schema([
+            pa.field("content", pa.int8(), nullable=False),
+            pa.field("file_path", pa.string(), nullable=False),
+            pa.field("file_format", pa.dictionary(pa.int32(), pa.string()), nullable=False),
+            pa.field("spec_id", pa.int32(), nullable=False),
+            pa.field("record_count", pa.int64(), nullable=False),
+            pa.field("file_size_in_bytes", pa.int64(), nullable=False),
+            pa.field("column_sizes", pa.map_(pa.int32(), pa.int64()), nullable=True),
+            pa.field("value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True),
+            pa.field("null_value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True),
+            pa.field("nan_value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True),
+            pa.field("lower_bounds", pa.map_(pa.int32(), pa.binary()), nullable=True),
+            pa.field("upper_bounds", pa.map_(pa.int32(), pa.binary()), nullable=True),
+            pa.field("key_metadata", pa.binary(), nullable=True),
+            pa.field("split_offsets", pa.list_(pa.int64()), nullable=True),
+            pa.field("equality_ids", pa.list_(pa.int32()), nullable=True),
+            pa.field("sort_order_id", pa.int32(), nullable=True),
+            pa.field("readable_metrics", pa.struct(readable_metrics_struct), nullable=True),
+        ])
+
+        files: list[dict[str, Any]] = []
+
+        if not snapshot_id and not self.tbl.metadata.current_snapshot():
+            return pa.Table.from_pylist(
+                files,
+                schema=files_schema,
+            )
+        snapshot = self._get_snapshot(snapshot_id)
+
+        io = self.tbl.io
+        for manifest_list in snapshot.manifests(io):
+            for manifest_entry in manifest_list.fetch_manifest_entry(io):
+                data_file = manifest_entry.data_file
+                column_sizes = data_file.column_sizes or {}
+                value_counts = data_file.value_counts or {}
+                null_value_counts = data_file.null_value_counts or {}
+                nan_value_counts = data_file.nan_value_counts or {}
+                lower_bounds = data_file.lower_bounds or {}
+                upper_bounds = data_file.upper_bounds or {}
+                readable_metrics = {
+                    schema.find_column_name(field.field_id): {
+                        "column_size": column_sizes.get(field.field_id),
+                        "value_count": value_counts.get(field.field_id),
+                        "null_value_count": null_value_counts.get(field.field_id),
+                        "nan_value_count": nan_value_counts.get(field.field_id),
+                        "lower_bound": from_bytes(field.field_type, lower_bound)
+                        if (lower_bound := lower_bounds.get(field.field_id))
+                        else None,
+                        "upper_bound": from_bytes(field.field_type, upper_bound)
+                        if (upper_bound := upper_bounds.get(field.field_id))
+                        else None,
+                    }
+                    for field in self.tbl.metadata.schema().fields
+                }
+                # Reuse the None-safe dicts built above so data files without metrics don't raise.
+                files.append({
+                    "content": data_file.content,
+                    "file_path": data_file.file_path,
+                    "file_format": data_file.file_format,
+                    "spec_id": data_file.spec_id,
+                    "record_count": data_file.record_count,
+                    "file_size_in_bytes": data_file.file_size_in_bytes,
+                    "column_sizes": dict(column_sizes),
+                    "value_counts": dict(value_counts),
+                    "null_value_counts": dict(null_value_counts),
+                    "nan_value_counts": dict(nan_value_counts),
+                    "lower_bounds": dict(lower_bounds),
+                    "upper_bounds": dict(upper_bounds),
+                    "key_metadata": data_file.key_metadata,
+                    "split_offsets": data_file.split_offsets,
+                    "equality_ids": data_file.equality_ids,
+                    "sort_order_id": data_file.sort_order_id,
+                    "readable_metrics": readable_metrics,
+                })
+
+        return pa.Table.from_pylist(
+            files,
+            schema=files_schema,
+        )
 
 @dataclass(frozen=True)
 class TablePartition:
diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py
index 8414fba333..834fe83d5f 100644
--- a/tests/integration/test_inspect_table.py
+++ b/tests/integration/test_inspect_table.py
@@ -637,3 +637,173 @@ def test_inspect_history(spark: SparkSession, session_catalog: Catalog, format_v
             # NaN != NaN in Python
             continue
         assert left == right, f"Difference in column {column}: {left} != {right}"
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_inspect_files(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
+) -> None:
+    from pandas.testing import assert_frame_equal
+
+    identifier = "default.table_metadata_files"
+
+    tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version})
+
+    tbl.overwrite(arrow_table_with_null)
+
+    # append more data
+    tbl.append(arrow_table_with_null)
+
+    df = tbl.refresh().inspect.files()
+
+    assert df.column_names == [
+        "content",
+        "file_path",
+        "file_format",
+        "spec_id",
+        "record_count",
+        "file_size_in_bytes",
+        "column_sizes",
+        "value_counts",
+        "null_value_counts",
+        "nan_value_counts",
+        "lower_bounds",
+        "upper_bounds",
+        "key_metadata",
+        "split_offsets",
+        "equality_ids",
+        "sort_order_id",
+        "readable_metrics",
+    ]
+
+    # make sure the non-nullable fields are filled
+    for int_column in ["content", "spec_id", "record_count", "file_size_in_bytes"]:
+        for value in df[int_column]:
+            assert isinstance(value.as_py(), int)
+
+    for split_offsets in df["split_offsets"]:
+        assert isinstance(split_offsets.as_py(), list)
+
+    for file_format in df["file_format"]:
+        assert file_format.as_py() == "PARQUET"
+
+    for file_path in df["file_path"]:
+        assert file_path.as_py().startswith("s3://")
+
+    lhs = df.to_pandas()
+    rhs = spark.table(f"{identifier}.files").toPandas()
+
+    lhs_subset = lhs[
+        [
+            "content",
+            "file_path",
+            "file_format",
+            "spec_id",
+            "record_count",
+            "file_size_in_bytes",
+            "split_offsets",
+            "equality_ids",
+            "sort_order_id",
+        ]
+    ]
+    rhs_subset = rhs[
+        [
+            "content",
+            "file_path",
+            "file_format",
+            "spec_id",
+            "record_count",
+            "file_size_in_bytes",
+            "split_offsets",
+            "equality_ids",
+            "sort_order_id",
+        ]
+    ]
+
+    assert_frame_equal(lhs_subset, rhs_subset, check_dtype=False, check_categorical=False)
+
+    for column in df.column_names:
+        for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
+            if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right):
+                # NaN != NaN in Python
+                continue
+            if column in [
+                "column_sizes",
+                "value_counts",
+                "null_value_counts",
+                "nan_value_counts",
+                "lower_bounds",
+                "upper_bounds",
+            ]:
+                if isinstance(right, dict):
+                    left = dict(left)
+                assert left == right, f"Difference in column {column}: {left} != {right}"
+
+            elif column == "readable_metrics":
+                assert list(left.keys()) == [
+                    "bool",
+                    "string",
+                    "string_long",
+                    "int",
+                    "long",
+                    "float",
+                    "double",
+                    "timestamp",
+                    "timestamptz",
+                    "date",
+                    "binary",
+                    "fixed",
+                ]
+                assert left.keys() == right.keys()
+
+                for rm_column in left.keys():
+                    rm_lhs = left[rm_column]
+                    rm_rhs = right[rm_column]
+
+                    assert rm_lhs["column_size"] == rm_rhs["column_size"]
+                    assert rm_lhs["value_count"] == rm_rhs["value_count"]
+                    assert rm_lhs["null_value_count"] == rm_rhs["null_value_count"]
+                    assert rm_lhs["nan_value_count"] == rm_rhs["nan_value_count"]
+
+                    if rm_column == "timestamptz":
+                        # PySpark does not correctly set the timestamptz
+                        rm_rhs["lower_bound"] = rm_rhs["lower_bound"].replace(tzinfo=pytz.utc)
+                        rm_rhs["upper_bound"] = rm_rhs["upper_bound"].replace(tzinfo=pytz.utc)
+
+                    assert rm_lhs["lower_bound"] == rm_rhs["lower_bound"]
+                    assert rm_lhs["upper_bound"] == rm_rhs["upper_bound"]
+            else:
+                assert left == right, f"Difference in column {column}: {left} != {right}"
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_inspect_files_no_snapshot(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    identifier = "default.table_metadata_files"
+
+    tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version})
+
+    df = tbl.refresh().inspect.files()
+
+    assert df.column_names == [
+        "content",
+        "file_path",
+        "file_format",
+        "spec_id",
+        "record_count",
+        "file_size_in_bytes",
+        "column_sizes",
+        "value_counts",
+        "null_value_counts",
+        "nan_value_counts",
+        "lower_bounds",
+        "upper_bounds",
+        "key_metadata",
+        "split_offsets",
+        "equality_ids",
+        "sort_order_id",
+        "readable_metrics",
+    ]
+
+    assert df.to_pandas().empty is True
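The `snapshot_id` parameter that `files()` accepts in the implementation above is not exercised by the docs example or the integration tests, which only cover the current snapshot and the empty-table case. A short, hypothetical sketch of inspecting file metadata as of earlier snapshots (catalog and table names are placeholders; it assumes the table already has a snapshot history):

```python
from pyiceberg.catalog import load_catalog

# Placeholder names; any catalog/table with a few snapshots will do.
catalog = load_catalog("default")
table = catalog.load_table("default.table_metadata_files")

# table.history() yields snapshot log entries with a snapshot_id attribute,
# so the files metadata table can be rebuilt for each point in the log.
for entry in table.history():
    files_at_snapshot = table.inspect.files(snapshot_id=entry.snapshot_id)
    print(entry.snapshot_id, files_at_snapshot.num_rows, "data files")
```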