diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 9bdb6dcdaa..cf4276edec 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -382,6 +382,47 @@
 manifest_list: [["s3://warehouse/default/table_metadata_snapshots/metadata/snap-
 summary: [[keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","1","0","3","5459","0","0"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-records",...,"total-equality-deletes","total-files-size","deleted-data-files","deleted-records","removed-files-size"]values:["5459","1","3","1","3",...,"0","5459","1","3","5459"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","2","0","6","10918","0","0"]]]
 ```
+### Partitions
+
+Inspect the partitions of the table:
+
+```python
+table.inspect.partitions()
+```
+
+```
+pyarrow.Table
+partition: struct not null
+  child 0, dt_month: int32
+  child 1, dt_day: date32[day]
+spec_id: int32 not null
+record_count: int64 not null
+file_count: int32 not null
+total_data_file_size_in_bytes: int64 not null
+position_delete_record_count: int64 not null
+position_delete_file_count: int32 not null
+equality_delete_record_count: int64 not null
+equality_delete_file_count: int32 not null
+last_updated_at: timestamp[ms]
+last_updated_snapshot_id: int64
+----
+partition: [
+  -- is_valid: all not null
+  -- child 0 type: int32
+[null,null,612]
+  -- child 1 type: date32[day]
+[null,2021-02-01,null]]
+spec_id: [[2,1,0]]
+record_count: [[1,1,2]]
+file_count: [[1,1,2]]
+total_data_file_size_in_bytes: [[641,641,1260]]
+position_delete_record_count: [[0,0,0]]
+position_delete_file_count: [[0,0,0]]
+equality_delete_record_count: [[0,0,0]]
+equality_delete_file_count: [[0,0,0]]
+last_updated_at: [[2024-04-13 18:59:35.981,2024-04-13 18:59:35.465,2024-04-13 18:59:35.003]]
+```
+
 ### Entries
 
 To show all the table's current manifest entries for both data and delete files.
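Because `table.inspect.partitions()` returns an ordinary `pyarrow.Table`, the result can be filtered and sorted with standard Arrow tooling before display. A minimal sketch (the `partitions` and `current_spec` names are illustrative, and the table is assumed to be partitioned):

```python
import pyarrow.compute as pc

# The partitions metadata table is a plain pyarrow.Table.
partitions = table.inspect.partitions()

# For example, keep only rows written under the table's current partition spec
# and order them by file count before converting to pandas for display.
mask = pc.equal(partitions["spec_id"], table.spec().spec_id)
current_spec = partitions.filter(mask)
print(current_spec.sort_by([("file_count", "descending")]).to_pandas())
```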
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index da4b1465be..95fdb1d288 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -137,6 +137,7 @@ )
 from pyiceberg.utils.concurrent import ExecutorFactory
 from pyiceberg.utils.datetime import datetime_to_millis
+from pyiceberg.utils.singleton import _convert_to_hashable_type
 
 if TYPE_CHECKING:
     import daft
@@ -3422,6 +3423,94 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
             schema=entries_schema,
         )
 
+    def partitions(self, snapshot_id: Optional[int] = None) -> "pa.Table":
+        import pyarrow as pa
+
+        from pyiceberg.io.pyarrow import schema_to_pyarrow
+
+        table_schema = pa.schema([
+            pa.field('record_count', pa.int64(), nullable=False),
+            pa.field('file_count', pa.int32(), nullable=False),
+            pa.field('total_data_file_size_in_bytes', pa.int64(), nullable=False),
+            pa.field('position_delete_record_count', pa.int64(), nullable=False),
+            pa.field('position_delete_file_count', pa.int32(), nullable=False),
+            pa.field('equality_delete_record_count', pa.int64(), nullable=False),
+            pa.field('equality_delete_file_count', pa.int32(), nullable=False),
+            pa.field('last_updated_at', pa.timestamp(unit='ms'), nullable=True),
+            pa.field('last_updated_snapshot_id', pa.int64(), nullable=True),
+        ])
+
+        partition_record = self.tbl.metadata.specs_struct()
+        has_partitions = len(partition_record.fields) > 0
+
+        if has_partitions:
+            pa_record_struct = schema_to_pyarrow(partition_record)
+            partitions_schema = pa.schema([
+                pa.field('partition', pa_record_struct, nullable=False),
+                pa.field('spec_id', pa.int32(), nullable=False),
+            ])
+
+            table_schema = pa.unify_schemas([partitions_schema, table_schema])
+
+        def update_partitions_map(
+            partitions_map: Dict[Tuple[str, Any], Any],
+            file: DataFile,
+            partition_record_dict: Dict[str, Any],
+            snapshot: Optional[Snapshot],
+        ) -> None:
+            partition_record_key = _convert_to_hashable_type(partition_record_dict)
+            if partition_record_key not in partitions_map:
+                partitions_map[partition_record_key] = {
+                    "partition": partition_record_dict,
+                    "spec_id": file.spec_id,
+                    "record_count": 0,
+                    "file_count": 0,
+                    "total_data_file_size_in_bytes": 0,
+                    "position_delete_record_count": 0,
+                    "position_delete_file_count": 0,
+                    "equality_delete_record_count": 0,
+                    "equality_delete_file_count": 0,
+                    "last_updated_at": snapshot.timestamp_ms if snapshot else None,
+                    "last_updated_snapshot_id": snapshot.snapshot_id if snapshot else None,
+                }
+
+            partition_row = partitions_map[partition_record_key]
+
+            if snapshot is not None:
+                if partition_row["last_updated_at"] is None or partition_row["last_updated_at"] < snapshot.timestamp_ms:
+                    partition_row["last_updated_at"] = snapshot.timestamp_ms
+                    partition_row["last_updated_snapshot_id"] = snapshot.snapshot_id
+
+            if file.content == DataFileContent.DATA:
+                partition_row["record_count"] += file.record_count
+                partition_row["file_count"] += 1
+                partition_row["total_data_file_size_in_bytes"] += file.file_size_in_bytes
+            elif file.content == DataFileContent.POSITION_DELETES:
+                partition_row["position_delete_record_count"] += file.record_count
+                partition_row["position_delete_file_count"] += 1
+            elif file.content == DataFileContent.EQUALITY_DELETES:
+                partition_row["equality_delete_record_count"] += file.record_count
+                partition_row["equality_delete_file_count"] += 1
+            else:
+                raise ValueError(f"Unknown DataFileContent ({file.content})")
+
+        partitions_map: Dict[Tuple[str, Any], Any] = {}
+        snapshot = self._get_snapshot(snapshot_id)
+        for manifest in snapshot.manifests(self.tbl.io):
+            for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
+                partition = entry.data_file.partition
+                partition_record_dict = {
+                    field.name: partition[pos]
+                    for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
+                }
+                entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None
+                update_partitions_map(partitions_map, entry.data_file, partition_record_dict, entry_snapshot)
+
+        return pa.Table.from_pylist(
+            partitions_map.values(),
+            schema=table_schema,
+        )
+
 
 @dataclass(frozen=True)
 class TablePartition:
diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py
index 7cbfc6da08..d9ec563466 100644
--- a/tests/integration/test_inspect_table.py
+++ b/tests/integration/test_inspect_table.py
@@ -270,3 +270,123 @@ def test_inspect_entries_partitioned(spark: SparkSession, session_catalog: Catal
 
     assert df.to_pydict()['data_file'][0]['partition'] == {'dt_day': date(2021, 2, 1), 'dt_month': None}
     assert df.to_pydict()['data_file'][1]['partition'] == {'dt_day': None, 'dt_month': 612}
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_inspect_partitions_unpartitioned(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
+) -> None:
+    identifier = "default.table_metadata_partitions_unpartitioned"
+    tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version})
+
+    # Write some data through multiple commits
+    tbl.append(arrow_table_with_null)
+    tbl.append(arrow_table_with_null)
+
+    df = tbl.inspect.partitions()
+    assert df.column_names == [
+        'record_count',
+        'file_count',
+        'total_data_file_size_in_bytes',
+        'position_delete_record_count',
+        'position_delete_file_count',
+        'equality_delete_record_count',
+        'equality_delete_file_count',
+        'last_updated_at',
+        'last_updated_snapshot_id',
+    ]
+    for last_updated_at in df['last_updated_at']:
+        assert isinstance(last_updated_at.as_py(), datetime)
+
+    int_cols = [
+        'record_count',
+        'file_count',
+        'total_data_file_size_in_bytes',
+        'position_delete_record_count',
+        'position_delete_file_count',
+        'equality_delete_record_count',
+        'equality_delete_file_count',
+        'last_updated_snapshot_id',
+    ]
+    for column in int_cols:
+        for value in df[column]:
+            assert isinstance(value.as_py(), int)
+
+    lhs = df.to_pandas()
+    rhs = spark.table(f"{identifier}.partitions").toPandas()
+    for column in df.column_names:
+        for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
+            assert left == right, f"Difference in column {column}: {left} != {right}"
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_inspect_partitions_partitioned(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    identifier = "default.table_metadata_partitions_partitioned"
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    spark.sql(
+        f"""
+        CREATE TABLE {identifier} (
+            name string,
+            dt date
+        )
+        PARTITIONED BY (months(dt))
+        """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES ('John', CAST('2021-01-01' AS date))
+        """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES ('Doe', CAST('2021-01-05' AS date))
+        """
+    )
+
+    spark.sql(
+        f"""
+        ALTER TABLE {identifier}
+        REPLACE PARTITION FIELD dt_month WITH days(dt)
+        """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES ('Jenny', CAST('2021-02-01' AS date))
+        """
+    )
+
+    spark.sql(
+        f"""
+        ALTER TABLE {identifier}
+        DROP PARTITION FIELD dt_day
+        """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES ('James', CAST('2021-02-01' AS date))
+        """
+    )
+
+    def check_pyiceberg_df_equals_spark_df(df: pa.Table, spark_df: DataFrame) -> None:
+        lhs = df.to_pandas().sort_values('spec_id')
+        rhs = spark_df.toPandas().sort_values('spec_id')
+        for column in df.column_names:
+            for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
+                if column == "partition":
+                    right = right.asDict()
+                assert left == right, f"Difference in column {column}: {left} != {right}"
+
+    tbl = session_catalog.load_table(identifier)
+    for snapshot in tbl.metadata.snapshots:
+        df = tbl.inspect.partitions(snapshot_id=snapshot.snapshot_id)
+        spark_df = spark.sql(f"SELECT * FROM {identifier}.partitions VERSION AS OF {snapshot.snapshot_id}")
+        check_pyiceberg_df_equals_spark_df(df, spark_df)
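The snapshot-by-snapshot pattern used in `test_inspect_partitions_partitioned` also works outside the test suite for watching partition statistics evolve across a table's history. A minimal sketch, assuming a configured catalog named "default" and the table identifier created in the test above:

```python
from pyiceberg.catalog import load_catalog

# Load the partitioned table created in the integration test above
# (the catalog name is illustrative).
catalog = load_catalog("default")
tbl = catalog.load_table("default.table_metadata_partitions_partitioned")

# Inspect partition statistics as of each snapshot in the table's history.
for snapshot in tbl.metadata.snapshots:
    partitions = tbl.inspect.partitions(snapshot_id=snapshot.snapshot_id)
    print(snapshot.snapshot_id, partitions.to_pydict()["record_count"])
```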