Add Partitions Metadata Table #603

Merged · 6 commits · Apr 16, 2024
mkdocs/docs/api.md (41 additions, 0 deletions)
@@ -382,6 +382,47 @@ manifest_list: [["s3://warehouse/default/table_metadata_snapshots/metadata/snap-
summary: [[keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","1","0","3","5459","0","0"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-records",...,"total-equality-deletes","total-files-size","deleted-data-files","deleted-records","removed-files-size"]values:["5459","1","3","1","3",...,"0","5459","1","3","5459"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","2","0","6","10918","0","0"]]]
```

### Partitions

Inspect the partitions of the table:

```python
table.inspect.partitions()
```

```
pyarrow.Table
partition: struct<dt_month: int32, dt_day: date32[day]> not null
child 0, dt_month: int32
child 1, dt_day: date32[day]
spec_id: int32 not null
record_count: int64 not null
file_count: int32 not null
total_data_file_size_in_bytes: int64 not null
position_delete_record_count: int64 not null
position_delete_file_count: int32 not null
equality_delete_record_count: int64 not null
equality_delete_file_count: int32 not null
last_updated_at: timestamp[ms]
last_updated_snapshot_id: int64
----
partition: [
-- is_valid: all not null
-- child 0 type: int32
[null,null,612]
-- child 1 type: date32[day]
[null,2021-02-01,null]]
spec_id: [[2,1,0]]
record_count: [[1,1,2]]
file_count: [[1,1,2]]
total_data_file_size_in_bytes: [[641,641,1260]]
position_delete_record_count: [[0,0,0]]
position_delete_file_count: [[0,0,0]]
equality_delete_record_count: [[0,0,0]]
equality_delete_file_count: [[0,0,0]]
last_updated_at: [[2024-04-13 18:59:35.981,2024-04-13 18:59:35.465,2024-04-13 18:59:35.003]]
```
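
The result is a regular `pyarrow.Table`, so standard Arrow operations such as sorting or converting to pandas apply directly. A minimal sketch for a partitioned table like the one above (the sort key and column selection are only illustrative):

```python
partitions = table.inspect.partitions()

# Largest partitions first, materialized as a pandas DataFrame
summary = partitions.sort_by([("record_count", "descending")]).to_pandas()
print(summary[["partition", "spec_id", "record_count", "file_count"]])
```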

### Entries

To show all the table's current manifest entries for both data and delete files:
pyiceberg/table/__init__.py (89 additions, 0 deletions)
@@ -137,6 +137,7 @@
)
from pyiceberg.utils.concurrent import ExecutorFactory
from pyiceberg.utils.datetime import datetime_to_millis
from pyiceberg.utils.singleton import _convert_to_hashable_type

if TYPE_CHECKING:
import daft
@@ -3422,6 +3423,94 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
schema=entries_schema,
)

def partitions(self, snapshot_id: Optional[int] = None) -> "pa.Table":
import pyarrow as pa

from pyiceberg.io.pyarrow import schema_to_pyarrow

table_schema = pa.schema([
pa.field('record_count', pa.int64(), nullable=False),
pa.field('file_count', pa.int32(), nullable=False),
pa.field('total_data_file_size_in_bytes', pa.int64(), nullable=False),
pa.field('position_delete_record_count', pa.int64(), nullable=False),
pa.field('position_delete_file_count', pa.int32(), nullable=False),
pa.field('equality_delete_record_count', pa.int64(), nullable=False),
pa.field('equality_delete_file_count', pa.int32(), nullable=False),
pa.field('last_updated_at', pa.timestamp(unit='ms'), nullable=True),
pa.field('last_updated_snapshot_id', pa.int64(), nullable=True),
])

partition_record = self.tbl.metadata.specs_struct()
has_partitions = len(partition_record.fields) > 0

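# Include the partition struct and spec_id columns only for tables that have partition fields.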
if has_partitions:
pa_record_struct = schema_to_pyarrow(partition_record)
partitions_schema = pa.schema([
pa.field('partition', pa_record_struct, nullable=False),
pa.field('spec_id', pa.int32(), nullable=False),
])

table_schema = pa.unify_schemas([partitions_schema, table_schema])

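# Fold one data or delete file into the per-partition aggregates, keyed by a hashable form of its partition values.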
def update_partitions_map(
partitions_map: Dict[Tuple[str, Any], Any],
file: DataFile,
partition_record_dict: Dict[str, Any],
snapshot: Optional[Snapshot],
) -> None:
partition_record_key = _convert_to_hashable_type(partition_record_dict)
if partition_record_key not in partitions_map:
partitions_map[partition_record_key] = {
"partition": partition_record_dict,
"spec_id": file.spec_id,
"record_count": 0,
"file_count": 0,
"total_data_file_size_in_bytes": 0,
"position_delete_record_count": 0,
"position_delete_file_count": 0,
"equality_delete_record_count": 0,
"equality_delete_file_count": 0,
"last_updated_at": snapshot.timestamp_ms if snapshot else None,
"last_updated_snapshot_id": snapshot.snapshot_id if snapshot else None,
}

partition_row = partitions_map[partition_record_key]

if snapshot is not None:
if partition_row["last_updated_at"] is None or partition_row["last_updated_snapshot_id"] < snapshot.timestamp_ms:
partition_row["last_updated_at"] = snapshot.timestamp_ms
partition_row["last_updated_snapshot_id"] = snapshot.snapshot_id

if file.content == DataFileContent.DATA:
partition_row["record_count"] += file.record_count
partition_row["file_count"] += 1
partition_row["total_data_file_size_in_bytes"] += file.file_size_in_bytes
elif file.content == DataFileContent.POSITION_DELETES:
partition_row["position_delete_record_count"] += file.record_count
partition_row["position_delete_file_count"] += 1
elif file.content == DataFileContent.EQUALITY_DELETES:
partition_row["equality_delete_record_count"] += file.record_count
partition_row["equality_delete_file_count"] += 1
else:
raise ValueError(f"Unknown DataFileContent ({file.content})")

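# Aggregate over every manifest entry reachable from the requested (or current) snapshot.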
partitions_map: Dict[Tuple[str, Any], Any] = {}
snapshot = self._get_snapshot(snapshot_id)
for manifest in snapshot.manifests(self.tbl.io):
for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
partition = entry.data_file.partition
partition_record_dict = {
field.name: partition[pos]
for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
}
entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None
update_partitions_map(partitions_map, entry.data_file, partition_record_dict, entry_snapshot)

return pa.Table.from_pylist(
partitions_map.values(),
schema=table_schema,
)


@dataclass(frozen=True)
class TablePartition:
tests/integration/test_inspect_table.py (120 additions, 0 deletions)
@@ -270,3 +270,123 @@ def test_inspect_entries_partitioned(spark: SparkSession, session_catalog: Catal

assert df.to_pydict()['data_file'][0]['partition'] == {'dt_day': date(2021, 2, 1), 'dt_month': None}
assert df.to_pydict()['data_file'][1]['partition'] == {'dt_day': None, 'dt_month': 612}


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_inspect_partitions_unpartitioned(
spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
) -> None:
identifier = "default.table_metadata_partitions_unpartitioned"
tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version})

# Write some data through multiple commits
tbl.append(arrow_table_with_null)
tbl.append(arrow_table_with_null)

df = tbl.inspect.partitions()
assert df.column_names == [
'record_count',
'file_count',
'total_data_file_size_in_bytes',
'position_delete_record_count',
'position_delete_file_count',
'equality_delete_record_count',
'equality_delete_file_count',
'last_updated_at',
'last_updated_snapshot_id',
]
for last_updated_at in df['last_updated_at']:
assert isinstance(last_updated_at.as_py(), datetime)

int_cols = [
'record_count',
'file_count',
'total_data_file_size_in_bytes',
'position_delete_record_count',
'position_delete_file_count',
'equality_delete_record_count',
'equality_delete_file_count',
'last_updated_snapshot_id',
]
for column in int_cols:
for value in df[column]:
assert isinstance(value.as_py(), int)
lhs = df.to_pandas()
rhs = spark.table(f"{identifier}.partitions").toPandas()
for column in df.column_names:
for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
assert left == right, f"Difference in column {column}: {left} != {right}"


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_inspect_partitions_partitioned(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
identifier = "default.table_metadata_partitions_partitioned"
try:
session_catalog.drop_table(identifier=identifier)
except NoSuchTableError:
pass

spark.sql(
f"""
CREATE TABLE {identifier} (
name string,
dt date
)
PARTITIONED BY (months(dt))
"""
)

spark.sql(
f"""
INSERT INTO {identifier} VALUES ('John', CAST('2021-01-01' AS date))
"""
)

spark.sql(
f"""
INSERT INTO {identifier} VALUES ('Doe', CAST('2021-01-05' AS date))
"""
)

spark.sql(
f"""
ALTER TABLE {identifier}
REPLACE PARTITION FIELD dt_month WITH days(dt)
"""
)

spark.sql(
f"""
INSERT INTO {identifier} VALUES ('Jenny', CAST('2021-02-01' AS date))
"""
)

spark.sql(
f"""
ALTER TABLE {identifier}
DROP PARTITION FIELD dt_day
"""
)

spark.sql(
f"""
INSERT INTO {identifier} VALUES ('James', CAST('2021-02-01' AS date))
"""
)

def check_pyiceberg_df_equals_spark_df(df: pa.Table, spark_df: DataFrame) -> None:
lhs = df.to_pandas().sort_values('spec_id')
rhs = spark_df.toPandas().sort_values('spec_id')
for column in df.column_names:
for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
if column == "partition":
right = right.asDict()
assert left == right, f"Difference in column {column}: {left} != {right}"

tbl = session_catalog.load_table(identifier)
for snapshot in tbl.metadata.snapshots:
df = tbl.inspect.partitions(snapshot_id=snapshot.snapshot_id)
spark_df = spark.sql(f"SELECT * FROM {identifier}.partitions VERSION AS OF {snapshot.snapshot_id}")
check_pyiceberg_df_equals_spark_df(df, spark_df)