Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support merge manifests on writes (MergeAppend) #363

Merged
merged 22 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,10 @@ tbl.append(df)

# or

tbl.merge_append(df)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm reluctant to expose this to the public API for a couple of reasons:

  • Unsure if folks know what the impact is between choosing fast- or merge appends.
  • It might also be that we do appends as part of the operation (upserts as an obvious one).
  • Another method to the public API :)

How about having something similar as in Java, to control this using a table property: https://iceberg.apache.org/docs/1.5.2/configuration/#table-behavior-properties

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds great! I am also +1 on let it controlled by the config. I made merge_append a separate API to mirror the Java side implementation, which has newAppend and newFastAppend APIs. But it seems better to just make the commit.manifest-merge.enabled default to False on python side.

I will still keep FastAppend and MergeAppend as separate class, and keep merge_append in UpdateSnapshot class to ensure clarity, although the current MergeAppend is purely FastAppend + manifest merge.

Just curious, why not Java side newAppend return an FastAppend impl when commit.manifest-merge.enabled is False. Is it due to some backward compatibiilty issue?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I think the use-case of the Java library is slightly different, since that's mostly used in query engines.

Is it due to some backward compatibiilty issue?

I think it is for historical reasons, since the fast-append was added later on :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw, I like how you split it out in classes, it is much cleaner now 👍


# or

tbl.overwrite(df)
```

Expand Down
15 changes: 15 additions & 0 deletions mkdocs/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,21 @@ Iceberg tables support table properties to configure table behavior.
| `write.parquet.dict-size-bytes` | Size in bytes | 2MB | Set the dictionary page size limit per row group |
| `write.parquet.row-group-limit` | Number of rows | 122880 | The Parquet row group limit |

## Table behavior options

| Key | Options | Default | Description |
| ------------------------------------ | ------------------- | ------- | ----------------------------------------------------------- |
| `commit.manifest.target-size-bytes` | Size in bytes | 8MB | Target size when merging manifest files |
HonahX marked this conversation as resolved.
Show resolved Hide resolved
| `commit.manifest.min-count-to-merge` | Number of manifests | 100 | Target size when merging manifest files |
| `commit.manifest-merge.enabled` | Boolean | True | Controls whether to automatically merge manifests on writes |
HonahX marked this conversation as resolved.
Show resolved Hide resolved

<!-- prettier-ignore-start -->

!!! note "Fast append"
PyIceberg default to the [fast append](https://iceberg.apache.org/spec/#snapshots) which ignores `commit.manifest*` and does not merge manifests on writes. To make table merge manifests on writes and respect `commit.manifest*`, use [`merge_append`](api.md#write-support) instead.

<!-- prettier-ignore-end -->

# FileIO

Iceberg works with the concept of a FileIO which is a pluggable module for reading, writing, and deleting files. By default, PyIceberg will try to initialize the FileIO that's suitable for the scheme (`s3://`, `gs://`, etc.) and will use the first one that's installed.
Expand Down
67 changes: 67 additions & 0 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,46 @@ class ManifestEntry(Record):
def __init__(self, *data: Any, **named_data: Any) -> None:
super().__init__(*data, **{"struct": MANIFEST_ENTRY_SCHEMAS_STRUCT[DEFAULT_READ_VERSION], **named_data})

def _wrap(
self,
new_status: ManifestEntryStatus,
new_snapshot_id: int,
new_data_sequence_number: Optional[int],
new_file_sequence_number: Optional[int],
new_file: DataFile,
) -> ManifestEntry:
self.status = new_status
self.snapshot_id = new_snapshot_id
self.data_sequence_number = new_data_sequence_number
self.file_sequence_number = new_file_sequence_number
self.data_file = new_file
return self

def _wrap_append(self, new_snapshot_id: int, new_data_sequence_number: Optional[int], new_file: DataFile) -> ManifestEntry:
return self._wrap(ManifestEntryStatus.ADDED, new_snapshot_id, new_data_sequence_number, None, new_file)

def _wrap_delete(
self,
new_snapshot_id: int,
new_data_sequence_number: Optional[int],
new_file_sequence_number: Optional[int],
new_file: DataFile,
) -> ManifestEntry:
return self._wrap(
ManifestEntryStatus.DELETED, new_snapshot_id, new_data_sequence_number, new_file_sequence_number, new_file
)

def _wrap_existing(
self,
new_snapshot_id: int,
new_data_sequence_number: Optional[int],
new_file_sequence_number: Optional[int],
new_file: DataFile,
) -> ManifestEntry:
return self._wrap(
ManifestEntryStatus.EXISTING, new_snapshot_id, new_data_sequence_number, new_file_sequence_number, new_file
)


PARTITION_FIELD_SUMMARY_TYPE = StructType(
NestedField(509, "contains_null", BooleanType(), required=True),
Expand Down Expand Up @@ -654,6 +694,7 @@ class ManifestWriter(ABC):
_deleted_rows: int
_min_data_sequence_number: Optional[int]
_partitions: List[Record]
_reused_entry_wrapper: ManifestEntry

def __init__(
self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int, meta: Dict[str, str] = EMPTY_DICT
Expand All @@ -673,6 +714,7 @@ def __init__(
self._deleted_rows = 0
self._min_data_sequence_number = None
self._partitions = []
self._reused_entry_wrapper = ManifestEntry()

def __enter__(self) -> ManifestWriter:
"""Open the writer."""
Expand Down Expand Up @@ -763,6 +805,31 @@ def add_entry(self, entry: ManifestEntry) -> ManifestWriter:
self._writer.write_block([self.prepare_entry(entry)])
return self

def add(self, entry: ManifestEntry) -> ManifestWriter:
if entry.data_sequence_number is not None and entry.data_sequence_number >= 0:
self.add_entry(
self._reused_entry_wrapper._wrap_append(self._snapshot_id, entry.data_sequence_number, entry.data_file)
)
else:
self.add_entry(self._reused_entry_wrapper._wrap_append(self._snapshot_id, None, entry.data_file))
return self

def delete(self, entry: ManifestEntry) -> ManifestWriter:
self.add_entry(
self._reused_entry_wrapper._wrap_delete(
self._snapshot_id, entry.data_sequence_number, entry.file_sequence_number, entry.data_file
)
)
return self

def existing(self, entry: ManifestEntry) -> ManifestWriter:
self.add_entry(
self._reused_entry_wrapper._wrap_existing(
self._snapshot_id, entry.data_sequence_number, entry.file_sequence_number, entry.data_file
)
)
return self


class ManifestWriterV1(ManifestWriter):
def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int):
Expand Down
Loading