
Commit 8f7927b

Write support (#41)
1 parent 8614ba0 commit 8f7927b

13 files changed
+1035 -136 lines changed

mkdocs/docs/api.md

+98
@@ -175,6 +175,104 @@ static_table = StaticTable.from_metadata(
The static-table is considered read-only.

## Write support

With PyIceberg 0.6.0, write support is added through Arrow. Let's consider an Arrow Table:

```python
import pyarrow as pa

df = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
        {"city": "Drachten", "lat": 53.11254, "long": 6.0989},
        {"city": "Paris", "lat": 48.864716, "long": 2.349014},
    ],
)
```
Next, create a table based on the schema:

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")

from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleType

schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
)

tbl = catalog.create_table("default.cities", schema=schema)
```
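Depending on the catalog implementation, the `default` namespace may need to exist before the table can be created. If it does not exist yet, a minimal sketch for creating it first, using the same `catalog` object as above:

```python
# Only needed when the "default" namespace does not exist yet in this catalog.
catalog.create_namespace("default")
```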
Now write the data to the table:

<!-- prettier-ignore-start -->

!!! note inline end "Fast append"
    PyIceberg defaults to the [fast append](https://iceberg.apache.org/spec/#snapshots) to minimize the amount of data written. This enables quick writes, reducing the possibility of conflicts. The downside of the fast append is that it creates more metadata than a normal commit. [Compaction is planned](https://github.com/apache/iceberg-python/issues/270) and will automatically rewrite all the metadata when a threshold is hit, to maintain performant reads.

<!-- prettier-ignore-end -->

```python
tbl.append(df)

# or

tbl.overwrite(df)
```
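Each `append` or `overwrite` commits a new snapshot. To see what the fast append produced, the snapshots can be inspected on the table afterwards (a minimal sketch; the exact fields on the snapshot objects may differ between PyIceberg versions):

```python
# List the snapshots created so far; with the fast append each write adds a
# snapshot without rewriting the existing manifests.
for snapshot in tbl.metadata.snapshots:
    print(snapshot.snapshot_id, snapshot.summary)
```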
The data is written to the table, and when the table is read using `tbl.scan().to_arrow()`:

```
pyarrow.Table
city: string
lat: double
long: double
----
city: [["Amsterdam","San Francisco","Drachten","Paris"]]
lat: [[52.371807,37.773972,53.11254,48.864716]]
long: [[4.896029,-122.431297,6.0989,2.349014]]
```
Since there is no data in the table yet, you can use either `append(df)` or `overwrite(df)`. If we want to add more data, we can use `.append()` again:

```python
df = pa.Table.from_pylist(
    [{"city": "Groningen", "lat": 53.21917, "long": 6.56667}],
)

tbl.append(df)
```
When reading the table with `tbl.scan().to_arrow()`, you can see that `Groningen` is now also part of the table:

```
pyarrow.Table
city: string
lat: double
long: double
----
city: [["Amsterdam","San Francisco","Drachten","Paris"],["Groningen"]]
lat: [[52.371807,37.773972,53.11254,48.864716],[53.21917]]
long: [[4.896029,-122.431297,6.0989,2.349014],[6.56667]]
```
The nested lists indicate the different Arrow buffers: the first write results in one buffer, and the second append in a separate buffer. This is expected, since the scan reads two Parquet files.
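If a single contiguous table is preferred, the chunks can be combined on the Arrow side after scanning. This is a plain pyarrow operation, not an Iceberg one:

```python
# Merge the per-file Arrow chunks into a single chunk per column.
combined = tbl.scan().to_arrow().combine_chunks()
```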
<!-- prettier-ignore-start -->

!!! example "Under development"
    Writing using PyIceberg is still under development. Support for [partial overwrites](https://github.com/apache/iceberg-python/issues/268) and writing to [partitioned tables](https://github.com/apache/iceberg-python/issues/208) is planned and being worked on.

<!-- prettier-ignore-end -->
## Schema evolution

PyIceberg supports full schema evolution through the Python API. It takes care of setting the field-IDs and makes sure that only non-breaking changes are done (can be overridden).
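As a rough illustration, a minimal sketch that assumes the `tbl` object from the write example above (the exact `update_schema` surface may differ between PyIceberg versions):

```python
from pyiceberg.types import LongType

# Add a new column; the field-ID is assigned by PyIceberg, and incompatible
# changes are rejected unless explicitly overridden.
with tbl.update_schema() as update:
    update.add_column("population", LongType())
```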

poetry.lock

+32 -4
Some generated files are not rendered by default.

pyiceberg/io/pyarrow.py

+62 -20
@@ -105,7 +105,11 @@
     OutputFile,
     OutputStream,
 )
-from pyiceberg.manifest import DataFile, FileFormat
+from pyiceberg.manifest import (
+    DataFile,
+    DataFileContent,
+    FileFormat,
+)
 from pyiceberg.schema import (
     PartnerAccessor,
     PreOrderSchemaVisitor,
@@ -119,8 +123,9 @@
     visit,
     visit_with_partner,
 )
+from pyiceberg.table import WriteTask
 from pyiceberg.transforms import TruncateTransform
-from pyiceberg.typedef import EMPTY_DICT, Properties
+from pyiceberg.typedef import EMPTY_DICT, Properties, Record
 from pyiceberg.types import (
     BinaryType,
     BooleanType,
@@ -1443,18 +1448,15 @@ def parquet_path_to_id_mapping(


 def fill_parquet_file_metadata(
-    df: DataFile,
+    data_file: DataFile,
     parquet_metadata: pq.FileMetaData,
-    file_size: int,
     stats_columns: Dict[int, StatisticsCollector],
     parquet_column_mapping: Dict[str, int],
 ) -> None:
     """
     Compute and fill the following fields of the DataFile object.

     - file_format
-    - record_count
-    - file_size_in_bytes
     - column_sizes
     - value_counts
     - null_value_counts
@@ -1464,11 +1466,8 @@ def fill_parquet_file_metadata(
     - split_offsets

     Args:
-        df (DataFile): A DataFile object representing the Parquet file for which metadata is to be filled.
+        data_file (DataFile): A DataFile object representing the Parquet file for which metadata is to be filled.
         parquet_metadata (pyarrow.parquet.FileMetaData): A pyarrow metadata object.
-        file_size (int): The total compressed file size cannot be retrieved from the metadata and hence has to
-            be passed here. Depending on the kind of file system and pyarrow library call used, different
-            ways to obtain this value might be appropriate.
         stats_columns (Dict[int, StatisticsCollector]): The statistics gathering plan. It is required to
             set the mode for column metrics collection
     """
@@ -1565,13 +1564,56 @@ def fill_parquet_file_metadata(
         del upper_bounds[field_id]
         del null_value_counts[field_id]

-    df.file_format = FileFormat.PARQUET
-    df.record_count = parquet_metadata.num_rows
-    df.file_size_in_bytes = file_size
-    df.column_sizes = column_sizes
-    df.value_counts = value_counts
-    df.null_value_counts = null_value_counts
-    df.nan_value_counts = nan_value_counts
-    df.lower_bounds = lower_bounds
-    df.upper_bounds = upper_bounds
-    df.split_offsets = split_offsets
+    data_file.record_count = parquet_metadata.num_rows
+    data_file.column_sizes = column_sizes
+    data_file.value_counts = value_counts
+    data_file.null_value_counts = null_value_counts
+    data_file.nan_value_counts = nan_value_counts
+    data_file.lower_bounds = lower_bounds
+    data_file.upper_bounds = upper_bounds
+    data_file.split_offsets = split_offsets
+
+
+def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
+    task = next(tasks)
+
+    try:
+        _ = next(tasks)
+        # If there are more tasks, raise an exception
+        raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208")
+    except StopIteration:
+        pass
+
+    file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
+    file_schema = schema_to_pyarrow(table.schema())
+
+    collected_metrics: List[pq.FileMetaData] = []
+    fo = table.io.new_output(file_path)
+    with fo.create(overwrite=True) as fos:
+        with pq.ParquetWriter(fos, schema=file_schema, version="1.0", metadata_collector=collected_metrics) as writer:
+            writer.write_table(task.df)
+
+    data_file = DataFile(
+        content=DataFileContent.DATA,
+        file_path=file_path,
+        file_format=FileFormat.PARQUET,
+        partition=Record(),
+        file_size_in_bytes=len(fo),
+        sort_order_id=task.sort_order_id,
+        # Just copy these from the table for now
+        spec_id=table.spec().spec_id,
+        equality_ids=None,
+        key_metadata=None,
+    )
+
+    if len(collected_metrics) != 1:
+        # One file has been written
+        raise ValueError(f"Expected 1 entry, got: {collected_metrics}")
+
+    fill_parquet_file_metadata(
+        data_file=data_file,
+        parquet_metadata=collected_metrics[0],
+        stats_columns=compute_statistics_plan(table.schema(), table.properties),
+        parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
+    )
+    return iter([data_file])
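A rough sketch of calling the new `write_file` entry point directly; normally `tbl.append()` or `tbl.overwrite()` builds the tasks internally, and the `WriteTask` fields `write_uuid` and `task_id` are assumptions (only `df`, `sort_order_id`, and `generate_data_file_filename()` are used above):

```python
# Hypothetical, minimal driver for write_file(); the WriteTask constructor
# fields write_uuid and task_id are assumptions and may differ.
import uuid

import pyarrow as pa

from pyiceberg.catalog import load_catalog
from pyiceberg.io.pyarrow import write_file
from pyiceberg.table import WriteTask

catalog = load_catalog("default")
tbl = catalog.load_table("default.cities")

df = pa.Table.from_pylist([{"city": "Leeuwarden", "lat": 53.2012, "long": 5.7999}])

# write_file() consumes exactly one task; a second task raises NotImplementedError.
tasks = iter([WriteTask(write_uuid=uuid.uuid4(), task_id=1, df=df)])
data_files = list(write_file(tbl, tasks))
```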
