diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index d84c82ec2a..5909675bee 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -1047,6 +1047,355 @@ readable_metrics: [ To show only data files or delete files in the current snapshot, use `table.inspect.data_files()` and `table.inspect.delete_files()` respectively. +### All Metadata Tables + +These tables are unions of the metadata tables specific to the current snapshot, and return metadata across all snapshots. +!!! danger + The "all" metadata tables may produce more than one row per data file or manifest file because metadata files may be part of more than one table snapshot. + +#### All Entries + +To show the table's manifest entries from all the snapshots for both data and delete files: + +```python +table.inspect.all_entries() +``` + +```python +pyarrow.Table +status: int8 not null +snapshot_id: int64 not null +sequence_number: int64 not null +file_sequence_number: int64 not null +data_file: struct not null, record_count: int64 not null, file_size_in_bytes: int64 not null, column_sizes: map, value_counts: map, null_value_counts: map, nan_value_counts: map, lower_bounds: map, upper_bounds: map, key_metadata: binary, split_offsets: list, equality_ids: list, sort_order_id: int32> not null + child 0, content: int8 not null + child 1, file_path: string not null + child 2, file_format: string not null + child 3, spec_id: int32 not null + child 4, partition: struct not null + child 0, data: large_string + child 5, record_count: int64 not null + child 6, file_size_in_bytes: int64 not null + child 7, column_sizes: map + child 0, entries: struct not null + child 0, key: int32 not null + child 1, value: int64 + child 8, value_counts: map + child 0, entries: struct not null + child 0, key: int32 not null + child 1, value: int64 + child 9, null_value_counts: map + child 0, entries: struct not null + child 0, key: int32 not null + child 1, value: int64 + child 10, nan_value_counts: map + child 0, entries: struct not null + child 0, key: int32 not null + child 1, value: int64 + child 11, lower_bounds: map + child 0, entries: struct not null + child 0, key: int32 not null + child 1, value: binary + child 12, upper_bounds: map + child 0, entries: struct not null + child 0, key: int32 not null + child 1, value: binary + child 13, key_metadata: binary + child 14, split_offsets: list + child 0, item: int64 + child 15, equality_ids: list + child 0, item: int32 + child 16, sort_order_id: int32 +readable_metrics: struct not null, data: struct not null> + child 0, id: struct not null + child 0, column_size: int64 + child 1, value_count: int64 + child 2, null_value_count: int64 + child 3, nan_value_count: int64 + child 4, lower_bound: int32 + child 5, upper_bound: int32 + child 1, data: struct not null + child 0, column_size: int64 + child 1, value_count: int64 + child 2, null_value_count: int64 + child 3, nan_value_count: int64 + child 4, lower_bound: large_string + child 5, upper_bound: large_string +---- +status: [[1],[1],...,[],[]] +snapshot_id: [[6449946327458654223],[7507782590078860647],...,[],[]] +sequence_number: [[1],[2],...,[],[]] +file_sequence_number: [[1],[2],...,[],[]] +data_file: [ + -- is_valid: all not null + -- child 0 type: int8 +[0] + -- child 1 type: string +["s3://warehouse/default/table_metadata_all_entries/data/data=a/00000-1-20675924-1844-4414-aa3b-cbb033884013-0-00001.parquet"] + -- child 2 type: string +["PARQUET"] + -- child 3 type: int32 +[0] + -- child 4 type: struct + -- is_valid: all not null + -- child 0 type: 
large_string +["a"] + -- child 5 type: int64 +[1] + -- child 6 type: int64 +[636] + -- child 7 type: map +[keys:[1,2]values:[39,40]] + -- child 8 type: map +[keys:[1,2]values:[1,1]] + -- child 9 type: map +[keys:[1,2]values:[0,0]] + -- child 10 type: map +[keys:[]values:[]] + -- child 11 type: map +[keys:[1,2]values:[01000000,61]] + -- child 12 type: map +[keys:[1,2]values:[01000000,61]] + -- child 13 type: binary +[null] + -- child 14 type: list +[[4]] + -- child 15 type: list +[null] + -- child 16 type: int32 +[0], + -- is_valid: all not null + -- child 0 type: int8 +[0] + -- child 1 type: string +["s3://warehouse/default/table_metadata_all_entries/data/data=b/00000-3-c28af222-7039-435e-b2a9-a4dc698b75e5-0-00001.parquet"] + -- child 2 type: string +["PARQUET"] + -- child 3 type: int32 +[0] + -- child 4 type: struct + -- is_valid: all not null + -- child 0 type: large_string +["b"] + -- child 5 type: int64 +[1] + -- child 6 type: int64 +[636] + -- child 7 type: map +[keys:[1,2]values:[39,40]] + -- child 8 type: map +[keys:[1,2]values:[1,1]] + -- child 9 type: map +[keys:[1,2]values:[0,0]] + -- child 10 type: map +[keys:[]values:[]] + -- child 11 type: map +[keys:[1,2]values:[02000000,62]] + -- child 12 type: map +[keys:[1,2]values:[02000000,62]] + -- child 13 type: binary +[null] + -- child 14 type: list +[[4]] + -- child 15 type: list +[null] + -- child 16 type: int32 +[0], +..., + -- is_valid: all not null + -- child 0 type: int8 +[] + -- child 1 type: string +[] + -- child 2 type: string +[] + -- child 3 type: int32 +[] + -- child 4 type: struct + -- is_valid: all not null + -- child 0 type: large_string +[] + -- child 5 type: int64 +[] + -- child 6 type: int64 +[] + -- child 7 type: map +[] + -- child 8 type: map +[] + -- child 9 type: map +[] + -- child 10 type: map +[] + -- child 11 type: map +[] + -- child 12 type: map +[] + -- child 13 type: binary +[] + -- child 14 type: list +[] + -- child 15 type: list +[] + -- child 16 type: int32 +[], + -- is_valid: all not null + -- child 0 type: int8 +[] + -- child 1 type: string +[] + -- child 2 type: string +[] + -- child 3 type: int32 +[] + -- child 4 type: struct + -- is_valid: all not null + -- child 0 type: large_string +[] + -- child 5 type: int64 +[] + -- child 6 type: int64 +[] + -- child 7 type: map +[] + -- child 8 type: map +[] + -- child 9 type: map +[] + -- child 10 type: map +[] + -- child 11 type: map +[] + -- child 12 type: map +[] + -- child 13 type: binary +[] + -- child 14 type: list +[] + -- child 15 type: list +[] + -- child 16 type: int32 +[]] +readable_metrics: [ + -- is_valid: all not null + -- child 0 type: struct + -- is_valid: all not null + -- child 0 type: int64 +[39] + -- child 1 type: int64 +[1] + -- child 2 type: int64 +[0] + -- child 3 type: int64 +[null] + -- child 4 type: int32 +[1] + -- child 5 type: int32 +[1] + -- child 1 type: struct + -- is_valid: all not null + -- child 0 type: int64 +[40] + -- child 1 type: int64 +[1] + -- child 2 type: int64 +[0] + -- child 3 type: int64 +[null] + -- child 4 type: large_string +["a"] + -- child 5 type: large_string +["a"], + -- is_valid: all not null + -- child 0 type: struct + -- is_valid: all not null + -- child 0 type: int64 +[39] + -- child 1 type: int64 +[1] + -- child 2 type: int64 +[0] + -- child 3 type: int64 +[null] + -- child 4 type: int32 +[2] + -- child 5 type: int32 +[2] + -- child 1 type: struct + -- is_valid: all not null + -- child 0 type: int64 +[40] + -- child 1 type: int64 +[1] + -- child 2 type: int64 +[0] + -- child 3 type: int64 +[null] + -- 
child 4 type: large_string +["b"] + -- child 5 type: large_string +["b"], +..., + -- is_valid: all not null + -- child 0 type: struct + -- is_valid: all not null + -- child 0 type: int64 +[] + -- child 1 type: int64 +[] + -- child 2 type: int64 +[] + -- child 3 type: int64 +[] + -- child 4 type: int32 +[] + -- child 5 type: int32 +[] + -- child 1 type: struct + -- is_valid: all not null + -- child 0 type: int64 +[] + -- child 1 type: int64 +[] + -- child 2 type: int64 +[] + -- child 3 type: int64 +[] + -- child 4 type: large_string +[] + -- child 5 type: large_string +[], + -- is_valid: all not null + -- child 0 type: struct + -- is_valid: all not null + -- child 0 type: int64 +[] + -- child 1 type: int64 +[] + -- child 2 type: int64 +[] + -- child 3 type: int64 +[] + -- child 4 type: int32 +[] + -- child 5 type: int32 +[] + -- child 1 type: struct + -- is_valid: all not null + -- child 0 type: int64 +[] + -- child 1 type: int64 +[] + -- child 2 type: int64 +[] + -- child 3 type: int64 +[] + -- child 4 type: large_string +[] + -- child 5 type: large_string +[]] +``` + ## Add Files Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them. diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index cce5250ad5..dc322dc6e3 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -22,6 +22,7 @@ from pyiceberg.conversions import from_bytes from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary from pyiceberg.partitioning import PartitionSpec +from pyiceberg.schema import Schema from pyiceberg.table.snapshots import Snapshot, ancestors_of from pyiceberg.types import PrimitiveType from pyiceberg.utils.concurrent import ExecutorFactory @@ -94,12 +95,13 @@ def snapshots(self) -> "pa.Table": schema=snapshots_schema, ) - def entries(self, snapshot_id: Optional[int] = None) -> "pa.Table": + def _get_entries_schema(self, schema: Optional[Schema] = None) -> "pa.Schema": import pyarrow as pa from pyiceberg.io.pyarrow import schema_to_pyarrow - schema = self.tbl.metadata.schema() + if not schema: + schema = self.tbl.metadata.schema() readable_metrics_struct = [] @@ -137,6 +139,7 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType: pa.field("content", pa.int8(), nullable=False), pa.field("file_path", pa.string(), nullable=False), pa.field("file_format", pa.string(), nullable=False), + pa.field("spec_id", pa.int32(), nullable=False), pa.field("partition", pa_record_struct, nullable=False), pa.field("record_count", pa.int64(), nullable=False), pa.field("file_size_in_bytes", pa.int64(), nullable=False), @@ -157,74 +160,98 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType: pa.field("readable_metrics", pa.struct(readable_metrics_struct), nullable=True), ] ) + return entries_schema + + def _get_entries(self, schema: Schema, manifest: ManifestFile, discard_deleted: bool = True) -> "pa.Table": + import pyarrow as pa + entries_schema = self._get_entries_schema(schema=schema) entries = [] - snapshot = self._get_snapshot(snapshot_id) - for manifest in snapshot.manifests(self.tbl.io): - for entry in manifest.fetch_manifest_entry(io=self.tbl.io, discard_deleted=False): - column_sizes = entry.data_file.column_sizes or {} - value_counts = entry.data_file.value_counts or {} - null_value_counts = entry.data_file.null_value_counts or {} - nan_value_counts = entry.data_file.nan_value_counts or {} - lower_bounds = 
entry.data_file.lower_bounds or {} - upper_bounds = entry.data_file.upper_bounds or {} - readable_metrics = { - schema.find_column_name(field.field_id): { - "column_size": column_sizes.get(field.field_id), - "value_count": value_counts.get(field.field_id), - "null_value_count": null_value_counts.get(field.field_id), - "nan_value_count": nan_value_counts.get(field.field_id), - # Makes them readable - "lower_bound": from_bytes(field.field_type, lower_bound) - if (lower_bound := lower_bounds.get(field.field_id)) - else None, - "upper_bound": from_bytes(field.field_type, upper_bound) - if (upper_bound := upper_bounds.get(field.field_id)) - else None, - } - for field in self.tbl.metadata.schema().fields + for entry in manifest.fetch_manifest_entry(io=self.tbl.io, discard_deleted=discard_deleted): + column_sizes = entry.data_file.column_sizes or {} + value_counts = entry.data_file.value_counts or {} + null_value_counts = entry.data_file.null_value_counts or {} + nan_value_counts = entry.data_file.nan_value_counts or {} + lower_bounds = entry.data_file.lower_bounds or {} + upper_bounds = entry.data_file.upper_bounds or {} + readable_metrics = { + schema.find_column_name(field.field_id): { + "column_size": column_sizes.get(field.field_id), + "value_count": value_counts.get(field.field_id), + "null_value_count": null_value_counts.get(field.field_id), + "nan_value_count": nan_value_counts.get(field.field_id), + # Makes them readable + "lower_bound": from_bytes(field.field_type, lower_bound) + if (lower_bound := lower_bounds.get(field.field_id)) + else None, + "upper_bound": from_bytes(field.field_type, upper_bound) + if (upper_bound := upper_bounds.get(field.field_id)) + else None, } + for field in schema.fields + } - partition = entry.data_file.partition - partition_record_dict = { - field.name: partition[pos] - for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields) - } + partition = entry.data_file.partition + partition_record_dict = { + field.name: partition[pos] + for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields) + } - entries.append( - { - "status": entry.status.value, - "snapshot_id": entry.snapshot_id, - "sequence_number": entry.sequence_number, - "file_sequence_number": entry.file_sequence_number, - "data_file": { - "content": entry.data_file.content, - "file_path": entry.data_file.file_path, - "file_format": entry.data_file.file_format, - "partition": partition_record_dict, - "record_count": entry.data_file.record_count, - "file_size_in_bytes": entry.data_file.file_size_in_bytes, - "column_sizes": dict(entry.data_file.column_sizes), - "value_counts": dict(entry.data_file.value_counts or {}), - "null_value_counts": dict(entry.data_file.null_value_counts or {}), - "nan_value_counts": dict(entry.data_file.nan_value_counts or {}), - "lower_bounds": entry.data_file.lower_bounds, - "upper_bounds": entry.data_file.upper_bounds, - "key_metadata": entry.data_file.key_metadata, - "split_offsets": entry.data_file.split_offsets, - "equality_ids": entry.data_file.equality_ids, - "sort_order_id": entry.data_file.sort_order_id, - "spec_id": entry.data_file.spec_id, - }, - "readable_metrics": readable_metrics, - } - ) + entries.append( + { + "status": entry.status.value, + "snapshot_id": entry.snapshot_id, + "sequence_number": entry.sequence_number, + "file_sequence_number": entry.file_sequence_number, + "data_file": { + "content": entry.data_file.content, + "file_path": entry.data_file.file_path, + "file_format": 
entry.data_file.file_format,
+                        "partition": partition_record_dict,
+                        "record_count": entry.data_file.record_count,
+                        "file_size_in_bytes": entry.data_file.file_size_in_bytes,
+                        "column_sizes": dict(entry.data_file.column_sizes) if entry.data_file.column_sizes is not None else None,
+                        "value_counts": dict(entry.data_file.value_counts) if entry.data_file.value_counts is not None else None,
+                        "null_value_counts": dict(entry.data_file.null_value_counts)
+                        if entry.data_file.null_value_counts is not None
+                        else None,
+                        "nan_value_counts": dict(entry.data_file.nan_value_counts)
+                        if entry.data_file.nan_value_counts is not None
+                        else None,
+                        "lower_bounds": entry.data_file.lower_bounds,
+                        "upper_bounds": entry.data_file.upper_bounds,
+                        "key_metadata": entry.data_file.key_metadata,
+                        "split_offsets": entry.data_file.split_offsets,
+                        "equality_ids": entry.data_file.equality_ids,
+                        "sort_order_id": entry.data_file.sort_order_id,
+                        "spec_id": entry.data_file.spec_id,
+                    },
+                    "readable_metrics": readable_metrics,
+                }
+            )
 
         return pa.Table.from_pylist(
             entries,
             schema=entries_schema,
         )
 
+    def entries(self, snapshot_id: Optional[int] = None) -> "pa.Table":
+        import pyarrow as pa
+
+        entries = []
+        snapshot = self._get_snapshot(snapshot_id)
+
+        if snapshot.schema_id is None:
+            raise ValueError(f"Cannot find schema_id for snapshot {snapshot.snapshot_id}")
+
+        schema = self.tbl.schemas().get(snapshot.schema_id)
+        if not schema:
+            raise ValueError(f"Cannot find schema with ID {snapshot.schema_id}")
+        for manifest in snapshot.manifests(self.tbl.io):
+            manifest_entries = self._get_entries(schema=schema, manifest=manifest, discard_deleted=True)
+            entries.append(manifest_entries)
+        return pa.concat_tables(entries)
+
     def refs(self) -> "pa.Table":
         import pyarrow as pa
 
@@ -704,3 +731,30 @@ def all_data_files(self) -> "pa.Table":
 
     def all_delete_files(self) -> "pa.Table":
         return self._all_files({DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES})
+
+    def all_entries(self) -> "pa.Table":
+        import pyarrow as pa
+
+        snapshots = self.tbl.snapshots()
+        if not snapshots:
+            return pa.Table.from_pylist([], self._get_entries_schema())
+
+        schemas = self.tbl.schemas()
+        snapshot_schemas: Dict[int, Schema] = {}
+        for snapshot in snapshots:
+            if snapshot.schema_id is None:
+                raise ValueError(f"Cannot find schema_id for snapshot: {snapshot.snapshot_id}")
+            else:
+                snapshot_schemas[snapshot.snapshot_id] = schemas.get(snapshot.schema_id)
+
+        executor = ExecutorFactory.get_or_create()
+        all_manifests: Iterator[List[ManifestFile]] = executor.map(lambda snapshot: snapshot.manifests(self.tbl.io), snapshots)
+        unique_flattened_manifests = list(
+            {manifest.manifest_path: manifest for manifest_list in all_manifests for manifest in manifest_list}.values()
+        )
+
+        entries: Iterator["pa.Table"] = executor.map(
+            lambda manifest: self._get_entries(snapshot_schemas[manifest.added_snapshot_id], manifest, discard_deleted=True),
+            unique_flattened_manifests,
+        )
+        return pa.concat_tables(entries)
diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py
index e81050a81c..e234715fc2 100644
--- a/tests/integration/test_inspect_table.py
+++ b/tests/integration/test_inspect_table.py
@@ -1101,3 +1101,105 @@ def test_inspect_files_partitioned(spark: SparkSession, session_catalog: Catalog
         .reset_index()
     )
     assert_frame_equal(lhs, rhs, check_dtype=False)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_inspect_all_entries(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    identifier = "default.table_metadata_all_entries"
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    spark.sql(
+        f"""
+        CREATE TABLE {identifier} (
+            id int,
+            data string
+        )
+        PARTITIONED BY (data)
+        TBLPROPERTIES ('write.update.mode'='merge-on-read',
+                       'write.delete.mode'='merge-on-read')
+        """
+    )
+    tbl = session_catalog.load_table(identifier)
+
+    spark.sql(f"INSERT INTO {identifier} VALUES (1, 'a')")
+    spark.sql(f"INSERT INTO {identifier} VALUES (2, 'b')")
+
+    spark.sql(f"UPDATE {identifier} SET data = 'c' WHERE id = 1")
+
+    spark.sql(f"DELETE FROM {identifier} WHERE id = 2")
+
+    spark.sql(f"INSERT OVERWRITE {identifier} VALUES (1, 'a')")
+
+    def check_pyiceberg_df_equals_spark_df(df: pa.Table, spark_df: DataFrame) -> None:
+        assert df.column_names == [
+            "status",
+            "snapshot_id",
+            "sequence_number",
+            "file_sequence_number",
+            "data_file",
+            "readable_metrics",
+        ]
+
+        # Check first 4 columns are of the correct type
+        for int_column in ["status", "snapshot_id", "sequence_number", "file_sequence_number"]:
+            for value in df[int_column]:
+                assert isinstance(value.as_py(), int)
+
+        # The rest of the code checks the data_file and readable_metrics columns
+        # Convert both dataframes to pandas and sort them the same way for comparison
+        lhs = df.to_pandas()
+        rhs = spark_df.toPandas()
+        for df_to_check in [lhs, rhs]:
+            df_to_check["content"] = df_to_check["data_file"].apply(lambda x: x.get("content"))
+            df_to_check["file_path"] = df_to_check["data_file"].apply(lambda x: x.get("file_path"))
+            df_to_check.sort_values(["status", "snapshot_id", "sequence_number", "content", "file_path"], inplace=True)
+            df_to_check.drop(columns=["file_path", "content"], inplace=True)
+
+        for column in df.column_names:
+            for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
+                if column == "data_file":
+                    for df_column in left.keys():
+                        if df_column == "partition":
+                            # Spark leaves out the partition if the table is unpartitioned
+                            continue
+
+                        df_lhs = left[df_column]
+                        df_rhs = right[df_column]
+                        if isinstance(df_rhs, dict):
+                            # Arrow turns dicts into lists of tuples
+                            df_lhs = dict(df_lhs)
+
+                        assert df_lhs == df_rhs, f"Difference in data_file column {df_column}: {df_lhs} != {df_rhs}"
+                elif column == "readable_metrics":
+                    assert list(left.keys()) == ["id", "data"]
+
+                    assert left.keys() == right.keys()
+
+                    for rm_column in left.keys():
+                        rm_lhs = left[rm_column]
+                        rm_rhs = right[rm_column]
+
+                        assert rm_lhs["column_size"] == rm_rhs["column_size"]
+                        assert rm_lhs["value_count"] == rm_rhs["value_count"]
+                        assert rm_lhs["null_value_count"] == rm_rhs["null_value_count"]
+                        assert rm_lhs["nan_value_count"] == rm_rhs["nan_value_count"]
+
+                        if rm_column == "timestamptz":
+                            # PySpark does not correctly set the timestamptz
+                            rm_rhs["lower_bound"] = rm_rhs["lower_bound"].replace(tzinfo=pytz.utc)
+                            rm_rhs["upper_bound"] = rm_rhs["upper_bound"].replace(tzinfo=pytz.utc)
+
+                        assert rm_lhs["lower_bound"] == rm_rhs["lower_bound"]
+                        assert rm_lhs["upper_bound"] == rm_rhs["upper_bound"]
+                else:
+                    assert left == right, f"Difference in column {column}: {left} != {right}"
+
+    tbl.refresh()
+
+    df = tbl.inspect.all_entries()
+    spark_df = spark.table(f"{identifier}.all_entries")
+    check_pyiceberg_df_equals_spark_df(df, spark_df)
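
Beyond the Spark-parity integration test above, the new inspect table is an ordinary `pyarrow.Table`, so it composes with the usual DataFrame tooling. A small usage sketch, assuming a configured catalog named `default` and the example table created in the docs above (neither is part of this change):

```python
from pyiceberg.catalog import load_catalog

# Load the table through any configured catalog
catalog = load_catalog("default")
tbl = catalog.load_table("default.table_metadata_all_entries")

# One pyarrow.Table with manifest entries from every snapshot, not just the current one
all_entries = tbl.inspect.all_entries()

# For ad-hoc analysis, drop into pandas, e.g. to count entries per snapshot
df = all_entries.to_pandas()
print(df.groupby("snapshot_id").size())
```

Because the same data file can be part of several snapshots, repeated `data_file.file_path` values across rows are expected; that is the documented behaviour of the "all" metadata tables, not a bug.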
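
On the implementation side, `all_entries()` avoids fetching the same manifest twice by de-duplicating the manifests gathered from every snapshot on their `manifest_path` before reading entries. A minimal, standalone sketch of that dict-keyed de-duplication idiom (the `records` list is a hypothetical stand-in for `ManifestFile` objects, not a PyIceberg API):

```python
# Hypothetical stand-ins for manifests reachable from several snapshots
records = [
    {"path": "m1.avro", "added_snapshot": 1},
    {"path": "m2.avro", "added_snapshot": 2},
    {"path": "m1.avro", "added_snapshot": 3},  # same manifest referenced again
]

# Keying a dict by the unique attribute collapses duplicates: later values
# overwrite earlier ones, and key order follows first insertion, so each
# path survives exactly once.
unique = list({r["path"]: r for r in records}.values())
print([r["path"] for r in unique])  # ['m1.avro', 'm2.avro']
```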