
Add all files metadata tables #1626


Merged
6 commits merged into apache:main on May 13, 2025

Conversation

Contributor

@soumya-ghosh commented Feb 8, 2025

Implements the following metadata tables from #1053:

  • all_files
  • all_data_files
  • all_delete_files

Refactored the files metadata code for better reusability.

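For reference, a minimal usage sketch of the new inspect entry points (a hedged example; the catalog name and table identifier below are placeholders):

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")  # placeholder catalog name
tbl = catalog.load_table("default.my_table")  # placeholder table identifier

# Files reachable from all snapshots, not just the current one
all_files = tbl.inspect.all_files()
all_data_files = tbl.inspect.all_data_files()
all_delete_files = tbl.inspect.all_delete_files()
print(all_files.num_rows, all_data_files.num_rows, all_delete_files.num_rows)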
Contributor

@kevinjqliu left a comment

Thanks for the PR, I added a few comments

Comment on lines 681 to 694
all_manifest_files_by_snapshot: Iterator[List[ManifestFile]] = executor.map(
    lambda args: args[0].manifests(self.tbl.io), [(snapshot,) for snapshot in snapshots]
)
all_manifest_files = list(
    {(manifest.manifest_path, manifest) for manifest_list in all_manifest_files_by_snapshot for manifest in manifest_list}
)
all_files_by_manifest: Iterator[List[Dict[str, Any]]] = executor.map(
    lambda args: self._files_by_manifest(*args), [(manifest, data_file_filter) for _, manifest in all_manifest_files]
)
all_files_list = [file for files in all_files_by_manifest for file in files]
return pa.Table.from_pylist(
    all_files_list,
    schema=self._get_files_schema(),
)
Contributor

WDYT about something like this?

Also, I would rename _files_by_manifest and have it return a pa.Table, so we can skip the flatten and just concat the tables.

Suggested change (replacing the snippet above):
manifest_lists = executor.map(
    lambda snapshot: snapshot.manifests(self.tbl.io),
    snapshots,
)
unique_manifests = {
    (manifest.manifest_path, manifest)
    for manifest_list in manifest_lists
    for manifest in manifest_list
}
file_lists = executor.map(
    lambda args: self._files_by_manifest(*args),  # unpack (manifest, data_file_filter) tuples
    [(manifest, data_file_filter) for _, manifest in unique_manifests],
)
all_files = [
    file
    for file_list in file_lists
    for file in file_list
]
return pa.Table.from_pylist(
    all_files,
    schema=self._get_files_schema(),
)

Contributor

I agree with this; the implementation of _files_by_manifest enforces uniqueness, which wasn't clear.

    self, manifest_list: ManifestFile, data_file_filter: Optional[Set[DataFileContent]] = None
) -> List[Dict[str, Any]]:
    files: list[dict[str, Any]] = []
    schema = self.tbl.metadata.schema()
Contributor

When time traveling across different snapshots, we shouldn't just use the current table schema.
For context: #1053 (comment)
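For illustration, a sketch of what that could look like, assuming the per-snapshot schema is looked up via the snapshot's schema_id with a fallback to the current table schema:

# Sketch only: prefer the schema that was current when the snapshot was written.
schema = self.tbl.metadata.schema()
if snapshot.schema_id is not None:
    schema = self.tbl.schemas().get(snapshot.schema_id, schema)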

Contributor Author

@kevinjqliu updated code as per comments.

@Fokko
Contributor

Fokko commented May 6, 2025

@soumya-ghosh Gentle ping, would you be interested in contributing this? Would be great to get this in 🚀

Comment on lines 641 to 644
  return pa.Table.from_pylist(
-     files,
-     schema=files_schema,
+     [],
+     schema=self._get_files_schema(),
  )
Contributor

Nice one, this can be further simplified to:

return self._get_files_schema().empty_table()

Less is more :)
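For illustration, a small standalone example of the same pattern with plain pyarrow (column names are made up; Schema.empty_table() needs a reasonably recent pyarrow):

import pyarrow as pa

files_schema = pa.schema([("file_path", pa.string()), ("record_count", pa.int64())])
empty = files_schema.empty_table()  # zero-row table that still carries the full schema
print(empty.num_rows)  # 0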

def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table":
    import pyarrow as pa

    files_table: list[pa.Table] = []
Contributor

nit: we can move this one down; we don't need to create it when we return early on line 642

@Fokko
Contributor

Fokko commented May 6, 2025

@soumya-ghosh I see that you incorporated the feedback by @kevinjqliu directly, instead of accepting the suggestion. That also works, thanks for working on this. I think we're pretty close 👍

@soumya-ghosh
Contributor Author

Yes @Fokko, there is an open discussion happening in #1053 (comment).

I will raise another PR for docs about the inspect operations.

    return self._all_files({DataFileContent.DATA})

def all_delete_files(self) -> "pa.Table":
    return self._all_files({DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES})
Contributor

This should also include Puffin files.

Contributor

We have a Spark table to test this:

for format_version in [2, 3]:
    identifier = f'{catalog_name}.default.test_positional_mor_deletes_v{format_version}'
    spark.sql(
        f"""
        CREATE OR REPLACE TABLE {identifier} (
            dt      date,
            number  integer,
            letter  string
        )
        USING iceberg
        TBLPROPERTIES (
            'write.delete.mode'='merge-on-read',
            'write.update.mode'='merge-on-read',
            'write.merge.mode'='merge-on-read',
            'format-version'='{format_version}'
        );
        """
    )
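A hedged sketch of how an integration test could compare the new table against Spark's output for that table (fixture and variable names are hypothetical):

def test_all_delete_files_matches_spark(spark, tbl, identifier):
    # Spark exposes the same metadata table as <identifier>.all_delete_files
    spark_count = spark.table(f"{identifier}.all_delete_files").count()
    assert tbl.inspect.all_delete_files().num_rows == spark_count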

Contributor Author

Okay, will check this. If this requires changes, it will also need changes in the files and delete_files tables.

Contributor Author

@Fokko Added an integration test for a table with format version 3. Used Spark to write the data, since writes through pyiceberg to a V3 table were failing.

Note that the outputs of the files metadata table (and all other related tables) do not completely match their Spark counterparts due to additional columns like first_row_id, referenced_data_file, content_offset, and content_size_in_bytes. These need to be added to the DataFile class first and then propagated as required. This should be addressed in a different issue; will it be part of the V3 tracking issue?

Contributor

Yes, let's do that in a separate PR: #1982

Contributor

@Fokko left a comment

Left one minor comment about partition; apart from that, this looks great to me. Thanks @soumya-ghosh for working on this 🙌

"content": data_file.content,
"file_path": data_file.file_path,
"file_format": data_file.file_format,
"spec_id": data_file.spec_id,
Contributor

In Spark we also have the partition column; I think it would be good to add that one here as well:

partition_record = self.tbl.metadata.specs_struct()
pa_record_struct = schema_to_pyarrow(partition_record)
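For illustration, a rough sketch of how each file row could then carry its partition values; the spec lookup and positional indexing into data_file.partition are assumptions, not the final implementation:

# Sketch only: build a {field_name: value} dict for this file's partition.
spec = self.tbl.specs()[data_file.spec_id]
partition_values = {
    field.name: data_file.partition[pos]
    for pos, field in enumerate(spec.fields)
}
file_row["partition"] = partition_values  # hypothetical row dict for the files table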

Contributor Author

@Fokko Added the partition column to the files metadata table schema and added a test for it.

Contributor

One minor point: could we swap the order of spec_id and partition to keep it the same as in Spark?

Contributor Author

Order of the spec_id and partition columns fixed.

Contributor

@Fokko left a comment

One minor remark, apart from that it looks good.

Pinging @geruh @kevinjqliu to see if they have any further comments

"content": data_file.content,
"file_path": data_file.file_path,
"file_format": data_file.file_format,
"spec_id": data_file.spec_id,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On minor point, could we swap the order of spec_id and partition to keep it the same as in Spark:

image

@Fokko requested a review from kevinjqliu May 10, 2025 20:57
@Fokko
Contributor

Fokko commented May 13, 2025

Let's merge this to unblock #1958. Thanks @soumya-ghosh for working on this, and thanks @kevinjqliu and @geruh for the reviews 🙌

@Fokko merged commit 2a54034 into apache:main on May 13, 2025
10 checks passed