Skip to content

Commit

Permalink
Support MergeAppend operations (#363)
Browse files Browse the repository at this point in the history
* add ListPacker + tests

* add merge append

* add merge_append

* fix snapshot inheritance

* test manifest file and entries

* add doc

* fix lint

* change test name

* address review comments

* rename _MergingSnapshotProducer to _SnapshotProducer

* fix a serious bug

* update the doc

* remove merge_append as public API

* make default to false

* add test description

* fix merge conflict

* fix snapshot_id issue
  • Loading branch information
HonahX authored Jul 10, 2024
1 parent 66b92ff commit 77a07c9
Show file tree
Hide file tree
Showing 7 changed files with 567 additions and 41 deletions.
15 changes: 15 additions & 0 deletions mkdocs/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,21 @@ Iceberg tables support table properties to configure table behavior.
| `write.parquet.dict-size-bytes` | Size in bytes | 2MB | Set the dictionary page size limit per row group |
| `write.parquet.row-group-limit` | Number of rows | 122880 | The Parquet row group limit |

## Table behavior options

| Key | Options | Default | Description |
| ------------------------------------ | ------------------- | ------------- | ----------------------------------------------------------- |
| `commit.manifest.target-size-bytes` | Size in bytes | 8388608 (8MB) | Target size when merging manifest files |
| `commit.manifest.min-count-to-merge` | Number of manifests | 100 | Target size when merging manifest files |
| `commit.manifest-merge.enabled` | Boolean | False | Controls whether to automatically merge manifests on writes |

<!-- prettier-ignore-start -->

!!! note "Fast append"
Unlike Java implementation, PyIceberg default to the [fast append](api.md#write-support) and thus `commit.manifest-merge.enabled` is set to `False` by default.

<!-- prettier-ignore-end -->

# FileIO

Iceberg works with the concept of a FileIO which is a pluggable module for reading, writing, and deleting files. By default, PyIceberg will try to initialize the FileIO that's suitable for the scheme (`s3://`, `gs://`, etc.) and will use the first one that's installed.
Expand Down
69 changes: 69 additions & 0 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,48 @@ class ManifestEntry(Record):
def __init__(self, *data: Any, **named_data: Any) -> None:
super().__init__(*data, **{"struct": MANIFEST_ENTRY_SCHEMAS_STRUCT[DEFAULT_READ_VERSION], **named_data})

def _wrap(
self,
new_status: ManifestEntryStatus,
new_snapshot_id: Optional[int],
new_data_sequence_number: Optional[int],
new_file_sequence_number: Optional[int],
new_file: DataFile,
) -> ManifestEntry:
self.status = new_status
self.snapshot_id = new_snapshot_id
self.data_sequence_number = new_data_sequence_number
self.file_sequence_number = new_file_sequence_number
self.data_file = new_file
return self

def _wrap_append(
self, new_snapshot_id: Optional[int], new_data_sequence_number: Optional[int], new_file: DataFile
) -> ManifestEntry:
return self._wrap(ManifestEntryStatus.ADDED, new_snapshot_id, new_data_sequence_number, None, new_file)

def _wrap_delete(
self,
new_snapshot_id: Optional[int],
new_data_sequence_number: Optional[int],
new_file_sequence_number: Optional[int],
new_file: DataFile,
) -> ManifestEntry:
return self._wrap(
ManifestEntryStatus.DELETED, new_snapshot_id, new_data_sequence_number, new_file_sequence_number, new_file
)

def _wrap_existing(
self,
new_snapshot_id: Optional[int],
new_data_sequence_number: Optional[int],
new_file_sequence_number: Optional[int],
new_file: DataFile,
) -> ManifestEntry:
return self._wrap(
ManifestEntryStatus.EXISTING, new_snapshot_id, new_data_sequence_number, new_file_sequence_number, new_file
)


PARTITION_FIELD_SUMMARY_TYPE = StructType(
NestedField(509, "contains_null", BooleanType(), required=True),
Expand Down Expand Up @@ -655,6 +697,7 @@ class ManifestWriter(ABC):
_deleted_rows: int
_min_data_sequence_number: Optional[int]
_partitions: List[Record]
_reused_entry_wrapper: ManifestEntry

def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int) -> None:
self.closed = False
Expand All @@ -671,6 +714,7 @@ def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile,
self._deleted_rows = 0
self._min_data_sequence_number = None
self._partitions = []
self._reused_entry_wrapper = ManifestEntry()

def __enter__(self) -> ManifestWriter:
"""Open the writer."""
Expand Down Expand Up @@ -776,6 +820,31 @@ def add_entry(self, entry: ManifestEntry) -> ManifestWriter:
self._writer.write_block([self.prepare_entry(entry)])
return self

def add(self, entry: ManifestEntry) -> ManifestWriter:
if entry.data_sequence_number is not None and entry.data_sequence_number >= 0:
self.add_entry(
self._reused_entry_wrapper._wrap_append(self._snapshot_id, entry.data_sequence_number, entry.data_file)
)
else:
self.add_entry(self._reused_entry_wrapper._wrap_append(self._snapshot_id, None, entry.data_file))
return self

def delete(self, entry: ManifestEntry) -> ManifestWriter:
self.add_entry(
self._reused_entry_wrapper._wrap_delete(
self._snapshot_id, entry.data_sequence_number, entry.file_sequence_number, entry.data_file
)
)
return self

def existing(self, entry: ManifestEntry) -> ManifestWriter:
self.add_entry(
self._reused_entry_wrapper._wrap_existing(
entry.snapshot_id, entry.data_sequence_number, entry.file_sequence_number, entry.data_file
)
)
return self


class ManifestWriterV1(ManifestWriter):
def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int):
Expand Down
Loading

0 comments on commit 77a07c9

Please sign in to comment.