
Add all files metadata tables #1626


Merged · 6 commits · May 13, 2025
167 changes: 107 additions & 60 deletions pyiceberg/table/inspect.py
@@ -20,7 +20,7 @@
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple

from pyiceberg.conversions import from_bytes
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.table.snapshots import Snapshot, ancestors_of
from pyiceberg.types import PrimitiveType
@@ -523,7 +523,73 @@ def history(self) -> "pa.Table":

return pa.Table.from_pylist(history, schema=history_schema)

def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table":
def _get_files_from_manifest(
self, manifest_list: ManifestFile, data_file_filter: Optional[Set[DataFileContent]] = None
) -> "pa.Table":
import pyarrow as pa

files: list[dict[str, Any]] = []
schema = self.tbl.metadata.schema()
Contributor:
when time traveling with different snapshots, we shouldn't just use the current table schema (for context: #1053 (comment))

Contributor (author):
@kevinjqliu updated code as per comments.
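
A minimal usage sketch of the point above (illustrative only; the catalog and table names are assumptions, not from this PR):

# Minimal sketch, assuming a configured catalog named "default" and a table
# "default.my_table" with more than one snapshot.
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
tbl = catalog.load_table("default.my_table")

# Files metadata for the current snapshot.
current_files = tbl.inspect.files()

# Files metadata as of an older snapshot (time travel); this is the case where
# the choice of schema discussed above matters.
oldest_snapshot_id = tbl.history()[0].snapshot_id
past_files = tbl.inspect.files(snapshot_id=oldest_snapshot_id)
print(past_files.num_rows, past_files.column_names)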

io = self.tbl.io

for manifest_entry in manifest_list.fetch_manifest_entry(io):
data_file = manifest_entry.data_file
if data_file_filter and data_file.content not in data_file_filter:
continue
column_sizes = data_file.column_sizes or {}
value_counts = data_file.value_counts or {}
null_value_counts = data_file.null_value_counts or {}
nan_value_counts = data_file.nan_value_counts or {}
lower_bounds = data_file.lower_bounds or {}
upper_bounds = data_file.upper_bounds or {}
readable_metrics = {
schema.find_column_name(field.field_id): {
"column_size": column_sizes.get(field.field_id),
"value_count": value_counts.get(field.field_id),
"null_value_count": null_value_counts.get(field.field_id),
"nan_value_count": nan_value_counts.get(field.field_id),
"lower_bound": from_bytes(field.field_type, lower_bound)
if (lower_bound := lower_bounds.get(field.field_id))
else None,
"upper_bound": from_bytes(field.field_type, upper_bound)
if (upper_bound := upper_bounds.get(field.field_id))
else None,
}
for field in self.tbl.metadata.schema().fields
}
partition = data_file.partition
partition_record_dict = {
field.name: partition[pos]
for pos, field in enumerate(self.tbl.metadata.specs()[manifest_list.partition_spec_id].fields)
}
files.append(
{
"content": data_file.content,
"file_path": data_file.file_path,
"file_format": data_file.file_format,
"spec_id": data_file.spec_id,
Contributor:
In Spark we also have the partition column, I think it would be good to add that one here as well:

partition_record = self.tbl.metadata.specs_struct()
pa_record_struct = schema_to_pyarrow(partition_record)

Contributor (author):
@Fokko Added partition column in files metadata table schema and added a test for the same
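
As an illustration of the new column (a sketch under assumptions; the table and its partitioning are hypothetical, not from this PR):

# Sketch: reading the new "partition" column of the files metadata table.
# Assumes `tbl` is a loaded pyiceberg table partitioned by a date column "dt".
files_table = tbl.inspect.files()

# One struct child per partition field, built from the table's partition specs.
print(files_table.schema.field("partition").type)

# Each row carries the partition tuple of its data file.
print(files_table.column("partition").to_pylist()[:3])
# e.g. [{'dt': datetime.date(2025, 5, 1)}, {'dt': datetime.date(2025, 5, 2)}, ...]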

Contributor:
One minor point: could we swap the order of spec_id and partition to keep it the same as in Spark?

Contributor (author):
order of spec_id and partition column fixed.

"partition": partition_record_dict,
"record_count": data_file.record_count,
"file_size_in_bytes": data_file.file_size_in_bytes,
"column_sizes": dict(data_file.column_sizes) if data_file.column_sizes is not None else None,
"value_counts": dict(data_file.value_counts) if data_file.value_counts is not None else None,
"null_value_counts": dict(data_file.null_value_counts) if data_file.null_value_counts is not None else None,
"nan_value_counts": dict(data_file.nan_value_counts) if data_file.nan_value_counts is not None else None,
"lower_bounds": dict(data_file.lower_bounds) if data_file.lower_bounds is not None else None,
"upper_bounds": dict(data_file.upper_bounds) if data_file.upper_bounds is not None else None,
"key_metadata": data_file.key_metadata,
"split_offsets": data_file.split_offsets,
"equality_ids": data_file.equality_ids,
"sort_order_id": data_file.sort_order_id,
"readable_metrics": readable_metrics,
}
)
return pa.Table.from_pylist(
files,
schema=self._get_files_schema(),
)

def _get_files_schema(self) -> "pa.Schema":
import pyarrow as pa

from pyiceberg.io.pyarrow import schema_to_pyarrow
@@ -544,6 +610,9 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
]
)

partition_record = self.tbl.metadata.specs_struct()
pa_record_struct = schema_to_pyarrow(partition_record)

for field in self.tbl.metadata.schema().fields:
readable_metrics_struct.append(
pa.field(schema.find_column_name(field.field_id), _readable_metrics_struct(field.field_type), nullable=False)
@@ -555,6 +624,7 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
pa.field("file_path", pa.string(), nullable=False),
pa.field("file_format", pa.dictionary(pa.int32(), pa.string()), nullable=False),
pa.field("spec_id", pa.int32(), nullable=False),
pa.field("partition", pa_record_struct, nullable=False),
pa.field("record_count", pa.int64(), nullable=False),
pa.field("file_size_in_bytes", pa.int64(), nullable=False),
pa.field("column_sizes", pa.map_(pa.int32(), pa.int64()), nullable=True),
@@ -570,71 +640,21 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
pa.field("readable_metrics", pa.struct(readable_metrics_struct), nullable=True),
]
)
return files_schema

files: list[dict[str, Any]] = []
def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table":
import pyarrow as pa

if not snapshot_id and not self.tbl.metadata.current_snapshot():
return pa.Table.from_pylist(
files,
schema=files_schema,
)
snapshot = self._get_snapshot(snapshot_id)
return self._get_files_schema().empty_table()

snapshot = self._get_snapshot(snapshot_id)
io = self.tbl.io
files_table: list[pa.Table] = []
for manifest_list in snapshot.manifests(io):
for manifest_entry in manifest_list.fetch_manifest_entry(io):
data_file = manifest_entry.data_file
if data_file_filter and data_file.content not in data_file_filter:
continue
column_sizes = data_file.column_sizes or {}
value_counts = data_file.value_counts or {}
null_value_counts = data_file.null_value_counts or {}
nan_value_counts = data_file.nan_value_counts or {}
lower_bounds = data_file.lower_bounds or {}
upper_bounds = data_file.upper_bounds or {}
readable_metrics = {
schema.find_column_name(field.field_id): {
"column_size": column_sizes.get(field.field_id),
"value_count": value_counts.get(field.field_id),
"null_value_count": null_value_counts.get(field.field_id),
"nan_value_count": nan_value_counts.get(field.field_id),
"lower_bound": from_bytes(field.field_type, lower_bound)
if (lower_bound := lower_bounds.get(field.field_id))
else None,
"upper_bound": from_bytes(field.field_type, upper_bound)
if (upper_bound := upper_bounds.get(field.field_id))
else None,
}
for field in self.tbl.metadata.schema().fields
}
files.append(
{
"content": data_file.content,
"file_path": data_file.file_path,
"file_format": data_file.file_format,
"spec_id": data_file.spec_id,
"record_count": data_file.record_count,
"file_size_in_bytes": data_file.file_size_in_bytes,
"column_sizes": dict(data_file.column_sizes) if data_file.column_sizes is not None else None,
"value_counts": dict(data_file.value_counts) if data_file.value_counts is not None else None,
"null_value_counts": dict(data_file.null_value_counts)
if data_file.null_value_counts is not None
else None,
"nan_value_counts": dict(data_file.nan_value_counts) if data_file.nan_value_counts is not None else None,
"lower_bounds": dict(data_file.lower_bounds) if data_file.lower_bounds is not None else None,
"upper_bounds": dict(data_file.upper_bounds) if data_file.upper_bounds is not None else None,
"key_metadata": data_file.key_metadata,
"split_offsets": data_file.split_offsets,
"equality_ids": data_file.equality_ids,
"sort_order_id": data_file.sort_order_id,
"readable_metrics": readable_metrics,
}
)
files_table.append(self._get_files_from_manifest(manifest_list, data_file_filter))

return pa.Table.from_pylist(
files,
schema=files_schema,
)
return pa.concat_tables(files_table)

def files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
return self._files(snapshot_id)
@@ -657,3 +677,30 @@ def all_manifests(self) -> "pa.Table":
lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots]
)
return pa.concat_tables(manifests_by_snapshots)

def _all_files(self, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table":
import pyarrow as pa

snapshots = self.tbl.snapshots()
if not snapshots:
return pa.Table.from_pylist([], schema=self._get_files_schema())

executor = ExecutorFactory.get_or_create()
manifest_lists = executor.map(lambda snapshot: snapshot.manifests(self.tbl.io), snapshots)

unique_manifests = {(manifest.manifest_path, manifest) for manifest_list in manifest_lists for manifest in manifest_list}

file_lists = executor.map(
lambda args: self._get_files_from_manifest(*args), [(manifest, data_file_filter) for _, manifest in unique_manifests]
)

return pa.concat_tables(file_lists)

def all_files(self) -> "pa.Table":
return self._all_files()

def all_data_files(self) -> "pa.Table":
return self._all_files({DataFileContent.DATA})

def all_delete_files(self) -> "pa.Table":
return self._all_files({DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES})
Contributor:
This should also include Puffin files.

Contributor:
We have a Spark table to test this:

for format_version in [2, 3]:
    identifier = f'{catalog_name}.default.test_positional_mor_deletes_v{format_version}'
    spark.sql(
        f"""
        CREATE OR REPLACE TABLE {identifier} (
            dt date,
            number integer,
            letter string
        )
        USING iceberg
        TBLPROPERTIES (
            'write.delete.mode'='merge-on-read',
            'write.update.mode'='merge-on-read',
            'write.merge.mode'='merge-on-read',
            'format-version'='{format_version}'
        );
        """
    )

Contributor (author):
Okay, will check this. If this requires changes, it will also need changes in files and delete_files table.
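
For reference, a hedged sketch (not from this PR) of what such a check could look like from the Python side, assuming a loaded catalog and the identifiers from the Spark snippet above:

# Sketch: inspecting the delete files of the merge-on-read test table above.
# Assumes `catalog` is already configured; format version 2 shown here.
tbl = catalog.load_table("default.test_positional_mor_deletes_v2")

delete_files = tbl.inspect.delete_files()
# Positional deletes are reported with content == DataFileContent.POSITION_DELETES.
print(delete_files.num_rows)
print(delete_files.column("content").to_pylist())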

Contributor (author):
@Fokko Added an integration test for a table with format version 3; used Spark to write, since writes through pyiceberg to the V3 table were failing.

Note that the outputs of the files metadata table (and all other related tables) do not completely match their Spark counterparts, due to additional columns like first_row_id, referenced_data_file, content_offset, and content_size_in_bytes. These need to be added to the DataFile class first and then propagated as required. This should be addressed in a different issue; will it be part of the V3 tracking issue?

Contributor:
Yes, let's do that in a separate PR: #1982
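
To close the loop, a hedged usage sketch of the metadata tables added in this PR (`tbl` is any loaded pyiceberg table; the row counts are printed only to show the shape of the API):

# Usage sketch for the new metadata tables introduced in this PR.
all_files = tbl.inspect.all_files()                # data and delete files across all snapshots
all_data_files = tbl.inspect.all_data_files()      # DataFileContent.DATA only
all_delete_files = tbl.inspect.all_delete_files()  # position and equality deletes

# Unlike files()/data_files()/delete_files(), these scan the manifests of every
# snapshot, so they can report more rows than the current snapshot alone.
print(all_files.num_rows, all_data_files.num_rows, all_delete_files.num_rows)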
