
support all_entries in pyiceberg #1608


Open
wants to merge 6 commits into main
Conversation

Contributor

@amitgilad3 commented Feb 4, 2025

Implements the all_entries metadata table - #1053
This is my initial PR towards supporting all_entries in PyIceberg. I'm currently running into an issue where the number of entries returned by Spark is lower than what PyIceberg returns, and I'm not sure why.

PyIceberg results: (screenshot)

Spark results: (screenshot)

I'm also attaching the CSV for investigation purposes:
py_iceberg vs spark.csv

Update: issue resolved.

@amitgilad3 marked this pull request as draft February 4, 2025 17:11
@kevinjqliu
Contributor

> the number of entries returned by Spark is lower than what PyIceberg returns, and I'm not sure why

Looks like there are a lot of duplicates in PyIceberg. If you de-dup and sort, it should match.
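A minimal sketch of that comparison, assuming both engines' entries have been exported to CSV (the file names and sort keys here are hypothetical):

```python
import pandas as pd

# Hypothetical exports of the entries from each engine.
lhs = pd.read_csv("pyiceberg_entries.csv")
rhs = pd.read_csv("spark_entries.csv")

# Drop duplicate rows and sort on a stable key so that ordering and
# repeated manifest entries do not cause false mismatches.
key = ["status", "snapshot_id", "sequence_number"]
lhs = lhs.drop_duplicates().sort_values(key).reset_index(drop=True)
rhs = rhs.drop_duplicates().sort_values(key).reset_index(drop=True)

pd.testing.assert_frame_equal(lhs, rhs)
```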

@amitgilad3
Contributor Author

amitgilad3 commented Feb 4, 2025

Thanks @kevinjqliu - wasn't sure how the Iceberg Java library does it :)

@amitgilad3 marked this pull request as ready for review February 5, 2025 05:54
@amitgilad3
Contributor Author

Ready for review

Contributor

@kevinjqliu left a comment


Thanks for the PR! I added some comments

Comment on lines 989 to 990
```python
for snapshot_id in df["snapshot_id"]:
    assert isinstance(snapshot_id.as_py(), int)
```
Contributor


This is redundant, right? It's already included in the above.

"readable_metrics",
]

# Make sure that they are filled properly
Contributor


nit: add a comment to signal that this checks the first 4 columns, and that the rest of the test checks the last 2, data_file and readable_metrics.
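A possible wording for that comment (hypothetical; the column names follow the entries schema):

```python
# Check that the first 4 columns (status, snapshot_id, sequence_number,
# file_sequence_number) are filled properly; the last 2 (data_file,
# readable_metrics) are covered by the assertions further down.
```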

Comment on lines 993 to 997
lhs["content"] = lhs["data_file"].apply(lambda x: x.get("content"))
lhs["file_path"] = lhs["data_file"].apply(lambda x: x.get("file_path"))
lhs = lhs.sort_values(["status", "snapshot_id", "sequence_number", "content", "file_path"]).drop(
columns=["file_path", "content"]
)
Contributor


nit: maybe inline these operations to show that the same operations are performed on lhs and rhs
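One way to express this, assuming `lhs` and `rhs` are pandas DataFrames whose `data_file` column holds dicts (`sort_entries` is a hypothetical helper name):

```python
import pandas as pd

def sort_entries(df: pd.DataFrame) -> pd.DataFrame:
    # Derive temporary sort keys from the data_file struct, sort on them,
    # then drop them again, so both sides get the exact same normalization.
    df = df.assign(
        content=df["data_file"].apply(lambda x: x.get("content")),
        file_path=df["data_file"].apply(lambda x: x.get("file_path")),
    )
    return df.sort_values(
        ["status", "snapshot_id", "sequence_number", "content", "file_path"]
    ).drop(columns=["content", "file_path"])

# lhs, rhs = sort_entries(lhs), sort_entries(rhs)
```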

"partition": partition_record_dict,
"record_count": entry.data_file.record_count,
"file_size_in_bytes": entry.data_file.file_size_in_bytes,
"column_sizes": dict(entry.data_file.column_sizes) if entry.data_file.column_sizes is not None else None,
Contributor


how about

"column_sizes": dict(entry.data_file.column_sizes) or None

```python
def _get_entries(self, manifest: ManifestFile, discard_deleted: bool = True) -> "pa.Table":
    import pyarrow as pa

    schema = self.tbl.metadata.schema()
```
Contributor


Something I'm concerned about with the all_* metadata tables is taking the table's schema/partition evolution into account.

Since we're looking across all snapshots, it might not be right to use the current snapshot's schema here.
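A hedged sketch of the alternative: resolve the schema that was current when each snapshot was written, via the `schema_id` that PyIceberg's `Snapshot` records (the helper name is hypothetical):

```python
from pyiceberg.schema import Schema
from pyiceberg.table.snapshots import Snapshot

def _schema_for_snapshot(self, snapshot: Snapshot) -> Schema:
    # Prefer the schema recorded on the snapshot over the table's current
    # schema, so dropped or renamed columns are reported as they existed
    # at write time.
    if snapshot.schema_id is not None:
        for schema in self.tbl.metadata.schemas:
            if schema.schema_id == snapshot.schema_id:
                return schema
    # Older snapshots may not carry a schema_id; fall back to current.
    return self.tbl.metadata.schema()
```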

@amitgilad3
Contributor Author

Ready for review

Contributor

@kevinjqliu left a comment


Generally LGTM. I've added some more comments about using the table's current metadata.

````diff
@@ -716,6 +716,8 @@ readable_metrics: [
     [6.0989]]
 ```
+
+To show all the table's current manifest entries for both data and delete files, use `table.inspect.all_entries()`.
````
Contributor


It'd be great to add this to its own section, similar to https://iceberg.apache.org/docs/nightly/spark-queries/#all-metadata-tables
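For such a docs section, a minimal usage sketch might look like this (catalog and table names are hypothetical):

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")        # hypothetical catalog name
table = catalog.load_table("db.events")  # hypothetical table identifier

# Returns a pyarrow.Table with one row per manifest entry,
# across all snapshots, for both data and delete files.
entries = table.inspect.all_entries()
print(entries.column_names)
```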

Comment on lines +212 to +220

```python
"value_counts": dict(entry.data_file.value_counts) if entry.data_file.value_counts is not None else None,
"null_value_counts": dict(entry.data_file.null_value_counts)
if entry.data_file.null_value_counts is not None
else None,
"nan_value_counts": dict(entry.data_file.nan_value_counts)
if entry.data_file.nan_value_counts is not None
else None,
```
Contributor


nit: can we use the same `dict() or None` pattern here?

Contributor Author


I used the same logic you used in lines L645 - L650. If I use `dict() or None`, the code breaks when trying to do `dict(None)`.
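A quick illustration of the failure mode, plus a helper that captures the intended behavior (`to_dict_or_none` is a hypothetical name):

```python
from typing import Mapping, Optional

def to_dict_or_none(value: Optional[Mapping]) -> Optional[dict]:
    # dict(None) raises TypeError, so the None check has to come first.
    # Like the suggested `dict(x) or None`, this also maps {} to None.
    return (dict(value) or None) if value is not None else None

# dict(None)               -> TypeError: 'NoneType' object is not iterable
# to_dict_or_none(None)    -> None
# to_dict_or_none({})      -> None
# to_dict_or_none({1: 2})  -> {1: 2}
```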

```
@@ -157,74 +158,96 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
        pa.field("readable_metrics", pa.struct(readable_metrics_struct), nullable=True),
    ]
)
return entries_schema

def _get_entries(self, schema: "pa.Schema", manifest: ManifestFile, discard_deleted: bool = True) -> "pa.Table":
```
Contributor


The schema here should be an Iceberg schema instead of a pa.Schema.
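A sketch of the suggested signature, taking PyIceberg's `Schema` and converting to Arrow internally (conversion step elided):

```python
from pyiceberg.manifest import ManifestFile
from pyiceberg.schema import Schema

def _get_entries(self, schema: Schema, manifest: ManifestFile, discard_deleted: bool = True) -> "pa.Table":
    # Accept the Iceberg schema so callers can pass the snapshot-specific
    # schema; convert to a pyarrow schema only where Arrow needs it.
    ...
```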

Contributor


There are still a couple of places in this function using self.tbl., which is the table's current metadata (L189 & L195).
We should use the schema and partition spec at the time of the snapshot instead.
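And a companion sketch for the partition spec, resolved via the `spec_id` recorded on each data file rather than the table's current default (the helper name is hypothetical):

```python
from pyiceberg.manifest import ManifestEntry
from pyiceberg.partitioning import PartitionSpec

def _spec_for_entry(self, entry: ManifestEntry) -> PartitionSpec:
    # Each data file records the id of the spec it was written with;
    # prefer that over the table's current default spec.
    for spec in self.tbl.metadata.partition_specs:
        if spec.spec_id == entry.data_file.spec_id:
            return spec
    return self.tbl.spec()  # fall back to the current default spec
```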

```
@@ -94,7 +94,7 @@ def snapshots(self) -> "pa.Table":
         schema=snapshots_schema,
     )

-    def entries(self, snapshot_id: Optional[int] = None) -> "pa.Table":
+    def _get_entries_schema(self) -> "pa.Schema":
```
Contributor


Same problem here using the table's current metadata (self.tbl.).

@soumya-ghosh
Contributor

@amitgilad3 @Fokko @kevinjqliu Let's get this PR over the line and complete the remaining metadata tables.

@amitgilad3
Contributor Author

Fixed the merge conflict, so I think we are good to go @soumya-ghosh
