
Add Data Files from Parquet Files to UnPartitioned Table #506

Merged: 12 commits, Mar 16, 2024
Makefile: 1 addition & 1 deletion
@@ -42,7 +42,7 @@ test-integration:
docker-compose -f dev/docker-compose-integration.yml up -d
sleep 10
docker-compose -f dev/docker-compose-integration.yml exec -T spark-iceberg ipython ./provision.py
poetry run pytest tests/ -v -m integration ${PYTEST_ARGS}
poetry run pytest tests/integration/test_add_files.py -v -m integration ${PYTEST_ARGS}
Contributor
This was committed by accident?

Suggested change
poetry run pytest tests/integration/test_add_files.py -v -m integration ${PYTEST_ARGS}
poetry run pytest tests/ -v -m integration ${PYTEST_ARGS}

Collaborator Author
I always do 😅


test-integration-rebuild:
docker-compose -f dev/docker-compose-integration.yml kill
pyiceberg/io/pyarrow.py: 34 additions & 1 deletion
@@ -124,7 +124,7 @@
    visit,
    visit_with_partner,
)
from pyiceberg.table import PropertyUtil, TableProperties, WriteTask
from pyiceberg.table import AddFileTask, PropertyUtil, TableProperties, WriteTask
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.name_mapping import NameMapping
from pyiceberg.transforms import TruncateTransform
@@ -1772,6 +1772,39 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT
    return iter([data_file])


def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[AddFileTask]) -> Iterator[DataFile]:
    for task in tasks:
        input_file = io.new_input(task.file_path)
        with input_file.open() as input_stream:
            parquet_metadata = pq.read_metadata(input_stream)

        if visit_pyarrow(parquet_metadata.schema.to_arrow_schema(), _HasIds()):
            raise NotImplementedError(
                f"Cannot add file {task.file_path} because it has field IDs. `add_files` only supports addition of files without field_ids"
            )

        schema = table_metadata.schema()
        data_file = DataFile(
            content=DataFileContent.DATA,
            file_path=task.file_path,
            file_format=FileFormat.PARQUET,
            partition=task.partition_field_value,
            record_count=parquet_metadata.num_rows,
            file_size_in_bytes=len(input_file),
            sort_order_id=None,
            spec_id=table_metadata.default_spec_id,
            equality_ids=None,
            key_metadata=None,
        )
        fill_parquet_file_metadata(
            data_file=data_file,
            parquet_metadata=parquet_metadata,
            stats_columns=compute_statistics_plan(schema, table_metadata.properties),
            parquet_column_mapping=parquet_path_to_id_mapping(schema),
        )
        yield data_file
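
For context, the visit_pyarrow(..., _HasIds()) guard above rejects files whose Arrow schema already carries Iceberg field IDs. A rough standalone approximation of that check, for illustration only (hypothetical helper name, top-level fields only; the real visitor also walks nested types):

import pyarrow.parquet as pq

def file_has_field_ids(path: str) -> bool:
    # Field IDs surface as b"PARQUET:field_id" entries in the Arrow field metadata.
    arrow_schema = pq.read_metadata(path).schema.to_arrow_schema()
    return any(f.metadata is not None and b"PARQUET:field_id" in f.metadata for f in arrow_schema)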


ICEBERG_UNCOMPRESSED_CODEC = "uncompressed"
PYARROW_UNCOMPRESSED_CODEC = "none"

pyiceberg/table/__init__.py: 49 additions & 0 deletions
@@ -33,6 +33,7 @@
    Dict,
    Generic,
    Iterable,
    Iterator,
    List,
    Literal,
    Optional,
@@ -115,6 +116,7 @@
    Identifier,
    KeyDefaultDict,
    Properties,
    Record,
)
from pyiceberg.types import (
IcebergType,
@@ -1147,6 +1149,24 @@ def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_T
for data_file in data_files:
update_snapshot.append_data_file(data_file)

    def add_files(self, file_paths: List[str]) -> None:
        """
        Shorthand API for adding files as data files to the table.

        Args:
            file_paths: The list of full file paths to be added as data files to the table

        Raises:
            FileNotFoundError: If the file does not exist.
        """
        with self.transaction() as tx:
            if self.name_mapping() is None:
                tx.set_properties(**{TableProperties.DEFAULT_NAME_MAPPING: self.schema().name_mapping.model_dump_json()})
            with tx.update_snapshot().fast_append() as update_snapshot:
                data_files = _parquet_files_to_data_files(table_metadata=self.metadata, file_paths=file_paths, io=self.io)
                for data_file in data_files:
                    update_snapshot.append_data_file(data_file)
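
A minimal usage sketch of the new API, assuming a catalog configuration named "default" (hypothetical) and Parquet files already written to object storage; the paths mirror those used in the integration tests further down. The files are registered as data files without being rewritten:

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")  # hypothetical catalog name/configuration
tbl = catalog.load_table("default.unpartitioned_table")

# Register existing Parquet files as data files; no data is rewritten.
tbl.add_files(file_paths=[
    "s3://warehouse/default/unpartitioned/test-0.parquet",
    "s3://warehouse/default/unpartitioned/test-1.parquet",
])

When the table has no name mapping yet, the call also sets TableProperties.DEFAULT_NAME_MAPPING from the current schema, so the ID-less files stay readable by column name.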

    def update_spec(self, case_sensitive: bool = True) -> UpdateSpec:
        return UpdateSpec(Transaction(self, autocommit=True), case_sensitive=case_sensitive)

@@ -2444,6 +2464,12 @@ def generate_data_file_filename(self, extension: str) -> str:
return f"00000-{self.task_id}-{self.write_uuid}.{extension}"


@dataclass(frozen=True)
class AddFileTask:
    file_path: str
    partition_field_value: Record


def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
    return f'{location}/metadata/{commit_uuid}-m{num}.avro'

@@ -2475,6 +2501,29 @@ def _dataframe_to_data_files(
    yield from write_file(io=io, table_metadata=table_metadata, tasks=iter([WriteTask(write_uuid, next(counter), df)]))


def add_file_tasks_from_file_paths(file_paths: List[str], table_metadata: TableMetadata) -> Iterator[AddFileTask]:
    if len([spec for spec in table_metadata.partition_specs if spec.spec_id != 0]) > 0:
        raise ValueError("Cannot add files to partitioned tables")

    for file_path in file_paths:
        yield AddFileTask(
            file_path=file_path,
            partition_field_value=Record(),
        )
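
Because only unpartitioned tables are accepted here, every generated task carries an empty partition Record. For illustration, a single task looks like this (file path taken from the integration tests below):

AddFileTask(
    file_path="s3://warehouse/default/unpartitioned/test-0.parquet",
    partition_field_value=Record(),
)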


def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List[str], io: FileIO) -> Iterable[DataFile]:
"""Convert a list files into DataFiles.

Returns:
An iterable that supplies DataFiles that describe the parquet files.
"""
from pyiceberg.io.pyarrow import parquet_files_to_data_files

tasks = add_file_tasks_from_file_paths(file_paths, table_metadata)
yield from parquet_files_to_data_files(io=io, table_metadata=table_metadata, tasks=tasks)


class _MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
    commit_uuid: uuid.UUID
    _operation: Operation
tests/integration/test_add_files.py: 240 additions & 0 deletions (new file)
@@ -0,0 +1,240 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint:disable=redefined-outer-name

from datetime import date
from typing import Optional

import pyarrow as pa
import pyarrow.parquet as pq
import pytest
from pyspark.sql import SparkSession

from pyiceberg.catalog import Catalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import Table
from pyiceberg.types import (
    BooleanType,
    DateType,
    IntegerType,
    NestedField,
    StringType,
)

TABLE_SCHEMA = Schema(
    NestedField(field_id=1, name="foo", field_type=BooleanType(), required=False),
    NestedField(field_id=2, name="bar", field_type=StringType(), required=False),
    NestedField(field_id=4, name="baz", field_type=IntegerType(), required=False),
    NestedField(field_id=10, name="qux", field_type=DateType(), required=False),
)

ARROW_SCHEMA = pa.schema([
("foo", pa.bool_()),
("bar", pa.string()),
("baz", pa.int32()),
("qux", pa.date32()),
])

ARROW_TABLE = pa.Table.from_pylist(
    [
        {
            "foo": True,
            "bar": "bar_string",
            "baz": 123,
            "qux": date(2024, 3, 7),
        }
    ],
    schema=ARROW_SCHEMA,
)

ARROW_SCHEMA_WITH_IDS = pa.schema([
    pa.field('foo', pa.bool_(), nullable=False, metadata={"PARQUET:field_id": "1"}),
    pa.field('bar', pa.string(), nullable=False, metadata={"PARQUET:field_id": "2"}),
    pa.field('baz', pa.int32(), nullable=False, metadata={"PARQUET:field_id": "3"}),
    pa.field('qux', pa.date32(), nullable=False, metadata={"PARQUET:field_id": "4"}),
])


ARROW_TABLE_WITH_IDS = pa.Table.from_pylist(
    [
        {
            "foo": True,
            "bar": "bar_string",
            "baz": 123,
            "qux": date(2024, 3, 7),
        }
    ],
    schema=ARROW_SCHEMA_WITH_IDS,
)

ARROW_SCHEMA_UPDATED = pa.schema([
("foo", pa.bool_()),
("baz", pa.int32()),
("qux", pa.date32()),
("quux", pa.int32()),
])

ARROW_TABLE_UPDATED = pa.Table.from_pylist(
    [
        {
            "foo": True,
            "baz": 123,
            "qux": date(2024, 3, 7),
            "quux": 234,
        }
    ],
    schema=ARROW_SCHEMA_UPDATED,
)


def _create_table(session_catalog: Catalog, identifier: str, partition_spec: Optional[PartitionSpec] = None) -> Table:
    try:
        session_catalog.drop_table(identifier=identifier)
    except NoSuchTableError:
        pass

    tbl = session_catalog.create_table(
        identifier=identifier, schema=TABLE_SCHEMA, partition_spec=partition_spec if partition_spec else PartitionSpec()
    )

    return tbl


@pytest.mark.integration
def test_add_files_to_unpartitioned_table(spark: SparkSession, session_catalog: Catalog) -> None:
identifier = "default.unpartitioned_table"
tbl = _create_table(session_catalog, identifier)

file_paths = [f"s3://warehouse/default/unpartitioned/test-{i}.parquet" for i in range(5)]
# write parquet files
for file_path in file_paths:
fo = tbl.io.new_output(file_path)
with fo.create(overwrite=True) as fos:
with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
writer.write_table(ARROW_TABLE)

# add the parquet files as data files
tbl.add_files(file_paths=file_paths)

# NameMapping must have been set to enable reads
assert tbl.name_mapping() is not None

rows = spark.sql(
f"""
SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count
FROM {identifier}.all_manifests
"""
).collect()

assert [row.added_data_files_count for row in rows] == [5]
assert [row.existing_data_files_count for row in rows] == [0]
assert [row.deleted_data_files_count for row in rows] == [0]

df = spark.table(identifier)
assert df.count() == 5, "Expected 5 rows"
for col in df.columns:
assert df.filter(df[col].isNotNull()).count() == 5, "Expected all 5 rows to be non-null"


@pytest.mark.integration
def test_add_files_to_unpartitioned_table_raises_file_not_found(spark: SparkSession, session_catalog: Catalog) -> None:
identifier = "default.unpartitioned_raises_not_found"
tbl = _create_table(session_catalog, identifier)

file_paths = [f"s3://warehouse/default/unpartitioned_raises_not_found/test-{i}.parquet" for i in range(5)]
# write parquet files
for file_path in file_paths:
fo = tbl.io.new_output(file_path)
with fo.create(overwrite=True) as fos:
with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
writer.write_table(ARROW_TABLE)

# add the parquet files as data files
with pytest.raises(FileNotFoundError):
tbl.add_files(file_paths=file_paths + ["s3://warehouse/default/unpartitioned_raises_not_found/unknown.parquet"])


@pytest.mark.integration
def test_add_files_to_unpartitioned_table_raises_has_field_ids(spark: SparkSession, session_catalog: Catalog) -> None:
identifier = "default.unpartitioned_raises_field_ids"
tbl = _create_table(session_catalog, identifier)

file_paths = [f"s3://warehouse/default/unpartitioned_raises_field_ids/test-{i}.parquet" for i in range(5)]
# write parquet files
for file_path in file_paths:
fo = tbl.io.new_output(file_path)
with fo.create(overwrite=True) as fos:
with pq.ParquetWriter(fos, schema=ARROW_SCHEMA_WITH_IDS) as writer:
writer.write_table(ARROW_TABLE_WITH_IDS)

# add the parquet files as data files
with pytest.raises(NotImplementedError):
tbl.add_files(file_paths=file_paths)


@pytest.mark.integration
def test_add_files_to_unpartitioned_table_with_schema_updates(spark: SparkSession, session_catalog: Catalog) -> None:
identifier = "default.unpartitioned_table_2"
tbl = _create_table(session_catalog, identifier)

file_paths = [f"s3://warehouse/default/unpartitioned_2/test-{i}.parquet" for i in range(5)]
# write parquet files
for file_path in file_paths:
fo = tbl.io.new_output(file_path)
with fo.create(overwrite=True) as fos:
with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
writer.write_table(ARROW_TABLE)

# add the parquet files as data files
tbl.add_files(file_paths=file_paths)

# NameMapping must have been set to enable reads
assert tbl.name_mapping() is not None

with tbl.update_schema() as update:
update.add_column("quux", IntegerType())
update.delete_column("bar")

file_path = "s3://warehouse/default/unpartitioned_2/test-6.parquet"
# write parquet files
fo = tbl.io.new_output(file_path)
with fo.create(overwrite=True) as fos:
with pq.ParquetWriter(fos, schema=ARROW_SCHEMA_UPDATED) as writer:
writer.write_table(ARROW_TABLE_UPDATED)

# add the parquet files as data files
tbl.add_files(file_paths=[file_path])
rows = spark.sql(
f"""
SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count
FROM {identifier}.all_manifests
"""
).collect()

assert [row.added_data_files_count for row in rows] == [5, 1, 5]
assert [row.existing_data_files_count for row in rows] == [0, 0, 0]
assert [row.deleted_data_files_count for row in rows] == [0, 0, 0]

df = spark.table(identifier)
assert df.count() == 6, "Expected 6 rows"
assert len(df.columns) == 4, "Expected 4 columns"
df.show()
Contributor
I think this was for testing, can we remove this one? .show() is a Spark action, meaning it will run the pipeline.

    for col in df.columns:
        value_count = 1 if col == "quux" else 6
        assert df.filter(df[col].isNotNull()).count() == value_count, f"Expected {value_count} rows to be non-null"