From 815ea0d27221bedce8f29d5d5a24af2088d93595 Mon Sep 17 00:00:00 2001 From: amitgilad Date: Tue, 4 Feb 2025 19:08:19 +0200 Subject: [PATCH 1/6] support all_entries in pyiceberg --- pyiceberg/table/inspect.py | 24 +++++++++++- tests/integration/test_inspect_table.py | 52 +++++++++++++++++++++++++ 2 files changed, 74 insertions(+), 2 deletions(-) diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index cce5250ad5..4401641f23 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -94,7 +94,7 @@ def snapshots(self) -> "pa.Table": schema=snapshots_schema, ) - def entries(self, snapshot_id: Optional[int] = None) -> "pa.Table": + def _get_entries_schema(self) -> "pa.Schema": import pyarrow as pa from pyiceberg.io.pyarrow import schema_to_pyarrow @@ -157,11 +157,18 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType: pa.field("readable_metrics", pa.struct(readable_metrics_struct), nullable=True), ] ) + return entries_schema + + def entries(self, snapshot_id: Optional[int] = None, discard_deleted: bool = True) -> "pa.Table": + import pyarrow as pa + schema = self.tbl.metadata.schema() + + entries_schema = self._get_entries_schema() entries = [] snapshot = self._get_snapshot(snapshot_id) for manifest in snapshot.manifests(self.tbl.io): - for entry in manifest.fetch_manifest_entry(io=self.tbl.io, discard_deleted=False): + for entry in manifest.fetch_manifest_entry(io=self.tbl.io, discard_deleted=discard_deleted): column_sizes = entry.data_file.column_sizes or {} value_counts = entry.data_file.value_counts or {} null_value_counts = entry.data_file.null_value_counts or {} @@ -704,3 +711,16 @@ def all_data_files(self) -> "pa.Table": def all_delete_files(self) -> "pa.Table": return self._all_files({DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES}) + + def all_entries(self) -> "pa.Table": + import pyarrow as pa + + snapshots = self.tbl.snapshots() + if not snapshots: + return pa.Table.from_pylist([], self._get_entries_schema()) + + executor = ExecutorFactory.get_or_create() + all_entries: Iterator["pa.Table"] = executor.map( + lambda snapshot_id: self.entries(snapshot_id, discard_deleted=False), [snapshot.snapshot_id for snapshot in snapshots] + ) + return pa.concat_tables(all_entries) \ No newline at end of file diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index e81050a81c..935e3f53b7 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -1101,3 +1101,55 @@ def test_inspect_files_partitioned(spark: SparkSession, session_catalog: Catalog .reset_index() ) assert_frame_equal(lhs, rhs, check_dtype=False) + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_inspect_all_entries(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: + from pandas.testing import assert_frame_equal + + identifier = "default.table_metadata_all_entries" + try: + session_catalog.drop_table(identifier=identifier) + except NoSuchTableError: + pass + + spark.sql( + f""" + CREATE TABLE {identifier} ( + id int, + data string + ) + PARTITIONED BY (data) + TBLPROPERTIES ('write.update.mode'='merge-on-read', + 'write.delete.mode'='merge-on-read', 'format-version'= {format_version}) + """ + ) + tbl = session_catalog.load_table(identifier) + + + spark.sql(f"INSERT INTO {identifier} VALUES (1, 'a')") + + spark.sql(f"INSERT INTO {identifier} VALUES (2, 'b')") + + spark.sql(f"UPDATE {identifier} SET data = 'c' 
WHERE id = 1") + + spark.sql(f"DELETE FROM {identifier} WHERE id = 2") + + spark.sql(f"INSERT OVERWRITE {identifier} VALUES (1, 'a')") + + tbl.refresh() + + df = tbl.inspect.all_entries() + + assert df.column_names == [ + "status", + "snapshot_id", + "sequence_number", + "file_sequence_number", + "data_file", + "readable_metrics", + ] + + lhs = spark.table(f"{identifier}.all_entries").toPandas() + rhs = df.to_pandas() + assert_frame_equal(lhs, rhs, check_dtype=False) From 56235e3b490f5c37c47db51e47bcfc54a0596210 Mon Sep 17 00:00:00 2001 From: amitgilad Date: Wed, 5 Feb 2025 07:53:53 +0200 Subject: [PATCH 2/6] support all_entries in pyiceberg --- pyiceberg/table/inspect.py | 142 +++++++++++++----------- tests/integration/test_inspect_table.py | 91 ++++++++++++--- 2 files changed, 152 insertions(+), 81 deletions(-) diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index 4401641f23..27c233c141 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -137,6 +137,7 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType: pa.field("content", pa.int8(), nullable=False), pa.field("file_path", pa.string(), nullable=False), pa.field("file_format", pa.string(), nullable=False), + pa.field("spec_id", pa.int32(), nullable=False), pa.field("partition", pa_record_struct, nullable=False), pa.field("record_count", pa.int64(), nullable=False), pa.field("file_size_in_bytes", pa.int64(), nullable=False), @@ -159,79 +160,90 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType: ) return entries_schema - def entries(self, snapshot_id: Optional[int] = None, discard_deleted: bool = True) -> "pa.Table": + def _get_entries(self, manifest: ManifestFile, discard_deleted: bool = True) -> "pa.Table": import pyarrow as pa - schema = self.tbl.metadata.schema() + schema = self.tbl.metadata.schema() entries_schema = self._get_entries_schema() - entries = [] - snapshot = self._get_snapshot(snapshot_id) - for manifest in snapshot.manifests(self.tbl.io): - for entry in manifest.fetch_manifest_entry(io=self.tbl.io, discard_deleted=discard_deleted): - column_sizes = entry.data_file.column_sizes or {} - value_counts = entry.data_file.value_counts or {} - null_value_counts = entry.data_file.null_value_counts or {} - nan_value_counts = entry.data_file.nan_value_counts or {} - lower_bounds = entry.data_file.lower_bounds or {} - upper_bounds = entry.data_file.upper_bounds or {} - readable_metrics = { - schema.find_column_name(field.field_id): { - "column_size": column_sizes.get(field.field_id), - "value_count": value_counts.get(field.field_id), - "null_value_count": null_value_counts.get(field.field_id), - "nan_value_count": nan_value_counts.get(field.field_id), - # Makes them readable - "lower_bound": from_bytes(field.field_type, lower_bound) - if (lower_bound := lower_bounds.get(field.field_id)) - else None, - "upper_bound": from_bytes(field.field_type, upper_bound) - if (upper_bound := upper_bounds.get(field.field_id)) - else None, - } - for field in self.tbl.metadata.schema().fields + for entry in manifest.fetch_manifest_entry(io=self.tbl.io, discard_deleted=discard_deleted): + column_sizes = entry.data_file.column_sizes or {} + value_counts = entry.data_file.value_counts or {} + null_value_counts = entry.data_file.null_value_counts or {} + nan_value_counts = entry.data_file.nan_value_counts or {} + lower_bounds = entry.data_file.lower_bounds or {} + upper_bounds = entry.data_file.upper_bounds or {} + readable_metrics = { + 
schema.find_column_name(field.field_id): { + "column_size": column_sizes.get(field.field_id), + "value_count": value_counts.get(field.field_id), + "null_value_count": null_value_counts.get(field.field_id), + "nan_value_count": nan_value_counts.get(field.field_id), + # Makes them readable + "lower_bound": from_bytes(field.field_type, lower_bound) + if (lower_bound := lower_bounds.get(field.field_id)) + else None, + "upper_bound": from_bytes(field.field_type, upper_bound) + if (upper_bound := upper_bounds.get(field.field_id)) + else None, } + for field in self.tbl.metadata.schema().fields + } - partition = entry.data_file.partition - partition_record_dict = { - field.name: partition[pos] - for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields) - } + partition = entry.data_file.partition + partition_record_dict = { + field.name: partition[pos] + for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields) + } - entries.append( - { - "status": entry.status.value, - "snapshot_id": entry.snapshot_id, - "sequence_number": entry.sequence_number, - "file_sequence_number": entry.file_sequence_number, - "data_file": { - "content": entry.data_file.content, - "file_path": entry.data_file.file_path, - "file_format": entry.data_file.file_format, - "partition": partition_record_dict, - "record_count": entry.data_file.record_count, - "file_size_in_bytes": entry.data_file.file_size_in_bytes, - "column_sizes": dict(entry.data_file.column_sizes), - "value_counts": dict(entry.data_file.value_counts or {}), - "null_value_counts": dict(entry.data_file.null_value_counts or {}), - "nan_value_counts": dict(entry.data_file.nan_value_counts or {}), - "lower_bounds": entry.data_file.lower_bounds, - "upper_bounds": entry.data_file.upper_bounds, - "key_metadata": entry.data_file.key_metadata, - "split_offsets": entry.data_file.split_offsets, - "equality_ids": entry.data_file.equality_ids, - "sort_order_id": entry.data_file.sort_order_id, - "spec_id": entry.data_file.spec_id, - }, - "readable_metrics": readable_metrics, - } - ) + entries.append( + { + "status": entry.status.value, + "snapshot_id": entry.snapshot_id, + "sequence_number": entry.sequence_number, + "file_sequence_number": entry.file_sequence_number, + "data_file": { + "content": entry.data_file.content, + "file_path": entry.data_file.file_path, + "file_format": entry.data_file.file_format, + "partition": partition_record_dict, + "record_count": entry.data_file.record_count, + "file_size_in_bytes": entry.data_file.file_size_in_bytes, + "column_sizes": dict(entry.data_file.column_sizes) if entry.data_file.column_sizes is not None else None, + "value_counts": dict(entry.data_file.value_counts) if entry.data_file.value_counts is not None else None, + "null_value_counts": dict(entry.data_file.null_value_counts) + if entry.data_file.null_value_counts is not None + else None, + "nan_value_counts": dict(entry.data_file.nan_value_counts) + if entry.data_file.nan_value_counts is not None + else None, + "lower_bounds": entry.data_file.lower_bounds, + "upper_bounds": entry.data_file.upper_bounds, + "key_metadata": entry.data_file.key_metadata, + "split_offsets": entry.data_file.split_offsets, + "equality_ids": entry.data_file.equality_ids, + "sort_order_id": entry.data_file.sort_order_id, + "spec_id": entry.data_file.spec_id, + }, + "readable_metrics": readable_metrics, + } + ) return pa.Table.from_pylist( entries, schema=entries_schema, ) + def entries(self, snapshot_id: Optional[int] = None, 
discard_deleted: bool = True) -> "pa.Table": + import pyarrow as pa + + entries = [] + snapshot = self._get_snapshot(snapshot_id) + for manifest in snapshot.manifests(self.tbl.io): + manifest_entries = self._get_entries(manifest) + entries.append(manifest_entries) + return pa.concat_tables(entries) + def refs(self) -> "pa.Table": import pyarrow as pa @@ -720,7 +732,11 @@ def all_entries(self) -> "pa.Table": return pa.Table.from_pylist([], self._get_entries_schema()) executor = ExecutorFactory.get_or_create() - all_entries: Iterator["pa.Table"] = executor.map( - lambda snapshot_id: self.entries(snapshot_id, discard_deleted=False), [snapshot.snapshot_id for snapshot in snapshots] + all_manifests: Iterator[List[ManifestFile]] = executor.map(lambda snapshot: snapshot.manifests(self.tbl.io), snapshots) + unique_flattened_manifests = list( + {manifest.manifest_path: manifest for manifest_list in all_manifests for manifest in manifest_list}.values() + ) + entries: Iterator["pa.Table"] = executor.map( + lambda manifest: self._get_entries(manifest, discard_deleted=False), unique_flattened_manifests ) - return pa.concat_tables(all_entries) \ No newline at end of file + return pa.concat_tables(entries) \ No newline at end of file diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index 935e3f53b7..870273dc81 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -1105,8 +1105,6 @@ def test_inspect_files_partitioned(spark: SparkSession, session_catalog: Catalog @pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) def test_inspect_all_entries(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: - from pandas.testing import assert_frame_equal - identifier = "default.table_metadata_all_entries" try: session_catalog.drop_table(identifier=identifier) @@ -1121,14 +1119,12 @@ def test_inspect_all_entries(spark: SparkSession, session_catalog: Catalog, form ) PARTITIONED BY (data) TBLPROPERTIES ('write.update.mode'='merge-on-read', - 'write.delete.mode'='merge-on-read', 'format-version'= {format_version}) + 'write.delete.mode'='merge-on-read') """ ) tbl = session_catalog.load_table(identifier) - spark.sql(f"INSERT INTO {identifier} VALUES (1, 'a')") - spark.sql(f"INSERT INTO {identifier} VALUES (2, 'b')") spark.sql(f"UPDATE {identifier} SET data = 'c' WHERE id = 1") @@ -1137,19 +1133,78 @@ def test_inspect_all_entries(spark: SparkSession, session_catalog: Catalog, form spark.sql(f"INSERT OVERWRITE {identifier} VALUES (1, 'a')") - tbl.refresh() + def check_pyiceberg_df_equals_spark_df(df: pa.Table, spark_df: DataFrame) -> None: + assert df.column_names == [ + "status", + "snapshot_id", + "sequence_number", + "file_sequence_number", + "data_file", + "readable_metrics", + ] - df = tbl.inspect.all_entries() + # Make sure that they are filled properly + for int_column in ["status", "snapshot_id", "sequence_number", "file_sequence_number"]: + for value in df[int_column]: + assert isinstance(value.as_py(), int) - assert df.column_names == [ - "status", - "snapshot_id", - "sequence_number", - "file_sequence_number", - "data_file", - "readable_metrics", - ] + for snapshot_id in df["snapshot_id"]: + assert isinstance(snapshot_id.as_py(), int) - lhs = spark.table(f"{identifier}.all_entries").toPandas() - rhs = df.to_pandas() - assert_frame_equal(lhs, rhs, check_dtype=False) + lhs = df.to_pandas() + lhs["content"] = lhs["data_file"].apply(lambda x: x.get("content")) + lhs["file_path"] = 
lhs["data_file"].apply(lambda x: x.get("file_path")) + lhs = lhs.sort_values(["status", "snapshot_id", "sequence_number", "content", "file_path"]).drop( + columns=["file_path", "content"] + ) + rhs = spark_df.toPandas() + rhs["content"] = rhs["data_file"].apply(lambda x: x.get("content")) + rhs["file_path"] = rhs["data_file"].apply(lambda x: x.get("file_path")) + rhs = rhs.sort_values(["status", "snapshot_id", "sequence_number", "content", "file_path"]).drop( + columns=["file_path", "content"] + ) + + for column in df.column_names: + for left, right in zip(lhs[column].to_list(), rhs[column].to_list()): + if column == "data_file": + for df_column in left.keys(): + if df_column == "partition": + # Spark leaves out the partition if the table is unpartitioned + continue + + df_lhs = left[df_column] + df_rhs = right[df_column] + if isinstance(df_rhs, dict): + # Arrow turns dicts into lists of tuple + df_lhs = dict(df_lhs) + + assert df_lhs == df_rhs, f"Difference in data_file column {df_column}: {df_lhs} != {df_rhs}" + elif column == "readable_metrics": + assert list(left.keys()) == ["id", "data"] + + assert left.keys() == right.keys() + + for rm_column in left.keys(): + rm_lhs = left[rm_column] + rm_rhs = right[rm_column] + + assert rm_lhs["column_size"] == rm_rhs["column_size"] + assert rm_lhs["value_count"] == rm_rhs["value_count"] + assert rm_lhs["null_value_count"] == rm_rhs["null_value_count"] + assert rm_lhs["nan_value_count"] == rm_rhs["nan_value_count"] + + if rm_column == "timestamptz": + # PySpark does not correctly set the timstamptz + rm_rhs["lower_bound"] = rm_rhs["lower_bound"].replace(tzinfo=pytz.utc) + rm_rhs["upper_bound"] = rm_rhs["upper_bound"].replace(tzinfo=pytz.utc) + + assert rm_lhs["lower_bound"] == rm_rhs["lower_bound"] + assert rm_lhs["upper_bound"] == rm_rhs["upper_bound"] + else: + assert left == right, f"Difference in column {column}: {left} != {right}" + + tbl.refresh() + + df = tbl.inspect.all_entries() + spark_df = spark.table(f"{identifier}.all_entries") + check_pyiceberg_df_equals_spark_df(df, spark_df) From d88179b8a8b21985b062f1f1808fd287ec0cf26f Mon Sep 17 00:00:00 2001 From: amitgilad Date: Sat, 8 Feb 2025 22:29:17 +0200 Subject: [PATCH 3/6] fix comments of pr --- mkdocs/docs/api.md | 2 ++ pyiceberg/table/inspect.py | 30 ++++++++++++++++++------- tests/integration/test_inspect_table.py | 22 +++++++----------- 3 files changed, 32 insertions(+), 22 deletions(-) diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index d84c82ec2a..f65c651480 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -792,6 +792,8 @@ readable_metrics: [ [6.0989]] ``` +To show all the table's current manifest entries for both data and delete files, use `table.inspect.all_entries()`. 
+ ### References To show a table's known snapshot references: diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index 27c233c141..76820ff366 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -160,10 +160,9 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType: ) return entries_schema - def _get_entries(self, manifest: ManifestFile, discard_deleted: bool = True) -> "pa.Table": + def _get_entries(self, schema: "pa.Schema", manifest: ManifestFile, discard_deleted: bool = True) -> "pa.Table": import pyarrow as pa - schema = self.tbl.metadata.schema() entries_schema = self._get_entries_schema() entries = [] for entry in manifest.fetch_manifest_entry(io=self.tbl.io, discard_deleted=discard_deleted): @@ -209,7 +208,7 @@ def _get_entries(self, manifest: ManifestFile, discard_deleted: bool = True) -> "partition": partition_record_dict, "record_count": entry.data_file.record_count, "file_size_in_bytes": entry.data_file.file_size_in_bytes, - "column_sizes": dict(entry.data_file.column_sizes) if entry.data_file.column_sizes is not None else None, + "column_sizes": dict(entry.data_file.column_sizes) or None, "value_counts": dict(entry.data_file.value_counts) if entry.data_file.value_counts is not None else None, "null_value_counts": dict(entry.data_file.null_value_counts) if entry.data_file.null_value_counts is not None @@ -234,13 +233,18 @@ def _get_entries(self, manifest: ManifestFile, discard_deleted: bool = True) -> schema=entries_schema, ) - def entries(self, snapshot_id: Optional[int] = None, discard_deleted: bool = True) -> "pa.Table": + def entries(self, snapshot_id: Optional[int] = None) -> "pa.Table": import pyarrow as pa entries = [] snapshot = self._get_snapshot(snapshot_id) + + if snapshot.schema_id is None: + raise ValueError(f"Cannot find schema_id for snapshot {snapshot.snapshot_id}") + + schema = self.tbl.schemas().get(snapshot.schema_id) for manifest in snapshot.manifests(self.tbl.io): - manifest_entries = self._get_entries(manifest) + manifest_entries = self._get_entries(schema=schema, manifest=manifest, discard_deleted=True) entries.append(manifest_entries) return pa.concat_tables(entries) @@ -724,19 +728,29 @@ def all_data_files(self) -> "pa.Table": def all_delete_files(self) -> "pa.Table": return self._all_files({DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES}) - def all_entries(self) -> "pa.Table": + def all_entries(self, snapshot_id: Optional[int] = None) -> "pa.Table": import pyarrow as pa - snapshots = self.tbl.snapshots() + snapshots = self.tbl.snapshots() if snapshot_id is None else [self._get_snapshot(snapshot_id)] if not snapshots: return pa.Table.from_pylist([], self._get_entries_schema()) + schemas = self.tbl.schemas() + snapshot_schemas: Dict[int, "pa.Schema"] = {} + for snapshot in snapshots: + if snapshot.schema_id is None: + raise ValueError(f"Cannot find schema_id for snapshot: {snapshot.snapshot_id}") + else: + snapshot_schemas[snapshot.snapshot_id] = schemas.get(snapshot.schema_id) + executor = ExecutorFactory.get_or_create() all_manifests: Iterator[List[ManifestFile]] = executor.map(lambda snapshot: snapshot.manifests(self.tbl.io), snapshots) unique_flattened_manifests = list( {manifest.manifest_path: manifest for manifest_list in all_manifests for manifest in manifest_list}.values() ) + entries: Iterator["pa.Table"] = executor.map( - lambda manifest: self._get_entries(manifest, discard_deleted=False), unique_flattened_manifests + lambda manifest: 
self._get_entries(snapshot_schemas[manifest.added_snapshot_id], manifest, discard_deleted=True), + unique_flattened_manifests, ) return pa.concat_tables(entries) \ No newline at end of file diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index 870273dc81..317026acf5 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -1143,26 +1143,20 @@ def check_pyiceberg_df_equals_spark_df(df: pa.Table, spark_df: DataFrame) -> Non "readable_metrics", ] - # Make sure that they are filled properly + # Check first 4 columns are of the correct type for int_column in ["status", "snapshot_id", "sequence_number", "file_sequence_number"]: for value in df[int_column]: assert isinstance(value.as_py(), int) - for snapshot_id in df["snapshot_id"]: - assert isinstance(snapshot_id.as_py(), int) - + # The rest of the code checks the data_file and readable_metrics columns + # Convert both dataframes to pandas and sort them the same way for comparison lhs = df.to_pandas() - lhs["content"] = lhs["data_file"].apply(lambda x: x.get("content")) - lhs["file_path"] = lhs["data_file"].apply(lambda x: x.get("file_path")) - lhs = lhs.sort_values(["status", "snapshot_id", "sequence_number", "content", "file_path"]).drop( - columns=["file_path", "content"] - ) rhs = spark_df.toPandas() - rhs["content"] = rhs["data_file"].apply(lambda x: x.get("content")) - rhs["file_path"] = rhs["data_file"].apply(lambda x: x.get("file_path")) - rhs = rhs.sort_values(["status", "snapshot_id", "sequence_number", "content", "file_path"]).drop( - columns=["file_path", "content"] - ) + for df_to_check in [lhs, rhs]: + df_to_check["content"] = df_to_check["data_file"].apply(lambda x: x.get("content")) + df_to_check["file_path"] = df_to_check["data_file"].apply(lambda x: x.get("file_path")) + df_to_check.sort_values(["status", "snapshot_id", "sequence_number", "content", "file_path"], inplace=True) + df_to_check.drop(columns=["file_path", "content"], inplace=True) for column in df.column_names: for left, right in zip(lhs[column].to_list(), rhs[column].to_list()): From 1834dd87976e413397af29bd2b10c8905716f202 Mon Sep 17 00:00:00 2001 From: amitgilad Date: Mon, 10 Feb 2025 19:59:11 +0200 Subject: [PATCH 4/6] remove option for a specific snapshot --- pyiceberg/table/inspect.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index 76820ff366..39b056966d 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -728,10 +728,10 @@ def all_data_files(self) -> "pa.Table": def all_delete_files(self) -> "pa.Table": return self._all_files({DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES}) - def all_entries(self, snapshot_id: Optional[int] = None) -> "pa.Table": + def all_entries(self) -> "pa.Table": import pyarrow as pa - snapshots = self.tbl.snapshots() if snapshot_id is None else [self._get_snapshot(snapshot_id)] + snapshots = self.tbl.snapshots() if not snapshots: return pa.Table.from_pylist([], self._get_entries_schema()) From 88880932e37564f779074200fb96dd102bff47f4 Mon Sep 17 00:00:00 2001 From: amitgilad Date: Mon, 17 Feb 2025 22:29:34 +0200 Subject: [PATCH 5/6] 1. update docs 2. 
dont use latest schema and partition spec , use the one at the time of the snapshot instead --- mkdocs/docs/api.md | 351 ++++++++++++++++++++++++++++++++++++- pyiceberg/table/inspect.py | 16 +- 2 files changed, 359 insertions(+), 8 deletions(-) diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index f65c651480..5909675bee 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -792,8 +792,6 @@ readable_metrics: [ [6.0989]] ``` -To show all the table's current manifest entries for both data and delete files, use `table.inspect.all_entries()`. - ### References To show a table's known snapshot references: @@ -1049,6 +1047,355 @@ readable_metrics: [ To show only data files or delete files in the current snapshot, use `table.inspect.data_files()` and `table.inspect.delete_files()` respectively. +### All Metadata Tables + +These tables are unions of the metadata tables specific to the current snapshot, and return metadata across all snapshots. +!!! danger + The "all" metadata tables may produce more than one row per data file or manifest file because metadata files may be part of more than one table snapshot. + +#### All Entries + +To show the table's manifest entries from all the snapshots for both data and delete files: + +```python +table.inspect.all_entries() +``` + +```python +pyarrow.Table +status: int8 not null +snapshot_id: int64 not null +sequence_number: int64 not null +file_sequence_number: int64 not null +data_file: struct not null, record_count: int64 not null, file_size_in_bytes: int64 not null, column_sizes: map, value_counts: map, null_value_counts: map, nan_value_counts: map, lower_bounds: map, upper_bounds: map, key_metadata: binary, split_offsets: list, equality_ids: list, sort_order_id: int32> not null + child 0, content: int8 not null + child 1, file_path: string not null + child 2, file_format: string not null + child 3, spec_id: int32 not null + child 4, partition: struct not null + child 0, data: large_string + child 5, record_count: int64 not null + child 6, file_size_in_bytes: int64 not null + child 7, column_sizes: map + child 0, entries: struct not null + child 0, key: int32 not null + child 1, value: int64 + child 8, value_counts: map + child 0, entries: struct not null + child 0, key: int32 not null + child 1, value: int64 + child 9, null_value_counts: map + child 0, entries: struct not null + child 0, key: int32 not null + child 1, value: int64 + child 10, nan_value_counts: map + child 0, entries: struct not null + child 0, key: int32 not null + child 1, value: int64 + child 11, lower_bounds: map + child 0, entries: struct not null + child 0, key: int32 not null + child 1, value: binary + child 12, upper_bounds: map + child 0, entries: struct not null + child 0, key: int32 not null + child 1, value: binary + child 13, key_metadata: binary + child 14, split_offsets: list + child 0, item: int64 + child 15, equality_ids: list + child 0, item: int32 + child 16, sort_order_id: int32 +readable_metrics: struct not null, data: struct not null> + child 0, id: struct not null + child 0, column_size: int64 + child 1, value_count: int64 + child 2, null_value_count: int64 + child 3, nan_value_count: int64 + child 4, lower_bound: int32 + child 5, upper_bound: int32 + child 1, data: struct not null + child 0, column_size: int64 + child 1, value_count: int64 + child 2, null_value_count: int64 + child 3, nan_value_count: int64 + child 4, lower_bound: large_string + child 5, upper_bound: large_string +---- +status: [[1],[1],...,[],[]] +snapshot_id: 
[[6449946327458654223],[7507782590078860647],...,[],[]] +sequence_number: [[1],[2],...,[],[]] +file_sequence_number: [[1],[2],...,[],[]] +data_file: [ + -- is_valid: all not null + -- child 0 type: int8 +[0] + -- child 1 type: string +["s3://warehouse/default/table_metadata_all_entries/data/data=a/00000-1-20675924-1844-4414-aa3b-cbb033884013-0-00001.parquet"] + -- child 2 type: string +["PARQUET"] + -- child 3 type: int32 +[0] + -- child 4 type: struct + -- is_valid: all not null + -- child 0 type: large_string +["a"] + -- child 5 type: int64 +[1] + -- child 6 type: int64 +[636] + -- child 7 type: map +[keys:[1,2]values:[39,40]] + -- child 8 type: map +[keys:[1,2]values:[1,1]] + -- child 9 type: map +[keys:[1,2]values:[0,0]] + -- child 10 type: map +[keys:[]values:[]] + -- child 11 type: map +[keys:[1,2]values:[01000000,61]] + -- child 12 type: map +[keys:[1,2]values:[01000000,61]] + -- child 13 type: binary +[null] + -- child 14 type: list +[[4]] + -- child 15 type: list +[null] + -- child 16 type: int32 +[0], + -- is_valid: all not null + -- child 0 type: int8 +[0] + -- child 1 type: string +["s3://warehouse/default/table_metadata_all_entries/data/data=b/00000-3-c28af222-7039-435e-b2a9-a4dc698b75e5-0-00001.parquet"] + -- child 2 type: string +["PARQUET"] + -- child 3 type: int32 +[0] + -- child 4 type: struct + -- is_valid: all not null + -- child 0 type: large_string +["b"] + -- child 5 type: int64 +[1] + -- child 6 type: int64 +[636] + -- child 7 type: map +[keys:[1,2]values:[39,40]] + -- child 8 type: map +[keys:[1,2]values:[1,1]] + -- child 9 type: map +[keys:[1,2]values:[0,0]] + -- child 10 type: map +[keys:[]values:[]] + -- child 11 type: map +[keys:[1,2]values:[02000000,62]] + -- child 12 type: map +[keys:[1,2]values:[02000000,62]] + -- child 13 type: binary +[null] + -- child 14 type: list +[[4]] + -- child 15 type: list +[null] + -- child 16 type: int32 +[0], +..., + -- is_valid: all not null + -- child 0 type: int8 +[] + -- child 1 type: string +[] + -- child 2 type: string +[] + -- child 3 type: int32 +[] + -- child 4 type: struct + -- is_valid: all not null + -- child 0 type: large_string +[] + -- child 5 type: int64 +[] + -- child 6 type: int64 +[] + -- child 7 type: map +[] + -- child 8 type: map +[] + -- child 9 type: map +[] + -- child 10 type: map +[] + -- child 11 type: map +[] + -- child 12 type: map +[] + -- child 13 type: binary +[] + -- child 14 type: list +[] + -- child 15 type: list +[] + -- child 16 type: int32 +[], + -- is_valid: all not null + -- child 0 type: int8 +[] + -- child 1 type: string +[] + -- child 2 type: string +[] + -- child 3 type: int32 +[] + -- child 4 type: struct + -- is_valid: all not null + -- child 0 type: large_string +[] + -- child 5 type: int64 +[] + -- child 6 type: int64 +[] + -- child 7 type: map +[] + -- child 8 type: map +[] + -- child 9 type: map +[] + -- child 10 type: map +[] + -- child 11 type: map +[] + -- child 12 type: map +[] + -- child 13 type: binary +[] + -- child 14 type: list +[] + -- child 15 type: list +[] + -- child 16 type: int32 +[]] +readable_metrics: [ + -- is_valid: all not null + -- child 0 type: struct + -- is_valid: all not null + -- child 0 type: int64 +[39] + -- child 1 type: int64 +[1] + -- child 2 type: int64 +[0] + -- child 3 type: int64 +[null] + -- child 4 type: int32 +[1] + -- child 5 type: int32 +[1] + -- child 1 type: struct + -- is_valid: all not null + -- child 0 type: int64 +[40] + -- child 1 type: int64 +[1] + -- child 2 type: int64 +[0] + -- child 3 type: int64 +[null] + -- child 4 type: 
large_string +["a"] + -- child 5 type: large_string +["a"], + -- is_valid: all not null + -- child 0 type: struct + -- is_valid: all not null + -- child 0 type: int64 +[39] + -- child 1 type: int64 +[1] + -- child 2 type: int64 +[0] + -- child 3 type: int64 +[null] + -- child 4 type: int32 +[2] + -- child 5 type: int32 +[2] + -- child 1 type: struct + -- is_valid: all not null + -- child 0 type: int64 +[40] + -- child 1 type: int64 +[1] + -- child 2 type: int64 +[0] + -- child 3 type: int64 +[null] + -- child 4 type: large_string +["b"] + -- child 5 type: large_string +["b"], +..., + -- is_valid: all not null + -- child 0 type: struct + -- is_valid: all not null + -- child 0 type: int64 +[] + -- child 1 type: int64 +[] + -- child 2 type: int64 +[] + -- child 3 type: int64 +[] + -- child 4 type: int32 +[] + -- child 5 type: int32 +[] + -- child 1 type: struct + -- is_valid: all not null + -- child 0 type: int64 +[] + -- child 1 type: int64 +[] + -- child 2 type: int64 +[] + -- child 3 type: int64 +[] + -- child 4 type: large_string +[] + -- child 5 type: large_string +[], + -- is_valid: all not null + -- child 0 type: struct + -- is_valid: all not null + -- child 0 type: int64 +[] + -- child 1 type: int64 +[] + -- child 2 type: int64 +[] + -- child 3 type: int64 +[] + -- child 4 type: int32 +[] + -- child 5 type: int32 +[] + -- child 1 type: struct + -- is_valid: all not null + -- child 0 type: int64 +[] + -- child 1 type: int64 +[] + -- child 2 type: int64 +[] + -- child 3 type: int64 +[] + -- child 4 type: large_string +[] + -- child 5 type: large_string +[]] +``` + ## Add Files Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them. diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index 39b056966d..69f452b458 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -22,6 +22,7 @@ from pyiceberg.conversions import from_bytes from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary from pyiceberg.partitioning import PartitionSpec +from pyiceberg.schema import Schema from pyiceberg.table.snapshots import Snapshot, ancestors_of from pyiceberg.types import PrimitiveType from pyiceberg.utils.concurrent import ExecutorFactory @@ -94,12 +95,13 @@ def snapshots(self) -> "pa.Table": schema=snapshots_schema, ) - def _get_entries_schema(self) -> "pa.Schema": + def _get_entries_schema(self, schema: Optional[Schema] = None) -> "pa.Schema": import pyarrow as pa from pyiceberg.io.pyarrow import schema_to_pyarrow - schema = self.tbl.metadata.schema() + if not schema: + schema = self.tbl.metadata.schema() readable_metrics_struct = [] @@ -160,10 +162,10 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType: ) return entries_schema - def _get_entries(self, schema: "pa.Schema", manifest: ManifestFile, discard_deleted: bool = True) -> "pa.Table": + def _get_entries(self, schema: Schema, manifest: ManifestFile, discard_deleted: bool = True) -> "pa.Table": import pyarrow as pa - entries_schema = self._get_entries_schema() + entries_schema = self._get_entries_schema(schema=schema) entries = [] for entry in manifest.fetch_manifest_entry(io=self.tbl.io, discard_deleted=discard_deleted): column_sizes = entry.data_file.column_sizes or {} @@ -186,7 +188,7 @@ def _get_entries(self, schema: "pa.Schema", manifest: ManifestFile, discard_dele if (upper_bound := upper_bounds.get(field.field_id)) else None, } - for field in 
self.tbl.metadata.schema().fields + for field in schema.fields } partition = entry.data_file.partition @@ -208,7 +210,7 @@ def _get_entries(self, schema: "pa.Schema", manifest: ManifestFile, discard_dele "partition": partition_record_dict, "record_count": entry.data_file.record_count, "file_size_in_bytes": entry.data_file.file_size_in_bytes, - "column_sizes": dict(entry.data_file.column_sizes) or None, + "column_sizes": dict(entry.data_file.column_sizes) if entry.data_file.column_sizes is not None else None, "value_counts": dict(entry.data_file.value_counts) if entry.data_file.value_counts is not None else None, "null_value_counts": dict(entry.data_file.null_value_counts) if entry.data_file.null_value_counts is not None @@ -243,6 +245,8 @@ def entries(self, snapshot_id: Optional[int] = None) -> "pa.Table": raise ValueError(f"Cannot find schema_id for snapshot {snapshot.snapshot_id}") schema = self.tbl.schemas().get(snapshot.schema_id) + if not schema: + raise ValueError(f"Cannot find schema with ID {snapshot.schema_id}") for manifest in snapshot.manifests(self.tbl.io): manifest_entries = self._get_entries(schema=schema, manifest=manifest, discard_deleted=True) entries.append(manifest_entries) From a822a7660c2a91465780214b6ca6fb3f59d95065 Mon Sep 17 00:00:00 2001 From: amitgilad Date: Thu, 15 May 2025 19:42:12 +0300 Subject: [PATCH 6/6] lint --- pyiceberg/table/inspect.py | 2 +- tests/integration/test_inspect_table.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index 69f452b458..dc322dc6e3 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -757,4 +757,4 @@ def all_entries(self) -> "pa.Table": lambda manifest: self._get_entries(snapshot_schemas[manifest.added_snapshot_id], manifest, discard_deleted=True), unique_flattened_manifests, ) - return pa.concat_tables(entries) \ No newline at end of file + return pa.concat_tables(entries) diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index 317026acf5..e234715fc2 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -1102,6 +1102,7 @@ def test_inspect_files_partitioned(spark: SparkSession, session_catalog: Catalog ) assert_frame_equal(lhs, rhs, check_dtype=False) + @pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) def test_inspect_all_entries(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: