Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support partial deletes #569

Merged
merged 49 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
8d45920
Add option to delete datafiles
Fokko Apr 2, 2024
f6084a6
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Apr 2, 2024
87cc065
Pull in main
Fokko Apr 2, 2024
bc9c83e
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Apr 3, 2024
284d05a
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Apr 5, 2024
234d55b
WIP
Fokko Apr 5, 2024
aadc89c
Change DataScan to accept Metadata and io
Fokko Apr 5, 2024
7e59342
fix name-mapping issue
HonahX Apr 7, 2024
fbf6492
Merge pull request #1 from HonahX/honahx-update-datascan
Fokko Apr 8, 2024
c3fa7e7
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Apr 8, 2024
a925d69
Merge branch 'fd-update-datascan' of github.com:Fokko/iceberg-python …
Fokko Apr 8, 2024
5cec00a
WIP
Fokko Apr 8, 2024
a5e988a
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Apr 8, 2024
1723819
WIP
Fokko Apr 9, 2024
5025b4a
Moar tests
Fokko Apr 10, 2024
e474fda
Oops
Fokko Apr 11, 2024
172f9c0
Cleanup
Fokko Apr 16, 2024
a97c45a
WIP
Fokko Apr 16, 2024
74497fb
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Apr 17, 2024
47c9de1
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Apr 17, 2024
9c6724e
WIP
Fokko Apr 17, 2024
edff166
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Apr 18, 2024
c443af2
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Apr 18, 2024
7dae071
Fix summary generation
Fokko Apr 18, 2024
5e871fb
Last few bits
Fokko Apr 18, 2024
9910d29
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Apr 18, 2024
cd19f80
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Apr 25, 2024
edfadd9
Fix the requirement
Fokko Apr 25, 2024
d65a8a4
Make ruff happy
Fokko Apr 25, 2024
8849d97
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko May 2, 2024
3c98eef
Comments, thanks Kevin!
Fokko May 2, 2024
179fa27
Comments
Fokko May 9, 2024
2ea157e
Append rather than truncate
Fokko May 10, 2024
18392d1
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko May 10, 2024
7d036b1
Fix merge conflicts
Fokko May 10, 2024
5adf3f0
Make the tests pass
Fokko May 13, 2024
b3fcdcf
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko May 27, 2024
4ceacb8
Add another test
Fokko May 30, 2024
ddf6119
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko May 30, 2024
5b10f25
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko May 30, 2024
4cd67ac
Conflicts
Fokko May 30, 2024
5cdb363
Add docs (#33)
sungwy Jun 15, 2024
2252e71
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Jun 21, 2024
05fcf2d
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Jul 5, 2024
1ccb31d
Add a partitioned overwrite test
Fokko Jul 5, 2024
86432fe
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Jul 8, 2024
259f8c5
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Jul 9, 2024
96d6392
Fix comment
Fokko Jul 9, 2024
301bc82
Skip empty manifests
Fokko Jul 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -331,12 +331,25 @@ df = pa.Table.from_pylist(
table.append(df)
```

<!-- prettier-ignore-start -->
You can delete some of the data from the table by calling `tbl.delete()` with a desired `delete_filter`.

```python
tbl.delete(delete_filter="city == 'Paris'")
```

!!! example "Under development"
Writing using PyIceberg is still under development. Support for [partial overwrites](https://github.com/apache/iceberg-python/issues/268) and writing to [partitioned tables](https://github.com/apache/iceberg-python/issues/208) is planned and being worked on.
In the above example, any records where the city field value equals to `Paris` will be deleted.
Running `tbl.scan().to_arrow()` will now yield:

<!-- prettier-ignore-end -->
```
pyarrow.Table
city: string
lat: double
long: double
----
city: [["Amsterdam","San Francisco","Drachten"],["Groningen"]]
lat: [[52.371807,37.773972,53.11254],[53.21917]]
long: [[4.896029,-122.431297,6.0989],[6.56667]]
```

## Inspecting tables

Expand Down
64 changes: 62 additions & 2 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import logging
import os
import re
import uuid
from abc import ABC, abstractmethod
from concurrent.futures import Future
from copy import copy
Expand Down Expand Up @@ -126,7 +127,6 @@
visit,
visit_with_partner,
)
from pyiceberg.table import PropertyUtil, TableProperties, WriteTask
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.name_mapping import NameMapping
from pyiceberg.transforms import TruncateTransform
Expand Down Expand Up @@ -159,7 +159,7 @@
from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string

if TYPE_CHECKING:
from pyiceberg.table import FileScanTask
from pyiceberg.table import FileScanTask, WriteTask

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -1563,6 +1563,8 @@ class PyArrowStatisticsCollector(PreOrderSchemaVisitor[List[StatisticsCollector]
_default_mode: str

def __init__(self, schema: Schema, properties: Dict[str, str]):
from pyiceberg.table import TableProperties

self._schema = schema
self._properties = properties
self._default_mode = self._properties.get(
Expand Down Expand Up @@ -1598,6 +1600,8 @@ def map(
return k + v

def primitive(self, primitive: PrimitiveType) -> List[StatisticsCollector]:
from pyiceberg.table import TableProperties

column_name = self._schema.find_column_name(self._field_id)
if column_name is None:
return []
Expand Down Expand Up @@ -1895,6 +1899,8 @@ def data_file_statistics_from_parquet_metadata(


def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
from pyiceberg.table import PropertyUtil, TableProperties

parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
row_group_size = PropertyUtil.property_as_int(
properties=table_metadata.properties,
Expand Down Expand Up @@ -2005,6 +2011,8 @@ def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_


def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]:
from pyiceberg.table import PropertyUtil, TableProperties

for key_pattern in [
TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
TableProperties.PARQUET_PAGE_ROW_LIMIT,
Expand Down Expand Up @@ -2042,3 +2050,55 @@ def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]:
default=TableProperties.PARQUET_PAGE_ROW_LIMIT_DEFAULT,
),
}


def _dataframe_to_data_files(
Fokko marked this conversation as resolved.
Show resolved Hide resolved
table_metadata: TableMetadata,
df: pa.Table,
io: FileIO,
write_uuid: Optional[uuid.UUID] = None,
counter: Optional[itertools.count[int]] = None,
) -> Iterable[DataFile]:
"""Convert a PyArrow table into a DataFile.

Returns:
An iterable that supplies datafiles that represent the table.
"""
from pyiceberg.table import PropertyUtil, TableProperties, WriteTask

counter = counter or itertools.count(0)
write_uuid = write_uuid or uuid.uuid4()
target_file_size: int = PropertyUtil.property_as_int( # type: ignore # The property is set with non-None value.
properties=table_metadata.properties,
property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
)

if table_metadata.spec().is_unpartitioned():
yield from write_file(
io=io,
table_metadata=table_metadata,
tasks=iter([
WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=table_metadata.schema())
for batches in bin_pack_arrow_table(df, target_file_size)
]),
)
else:
from pyiceberg.table import _determine_partitions

partitions = _determine_partitions(spec=table_metadata.spec(), schema=table_metadata.schema(), arrow_table=df)
yield from write_file(
io=io,
table_metadata=table_metadata,
tasks=iter([
WriteTask(
write_uuid=write_uuid,
task_id=next(counter),
record_batches=batches,
partition_key=partition.partition_key,
schema=table_metadata.schema(),
)
for partition in partitions
for batches in bin_pack_arrow_table(partition.arrow_table_partition, target_file_size)
]),
)
2 changes: 1 addition & 1 deletion pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ class DataFile(Record):
split_offsets: Optional[List[int]]
equality_ids: Optional[List[int]]
sort_order_id: Optional[int]
spec_id: Optional[int]
spec_id: int

def __setattr__(self, name: str, value: Any) -> None:
"""Assign a key/value to a DataFile."""
Expand Down
Loading