
Write support #41


Merged (71 commits) on Jan 18, 2024
Commits (71)
7133054
WIP: Write
Fokko Oct 3, 2023
ffecf72
Add logic to generate a new snapshot-id
Fokko Oct 3, 2023
25eb597
WIP
Fokko Oct 4, 2023
4cd493e
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Oct 5, 2023
a726b1d
Construct a writer tree
Fokko Oct 4, 2023
b88f736
WIP
Fokko Oct 5, 2023
f53626d
WIP
Fokko Oct 6, 2023
0c665ef
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Oct 9, 2023
3f79dbd
Fix linting
Fokko Oct 9, 2023
02430bb
Make the tests pass
Fokko Oct 9, 2023
eb4dd62
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Oct 10, 2023
c891382
Add support for V2
Fokko Oct 10, 2023
aae5a57
pre-commit
Fokko Oct 10, 2023
cff3a1d
Move things outside of pyarrow.py
Fokko Oct 10, 2023
082387e
Append WIP
Fokko Oct 11, 2023
997b673
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Oct 11, 2023
9d52906
WIP
Fokko Oct 11, 2023
8893cf3
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Oct 11, 2023
55f27c9
Add v1 to v2 promotion tests
Fokko Oct 11, 2023
9a0096b
Add _MergeSnapshots
Fokko Oct 11, 2023
4f5b710
Work on the Summary
Fokko Oct 11, 2023
926d947
Add tests
Fokko Oct 12, 2023
50575a8
Add Snapshot logic and Summary generation
Fokko Oct 12, 2023
5482ae0
WIP
Fokko Oct 13, 2023
2fa01f4
WIP
Fokko Oct 13, 2023
580c824
Cleanup
Fokko Oct 13, 2023
254d7e8
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Oct 15, 2023
f4ae6c5
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Oct 16, 2023
760c0d4
Merge branch 'main' of github.com:apache/iceberg-python into fd-snaps…
Fokko Oct 23, 2023
3dba41a
Refactor it a bit
Fokko Oct 23, 2023
bcc5176
Cleanup
Fokko Oct 23, 2023
6d5fbb1
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Oct 23, 2023
3309129
Merge branch 'main' of github.com:apache/iceberg-python into fd-snaps…
Fokko Oct 25, 2023
12c4699
Comments
Fokko Oct 25, 2023
8ef1a06
Merge branch 'fd-snapshots' of github.com:Fokko/iceberg-python into f…
Fokko Oct 25, 2023
aabfb09
Cleanup
Fokko Oct 25, 2023
149c3ec
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Dec 9, 2023
17fd689
WIP
Fokko Dec 9, 2023
54e36ab
Update poetry
Fokko Dec 9, 2023
ab36ec3
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Dec 12, 2023
d6df342
Cleanup
Fokko Dec 12, 2023
1398a2f
Update error
Fokko Dec 12, 2023
c426068
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Dec 18, 2023
1861647
WIP
Fokko Dec 18, 2023
cebc781
Make the CI happy
Fokko Dec 18, 2023
4d0d11c
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Dec 18, 2023
abda552
Cleanup
Fokko Dec 18, 2023
3cd5829
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Dec 19, 2023
5f86b15
fix
Fokko Dec 19, 2023
5044da6
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Dec 20, 2023
e020efb
Fix test
Fokko Dec 20, 2023
0b42471
Merge branch 'main' into fd-write
Fokko Dec 21, 2023
a41abd0
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Dec 26, 2023
286cf47
Thanks Amogh!
Fokko Dec 26, 2023
559618c
Merge branch 'fd-write' of github.com:Fokko/iceberg-python into fd-write
Fokko Dec 26, 2023
4153e78
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Jan 3, 2024
54e75d6
Make the CI happy
Fokko Jan 3, 2024
158077c
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Jan 11, 2024
bbc0b35
Update lint
Fokko Jan 11, 2024
d441af9
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Jan 12, 2024
e395a8f
Update pyiceberg/table/snapshots.py
Fokko Jan 12, 2024
2a65357
Comments and fixing some bugs
Fokko Jan 15, 2024
a013f35
Merge branch 'fd-write' of github.com:Fokko/iceberg-python into fd-write
Fokko Jan 15, 2024
abc0741
Remove doc
Fokko Jan 15, 2024
b817a15
Fix the tests
Fokko Jan 15, 2024
48ba852
Refactor
Fokko Jan 15, 2024
664e113
Move to fast-appends
Fokko Jan 16, 2024
85ac0eb
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Jan 17, 2024
7e8c04f
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Jan 17, 2024
7baf3ec
keep track of deleted files
Fokko Jan 17, 2024
ab020b9
Comments
Fokko Jan 18, 2024
98 changes: 98 additions & 0 deletions mkdocs/docs/api.md
@@ -175,6 +175,104 @@ static_table = StaticTable.from_metadata(

The static-table is considered read-only.

## Write support

With PyIceberg 0.6.0, write support is added through Arrow. Let's consider an Arrow Table:
Contributor:

Thanks for this example! Made it really easy to test out.

The example works great when cut and pasted into a REPL. I also tested modifications to the dataframe schema passed to `append`, and it does the right thing. I get a schema error for a few cases:

  • Missing column `long`
  • Type mismatch: `string` instead of `double`
  • Extra column `country`

Looks like Arrow requires that the schema matches, which is great.

It would be nice to allow some type promotion in the future. I'm not sure whether Arrow would automatically write floats into double columns, for example. I would also like to make sure we have better error messages, not just "ValueError: Table schema does not match schema used to create file: ...". Those will be good follow-ups.
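For illustration, a minimal sketch of the type-mismatch case described above (hypothetical code; it assumes the `default.cities` table from the docs below already exists):

```python
import pyarrow as pa

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
tbl = catalog.load_table("default.cities")

# "long" is a string here, while the Iceberg schema declares a double
bad_df = pa.Table.from_pylist(
    [{"city": "Amsterdam", "lat": 52.371807, "long": "4.896029"}],
)

# Raises ValueError: Table schema does not match schema used to create file: ...
tbl.append(bad_df)
```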

Contributor Author:

Yes, I think this ties into the work that @syun64 is doing, where we have to make sure that we map the fields correctly; then I think we can add options to massage the Arrow schema into the Iceberg one (which should be the leading schema).

We can create a visitorWithPartner that checks whether the promotions are possible. One that comes to mind directly is checking whether there are any nulls: Arrow marks fields as nullable by default, even when there are no nulls. A sketch of such a check is shown below.
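As a minimal sketch of that nullability check (an assumption of how it could look, not part of this PR):

```python
import pyarrow as pa


def could_be_required(df: pa.Table, column: str) -> bool:
    # Arrow marks fields as nullable by default; a column without any nulls
    # could still be promoted into a required Iceberg field.
    return not df.schema.field(column).nullable or df.column(column).null_count == 0
```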


```python
import pyarrow as pa

df = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
        {"city": "Drachten", "lat": 53.11254, "long": 6.0989},
        {"city": "Paris", "lat": 48.864716, "long": 2.349014},
    ],
)
```

Next, create a table based on the schema:

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")

from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleType

schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
)

tbl = catalog.create_table("default.cities", schema=schema)
```

Contributor:

Isn't `required=False` the default?

Contributor Author:

No, the default is the stricter `required=True`. I've set it to `False` here because PyArrow produces nullable fields by default.
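As a quick check of the `required` default discussed in the thread above (a sketch, not part of the docs):

```python
from pyiceberg.types import NestedField, StringType

# NestedField is required by default, unlike PyArrow's nullable-by-default fields
assert NestedField(1, "city", StringType()).required is True
assert NestedField(1, "city", StringType(), required=False).required is False
```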

Now write the data to the table:

<!-- prettier-ignore-start -->

!!! note inline end "Fast append"
    PyIceberg defaults to the [fast append](https://iceberg.apache.org/spec/#snapshots) to minimize the amount of data written. This enables quick writes, reducing the possibility of conflicts. The downside of the fast append is that it creates more metadata than a normal commit. [Compaction is planned](https://github.com/apache/iceberg-python/issues/270) and will automatically rewrite all the metadata when a threshold is hit, to maintain performant reads.

<!-- prettier-ignore-end -->

```python
tbl.append(df)

# or

tbl.overwrite(df)
```

The data is written to the table, and when the table is read using `tbl.scan().to_arrow()`:

```
pyarrow.Table
city: string
lat: double
long: double
----
city: [["Amsterdam","San Francisco","Drachten","Paris"]]
lat: [[52.371807,37.773972,53.11254,48.864716]]
long: [[4.896029,-122.431297,6.0989,2.349014]]
```

You can use either `append(df)` or `overwrite(df)` since there is no data yet. If we want to add more data, we can use `.append()` again:

```python
df = pa.Table.from_pylist(
    [{"city": "Groningen", "lat": 53.21917, "long": 6.56667}],
)

tbl.append(df)
```

When reading the table with `tbl.scan().to_arrow()`, you can see that `Groningen` is now also part of the table:

Contributor Author:

While working on this, I also checked the field-ids:

```
parq 00000-0-27345354-67b8-4861-95ca-c2de9dc8d3fe.parquet --schema

 # Schema
 <pyarrow._parquet.ParquetSchema object at 0x11eca2e00>
required group field_id=-1 schema {
  optional binary field_id=1 city (String);
  optional double field_id=2 lat;
  optional double field_id=3 long;
}
```

```
pyarrow.Table
city: string
lat: double
long: double
----
city: [["Amsterdam","San Francisco","Drachten","Paris"],["Groningen"]]
lat: [[52.371807,37.773972,53.11254,48.864716],[53.21917]]
long: [[4.896029,-122.431297,6.0989,2.349014],[6.56667]]
```

The nested lists indicate the different Arrow buffers: the first write results in one buffer, and the second append in a separate buffer. This is expected, since the scan reads two Parquet files.
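A quick way to see those buffers from the scan result (a short sketch; the chunk count mirrors the number of Parquet files read):

```python
arrow_table = tbl.scan().to_arrow()

# Two chunks: one per Parquet file produced by the two writes above
print(arrow_table["city"].num_chunks)
```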

<!-- prettier-ignore-start -->

!!! example "Under development"
Writing using PyIceberg is still under development. Support for [partial overwrites](https://github.com/apache/iceberg-python/issues/268) and writing to [partitioned tables](https://github.com/apache/iceberg-python/issues/208) is planned and being worked on.

<!-- prettier-ignore-end -->

## Schema evolution

PyIceberg supports full schema evolution through the Python API. It takes care of setting the field-IDs and makes sure that only non-breaking changes are done (this can be overridden).
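For example, a minimal sketch of adding a column through that API (the rest of this section is collapsed in the diff):

```python
from pyiceberg.types import StringType

with tbl.update_schema() as update:
    update.add_column("country", StringType(), doc="Country of the city")
```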
36 changes: 32 additions & 4 deletions poetry.lock

Some generated files are not rendered by default.

82 changes: 62 additions & 20 deletions pyiceberg/io/pyarrow.py
@@ -105,7 +105,11 @@
OutputFile,
OutputStream,
)
from pyiceberg.manifest import DataFile, FileFormat
from pyiceberg.manifest import (
DataFile,
DataFileContent,
FileFormat,
)
from pyiceberg.schema import (
PartnerAccessor,
PreOrderSchemaVisitor,
@@ -119,8 +123,9 @@
visit,
visit_with_partner,
)
from pyiceberg.table import WriteTask
from pyiceberg.transforms import TruncateTransform
from pyiceberg.typedef import EMPTY_DICT, Properties
from pyiceberg.typedef import EMPTY_DICT, Properties, Record
from pyiceberg.types import (
BinaryType,
BooleanType,
@@ -1443,18 +1448,15 @@ def parquet_path_to_id_mapping(


def fill_parquet_file_metadata(
df: DataFile,
data_file: DataFile,
parquet_metadata: pq.FileMetaData,
file_size: int,
stats_columns: Dict[int, StatisticsCollector],
parquet_column_mapping: Dict[str, int],
) -> None:
"""
Compute and fill the following fields of the DataFile object.

- file_format
- record_count
- file_size_in_bytes
- column_sizes
- value_counts
- null_value_counts
@@ -1464,11 +1466,8 @@ def fill_parquet_file_metadata(
- split_offsets

Args:
df (DataFile): A DataFile object representing the Parquet file for which metadata is to be filled.
data_file (DataFile): A DataFile object representing the Parquet file for which metadata is to be filled.
parquet_metadata (pyarrow.parquet.FileMetaData): A pyarrow metadata object.
file_size (int): The total compressed file size cannot be retrieved from the metadata and hence has to
be passed here. Depending on the kind of file system and pyarrow library call used, different
ways to obtain this value might be appropriate.
stats_columns (Dict[int, StatisticsCollector]): The statistics gathering plan. It is required to
set the mode for column metrics collection
"""
@@ -1565,13 +1564,56 @@
del upper_bounds[field_id]
del null_value_counts[field_id]

df.file_format = FileFormat.PARQUET
df.record_count = parquet_metadata.num_rows
df.file_size_in_bytes = file_size
df.column_sizes = column_sizes
df.value_counts = value_counts
df.null_value_counts = null_value_counts
df.nan_value_counts = nan_value_counts
df.lower_bounds = lower_bounds
df.upper_bounds = upper_bounds
df.split_offsets = split_offsets
data_file.record_count = parquet_metadata.num_rows
data_file.column_sizes = column_sizes
data_file.value_counts = value_counts
data_file.null_value_counts = null_value_counts
data_file.nan_value_counts = nan_value_counts
data_file.lower_bounds = lower_bounds
data_file.upper_bounds = upper_bounds
data_file.split_offsets = split_offsets


def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
    task = next(tasks)

    try:
        _ = next(tasks)
        # If there are more tasks, raise an exception
        raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208")
    except StopIteration:
        pass

    file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
    file_schema = schema_to_pyarrow(table.schema())
Contributor:

Hi Fokko! I am working with @syun64 to test out the impending write feature. During the test, we realized the field ids are not being set in the written parquet file.
To help illustrate this, I put together a diff against your working branch.

With the field_ids not written correctly (the current behavior), the parquet schema looks like:

<pyarrow._parquet.ParquetSchema object at 0x11c40c880>
required group field_id=-1 schema {
  optional binary field_id=-1 id (String);
  optional int32 field_id=-1 date (Date);
}

and the parquet schema after using a different metadata key for field id in the arrow schema to write the parquet file looks like:

<pyarrow._parquet.ParquetSchema object at 0x11c40c880>
required group field_id=-1 schema {
  optional binary field_id=1 id (String);
  optional int32 field_id=2 date (Date);
}

We feel it is a peculiar issue with pyarrow.parquet.ParquetWriter, where we need to define the field_ids in the metadata of the pyarrow schema using a particular key format, "PARQUET:field_id", instead of "field_id".
Do you think we should use a different pyarrow schema, with the 'PARQUET:' prefix, when PyIceberg writes the file?

Contributor Author:

Thanks @jqin61 for testing this as it is paramount that the field-IDs are written properly. I'm able to reproduce this locally:

 <pyarrow._parquet.ParquetSchema object at 0x138782440>
required group field_id=-1 schema {
  optional double field_id=-1 lat;
  optional double field_id=-1 long;
}

After changing this to `PARQUET:field_id`, it is indeed fixed:

parq ~/Desktop/00000-0-f4a20311-0574-4d24-8b8e-2cdf747581af-0.parquet --schema 

 # Schema 
 <pyarrow._parquet.ParquetSchema object at 0x12087e340>
required group field_id=-1 schema {
  optional double field_id=1 lat;
  optional double field_id=2 long;
}

Thanks for flagging this!
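A minimal sketch of attaching field-ids under the metadata key that PyArrow's Parquet writer picks up (assuming the `PARQUET:field_id` key discussed above):

```python
import pyarrow as pa
import pyarrow.parquet as pq


def with_field_id(field: pa.Field, field_id: int) -> pa.Field:
    # The Parquet writer reads this metadata key to populate field_id
    return field.with_metadata({"PARQUET:field_id": str(field_id)})


schema = pa.schema(
    [
        with_field_id(pa.field("lat", pa.float64()), 1),
        with_field_id(pa.field("long", pa.float64()), 2),
    ]
)

pq.write_table(pa.table({"lat": [52.371807], "long": [4.896029]}, schema=schema), "cities.parquet")
```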

Commenter (@robtandy):

If I have:

df = pa.Table.from_pylist([{'a': "hello"}, {'a': "world"}])

should I expect `df` to have a `pa.Schema` that can be converted with `pyarrow_to_schema`, even with the modified one presented in the diff above? I wasn't able to get either branch to work, as the schema of `df` above has no metadata for its fields.

Contributor Author:

Hey @robtandy, thanks for chiming in here. I think the PyArrow schema passed to `pyarrow_to_schema` should also include the field-id metadata. When you create a new table, it should re-assign the field-ids if they are missing.


    collected_metrics: List[pq.FileMetaData] = []
    fo = table.io.new_output(file_path)
    with fo.create(overwrite=True) as fos:
        with pq.ParquetWriter(fos, schema=file_schema, version="1.0", metadata_collector=collected_metrics) as writer:
            writer.write_table(task.df)

    data_file = DataFile(
        content=DataFileContent.DATA,
        file_path=file_path,
        file_format=FileFormat.PARQUET,
        partition=Record(),
        file_size_in_bytes=len(fo),
Contributor:

This should also come from the write if possible, so we don't have an S3 request here.

        sort_order_id=task.sort_order_id,
        # Just copy these from the table for now
        spec_id=table.spec().spec_id,
Contributor:

Since this is an unpartitioned write, we need to ensure that this is the unpartitioned spec in the table.

Contributor Author:

We check if the partition spec is empty:

if len(self.spec().fields) > 0:
    raise ValueError("Currently only unpartitioned tables are supported")

Contributor (@rdblue, Jan 15, 2024):

Since `write_file` is a public method, we can't guarantee that the caller did this check. I agree that it is safe when called from `append` or `overwrite`, but a caller could use this method directly to create a data file for a partitioned table, right?

Wouldn't it be easy to just pass the spec ID and partition tuple (an empty `Record`) through `WriteTask` for now? I think it would make sense if a `WriteTask` were for a single partition.
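One possible shape of that suggestion (a hypothetical sketch, not the PR's `WriteTask`):

```python
from dataclasses import dataclass, field
from typing import Optional

import pyarrow as pa

from pyiceberg.typedef import Record


@dataclass
class PartitionedWriteTask:
    # A task scoped to a single partition, carrying the spec id and partition
    # tuple so that write_file does not have to consult the table itself.
    df: pa.Table
    spec_id: int = 0
    partition: Record = field(default_factory=Record)
    sort_order_id: Optional[int] = None
```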

        equality_ids=None,
        key_metadata=None,
    )

    if len(collected_metrics) != 1:
        # One file has been written
        raise ValueError(f"Expected 1 entry, got: {collected_metrics}")

    fill_parquet_file_metadata(
        data_file=data_file,
        parquet_metadata=collected_metrics[0],
        stats_columns=compute_statistics_plan(table.schema(), table.properties),
        parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
    )
    return iter([data_file])