Add Data Files from Parquet Files to UnPartitioned Table #506
Merged · 12 commits · Mar 16, 2024
pyiceberg/io/pyarrow.py (33 additions, 3 deletions)
@@ -124,7 +124,7 @@
visit,
visit_with_partner,
)
from pyiceberg.table import PropertyUtil, TableProperties, WriteTask
from pyiceberg.table import AddFileTask, PropertyUtil, TableProperties, WriteTask
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.name_mapping import NameMapping
from pyiceberg.transforms import TruncateTransform
Expand Down Expand Up @@ -1599,6 +1599,7 @@ def fill_parquet_file_metadata(
parquet_metadata: pq.FileMetaData,
stats_columns: Dict[int, StatisticsCollector],
parquet_column_mapping: Dict[str, int],
check_schema_parity: bool = True,
) -> None:
"""
Compute and fill the following fields of the DataFile object.
@@ -1618,12 +1619,12 @@
stats_columns (Dict[int, StatisticsCollector]): The statistics gathering plan. It is required to
set the mode for column metrics collection
"""
if parquet_metadata.num_columns != len(stats_columns):
if check_schema_parity and parquet_metadata.num_columns != len(stats_columns):
raise ValueError(
f"Number of columns in statistics configuration ({len(stats_columns)}) is different from the number of columns in pyarrow table ({parquet_metadata.num_columns})"
)

if parquet_metadata.num_columns != len(parquet_column_mapping):
if check_schema_parity and parquet_metadata.num_columns != len(parquet_column_mapping):
raise ValueError(
f"Number of columns in column mapping ({len(parquet_column_mapping)}) is different from the number of columns in pyarrow table ({parquet_metadata.num_columns})"
)
@@ -1772,6 +1773,35 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
return iter([data_file])


def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[AddFileTask]) -> Iterator[DataFile]:
for task in tasks:
input_file = io.new_input(task.file_path)
with input_file.open() as input_stream:
parquet_metadata = pq.read_metadata(input_stream)

schema = table_metadata.schema()
data_file = DataFile(
content=DataFileContent.DATA,
file_path=task.file_path,
file_format=FileFormat.PARQUET,
partition=task.partition_field_value,
record_count=parquet_metadata.num_rows,
file_size_in_bytes=len(input_file),
sort_order_id=None,
spec_id=table_metadata.default_spec_id,
equality_ids=None,
key_metadata=None,
)
fill_parquet_file_metadata(
data_file=data_file,
parquet_metadata=parquet_metadata,
stats_columns=compute_statistics_plan(schema, table_metadata.properties),
parquet_column_mapping=parquet_path_to_id_mapping(schema),
check_schema_parity=False,
)
yield data_file


ICEBERG_UNCOMPRESSED_CODEC = "uncompressed"
PYARROW_UNCOMPRESSED_CODEC = "none"

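For orientation, here is a minimal sketch of how the new parquet_files_to_data_files helper could be driven directly; the catalog name, table name, file path, and the empty partition Record are hypothetical assumptions, and in this PR the helper is normally reached through Table.add_files rather than called by hand.

from pyiceberg.catalog import load_catalog
from pyiceberg.io.pyarrow import parquet_files_to_data_files
from pyiceberg.table import AddFileTask
from pyiceberg.typedef import Record

# Hypothetical catalog and table names; the table is assumed to be unpartitioned.
catalog = load_catalog("default")
tbl = catalog.load_table("default.events")

# One AddFileTask per pre-existing Parquet file; Record() is assumed to serve as the
# empty partition value for an unpartitioned spec.
tasks = iter([
    AddFileTask(
        file_path="s3://warehouse/default/data/existing-0.parquet",
        partition_field_value=Record(),
    )
])

# Yields DataFile objects whose statistics are filled from the Parquet footers.
data_files = list(parquet_files_to_data_files(io=tbl.io, table_metadata=tbl.metadata, tasks=tasks))
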
pyiceberg/table/__init__.py (64 additions, 0 deletions)
@@ -33,6 +33,7 @@
Dict,
Generic,
Iterable,
Iterator,
List,
Literal,
Optional,
@@ -56,6 +57,7 @@
EqualTo,
Reference,
)
from pyiceberg.expressions.literals import StringLiteral
from pyiceberg.io import FileIO, load_file_io
from pyiceberg.manifest import (
POSITIONAL_DELETE_SCHEMA,
@@ -115,6 +117,7 @@
Identifier,
KeyDefaultDict,
Properties,
Record,
)
from pyiceberg.types import (
IcebergType,
@@ -1147,6 +1150,26 @@ def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE) -> None:
for data_file in data_files:
update_snapshot.append_data_file(data_file)

def add_files(self, file_paths: List[str]) -> None:
"""
Shorthand API for adding files as data files to the table.

Args:
file_paths: The list of full file paths to be added as data files to the table
"""
if any(not isinstance(field.transform, IdentityTransform) for field in self.metadata.spec().fields):
raise NotImplementedError("Cannot add_files to a table with Transform Partitions")

Review comment (Contributor): We can be more permissive. It isn't a problem that the table's current partitioning uses something other than an IdentityTransform; the issue is that we cannot add DataFiles that use this partitioning (until we find a clever way of checking this).

if self.name_mapping() is None:

Review thread:

Contributor: Technically you don't have to add a name-mapping if the field-IDs are set.

Collaborator (Author): @Fokko Yeah, I think you are right! When field IDs are in the files and a name_mapping is also present, the field IDs take precedence over the name_mapping in schema resolution, so the name_mapping here would essentially be meaningless in that case.

I'm on the fence between moving forward with your suggestion (create a name_mapping if there are no field IDs) and always asserting that the parquet files we want to add have no field IDs. That's because the field IDs we use in our Iceberg-generated parquet files reflect the Iceberg table's internal notion of field IDs. Whenever a new table gets created, new field IDs are assigned, and Iceberg keeps track of these field IDs internally to ensure that the same field can be treated consistently through column renaming.

When we add_files, we are introducing files that have been produced by an external process that isn't aware of Iceberg's internal field metadata. In that sense, I feel that allowing files that have field IDs to be added could result in unexpected errors for the user that are difficult to diagnose.

Contributor: My main concern is that the Parquet file and the mapping don't match, for example when there are more fields in the parquet file than in the mapping. I think it is good to add checks there.

Collaborator (Author): I've added this check here @Fokko, let me know if that makes sense to you.

with self.transaction() as tx:
tx.set_properties(**{TableProperties.DEFAULT_NAME_MAPPING: self.schema().name_mapping.model_dump_json()})

with self.transaction() as txn:
with txn.update_snapshot().fast_append() as update_snapshot:
data_files = _parquet_files_to_data_files(table_metadata=self.metadata, file_paths=file_paths, io=self.io)
for data_file in data_files:
update_snapshot.append_data_file(data_file)
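
For illustration, a hedged usage sketch of the new API from a user's point of view; the catalog name, table name, and file paths are hypothetical, and the files are assumed to already exist and match the table's schema.

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
tbl = catalog.load_table("default.events")  # hypothetical unpartitioned table

# Register Parquet files written by an external process as Iceberg data files
# without rewriting them; add_files commits a single fast-append snapshot.
tbl.add_files(file_paths=[
    "s3://warehouse/default/data/backfill-00.parquet",
    "s3://warehouse/default/data/backfill-01.parquet",
])

As implemented above, the call raises NotImplementedError if the partition spec contains non-identity transforms, and it sets a default name-mapping on the table when none is present.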

def update_spec(self, case_sensitive: bool = True) -> UpdateSpec:
return UpdateSpec(Transaction(self, autocommit=True), case_sensitive=case_sensitive)
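
Related to the field-ID discussion in the review thread above, a minimal sketch of how one might check whether a Parquet file already carries embedded field IDs; the local path is hypothetical, and this assumes pyarrow exposes Parquet field IDs under the PARQUET:field_id metadata key.

import pyarrow.parquet as pq

# Hypothetical path; field IDs written by another Iceberg-aware writer would surface here.
arrow_schema = pq.read_schema("/tmp/existing-0.parquet")
has_field_ids = any(
    field.metadata is not None and b"PARQUET:field_id" in field.metadata
    for field in arrow_schema
)
print(has_field_ids)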

@@ -2444,6 +2467,12 @@ def generate_data_file_filename(self, extension: str) -> str:
return f"00000-{self.task_id}-{self.write_uuid}.{extension}"


@dataclass(frozen=True)
class AddFileTask:
file_path: str
partition_field_value: Record


def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
return f'{location}/metadata/{commit_uuid}-m{num}.avro'

@@ -2475,6 +2504,41 @@ def _dataframe_to_data_files(
yield from write_file(io=io, table_metadata=table_metadata, tasks=iter([WriteTask(write_uuid, next(counter), df)]))


def add_file_tasks_from_file_paths(file_paths: List[str], table_metadata: TableMetadata) -> Iterator[AddFileTask]:
partition_spec = table_metadata.spec()
partition_struct = partition_spec.partition_type(table_metadata.schema())

for file_path in file_paths:
# file_path = 's3://warehouse/default/part1=2024-03-04/part2=ABCD'
# ['part1=2024-03-04', 'part2=ABCD']
parts = [part for part in file_path.split("/") if "=" in part]

partition_field_values = {}
for part in parts:
partition_name, string_value = part.split("=")
if partition_field := partition_struct.field_by_name(partition_name):
partition_field_values[partition_name] = StringLiteral(string_value).to(partition_field.field_type).value

yield AddFileTask(
file_path=file_path,
partition_field_value=Record(**{
field.name: partition_field_values.get(field.name) for field in partition_struct.fields
}),
)
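
To make the path parsing above concrete, a small standalone sketch with a hypothetical path and partition names; the real helper additionally coerces each string through StringLiteral(...).to(field.field_type) so the values match the table's partition field types.

# Hypothetical Hive-style partition path; only segments containing '=' are treated as partition values.
file_path = "s3://warehouse/default/part1=2024-03-04/part2=ABCD/data-0.parquet"
parts = [part for part in file_path.split("/") if "=" in part]
# parts == ['part1=2024-03-04', 'part2=ABCD']

partition_values = dict(part.split("=") for part in parts)
# partition_values == {'part1': '2024-03-04', 'part2': 'ABCD'}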


def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List[str], io: FileIO) -> Iterable[DataFile]:
"""Convert a list files into DataFiles.

Returns:
An iterable that supplies DataFiles that describe the parquet files.
"""
from pyiceberg.io.pyarrow import parquet_files_to_data_files

tasks = add_file_tasks_from_file_paths(file_paths, table_metadata)
yield from parquet_files_to_data_files(io=io, table_metadata=table_metadata, tasks=tasks)


class _MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
commit_uuid: uuid.UUID
_operation: Operation