diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md
index 77bad59835..5346e82c25 100644
--- a/mkdocs/docs/configuration.md
+++ b/mkdocs/docs/configuration.md
@@ -61,6 +61,21 @@ Iceberg tables support table properties to configure table behavior.
 | `write.parquet.dict-size-bytes`       | Size in bytes       | 2MB           | Set the dictionary page size limit per row group             |
 | `write.parquet.row-group-limit`       | Number of rows      | 122880        | The Parquet row group limit                                   |
+## Table behavior options
+
+| Key                                   | Options             | Default       | Description                                                  |
+| ------------------------------------- | ------------------- | ------------- | ------------------------------------------------------------ |
+| `commit.manifest.target-size-bytes`   | Size in bytes       | 8388608 (8MB) | Target size when merging manifest files                      |
+| `commit.manifest.min-count-to-merge`  | Number of manifests | 100           | Minimum number of manifests to accumulate before merging     |
+| `commit.manifest-merge.enabled`       | Boolean             | False         | Controls whether to automatically merge manifests on writes  |
+
+
+!!! note "Fast append"
+    Unlike the Java implementation, PyIceberg defaults to the [fast append](api.md#write-support), and `commit.manifest-merge.enabled` is therefore set to `False` by default.
+
+
+
 # FileIO
 
 Iceberg works with the concept of a FileIO which is a pluggable module for reading, writing, and deleting files. By default, PyIceberg will try to initialize the FileIO that's suitable for the scheme (`s3://`, `gs://`, etc.) and will use the first one that's installed.
diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py
index e6a81d2a6a..6148d9a69a 100644
--- a/pyiceberg/manifest.py
+++ b/pyiceberg/manifest.py
@@ -404,6 +404,48 @@ class ManifestEntry(Record):
     def __init__(self, *data: Any, **named_data: Any) -> None:
         super().__init__(*data, **{"struct": MANIFEST_ENTRY_SCHEMAS_STRUCT[DEFAULT_READ_VERSION], **named_data})
 
+    def _wrap(
+        self,
+        new_status: ManifestEntryStatus,
+        new_snapshot_id: Optional[int],
+        new_data_sequence_number: Optional[int],
+        new_file_sequence_number: Optional[int],
+        new_file: DataFile,
+    ) -> ManifestEntry:
+        self.status = new_status
+        self.snapshot_id = new_snapshot_id
+        self.data_sequence_number = new_data_sequence_number
+        self.file_sequence_number = new_file_sequence_number
+        self.data_file = new_file
+        return self
+
+    def _wrap_append(
+        self, new_snapshot_id: Optional[int], new_data_sequence_number: Optional[int], new_file: DataFile
+    ) -> ManifestEntry:
+        return self._wrap(ManifestEntryStatus.ADDED, new_snapshot_id, new_data_sequence_number, None, new_file)
+
+    def _wrap_delete(
+        self,
+        new_snapshot_id: Optional[int],
+        new_data_sequence_number: Optional[int],
+        new_file_sequence_number: Optional[int],
+        new_file: DataFile,
+    ) -> ManifestEntry:
+        return self._wrap(
+            ManifestEntryStatus.DELETED, new_snapshot_id, new_data_sequence_number, new_file_sequence_number, new_file
+        )
+
+    def _wrap_existing(
+        self,
+        new_snapshot_id: Optional[int],
+        new_data_sequence_number: Optional[int],
+        new_file_sequence_number: Optional[int],
+        new_file: DataFile,
+    ) -> ManifestEntry:
+        return self._wrap(
+            ManifestEntryStatus.EXISTING, new_snapshot_id, new_data_sequence_number, new_file_sequence_number, new_file
+        )
+
 
 
 PARTITION_FIELD_SUMMARY_TYPE = StructType(
     NestedField(509, "contains_null", BooleanType(), required=True),
@@ -655,6 +697,7 @@ class ManifestWriter(ABC):
     _deleted_rows: int
     _min_data_sequence_number: Optional[int]
     _partitions: List[Record]
+    _reused_entry_wrapper: ManifestEntry
 
     def __init__(self, spec: PartitionSpec, schema: Schema,
output_file: OutputFile, snapshot_id: int) -> None: self.closed = False @@ -671,6 +714,7 @@ def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, self._deleted_rows = 0 self._min_data_sequence_number = None self._partitions = [] + self._reused_entry_wrapper = ManifestEntry() def __enter__(self) -> ManifestWriter: """Open the writer.""" @@ -776,6 +820,31 @@ def add_entry(self, entry: ManifestEntry) -> ManifestWriter: self._writer.write_block([self.prepare_entry(entry)]) return self + def add(self, entry: ManifestEntry) -> ManifestWriter: + if entry.data_sequence_number is not None and entry.data_sequence_number >= 0: + self.add_entry( + self._reused_entry_wrapper._wrap_append(self._snapshot_id, entry.data_sequence_number, entry.data_file) + ) + else: + self.add_entry(self._reused_entry_wrapper._wrap_append(self._snapshot_id, None, entry.data_file)) + return self + + def delete(self, entry: ManifestEntry) -> ManifestWriter: + self.add_entry( + self._reused_entry_wrapper._wrap_delete( + self._snapshot_id, entry.data_sequence_number, entry.file_sequence_number, entry.data_file + ) + ) + return self + + def existing(self, entry: ManifestEntry) -> ManifestWriter: + self.add_entry( + self._reused_entry_wrapper._wrap_existing( + entry.snapshot_id, entry.data_sequence_number, entry.file_sequence_number, entry.data_file + ) + ) + return self + class ManifestWriterV1(ManifestWriter): def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int): diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 9a96cd9245..39bcfc2ef6 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -16,10 +16,13 @@ # under the License. from __future__ import annotations +import concurrent import itertools import uuid import warnings from abc import ABC, abstractmethod +from collections import defaultdict +from concurrent.futures import Future from copy import copy from dataclasses import dataclass from datetime import datetime @@ -69,7 +72,7 @@ inclusive_projection, manifest_evaluator, ) -from pyiceberg.io import FileIO, load_file_io +from pyiceberg.io import FileIO, OutputFile, load_file_io from pyiceberg.io.pyarrow import _dataframe_to_data_files, expression_to_pyarrow, project_table from pyiceberg.manifest import ( POSITIONAL_DELETE_SCHEMA, @@ -79,6 +82,7 @@ ManifestEntry, ManifestEntryStatus, ManifestFile, + ManifestWriter, PartitionFieldSummary, write_manifest, write_manifest_list, @@ -145,6 +149,7 @@ StructType, transform_dict_value_to_str, ) +from pyiceberg.utils.bin_packing import ListPacker from pyiceberg.utils.concurrent import ExecutorFactory from pyiceberg.utils.config import Config from pyiceberg.utils.datetime import datetime_to_millis @@ -259,6 +264,15 @@ class TableProperties: FORMAT_VERSION = "format-version" DEFAULT_FORMAT_VERSION = 2 + MANIFEST_TARGET_SIZE_BYTES = "commit.manifest.target-size-bytes" + MANIFEST_TARGET_SIZE_BYTES_DEFAULT = 8 * 1024 * 1024 # 8 MB + + MANIFEST_MIN_MERGE_COUNT = "commit.manifest.min-count-to-merge" + MANIFEST_MIN_MERGE_COUNT_DEFAULT = 100 + + MANIFEST_MERGE_ENABLED = "commit.manifest-merge.enabled" + MANIFEST_MERGE_ENABLED_DEFAULT = False + class PropertyUtil: @staticmethod @@ -520,14 +534,22 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) if table_arrow_schema != df.schema: df = df.cast(table_arrow_schema) - with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot: + manifest_merge_enabled 
= PropertyUtil.property_as_bool( + self.table_metadata.properties, + TableProperties.MANIFEST_MERGE_ENABLED, + TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT, + ) + update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties) + append_method = update_snapshot.merge_append if manifest_merge_enabled else update_snapshot.fast_append + + with append_method() as append_files: # skip writing data files if the dataframe is empty if df.shape[0] > 0: data_files = _dataframe_to_data_files( - table_metadata=self._table.metadata, write_uuid=update_snapshot.commit_uuid, df=df, io=self._table.io + table_metadata=self._table.metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io ) for data_file in data_files: - update_snapshot.append_data_file(data_file) + append_files.append_data_file(data_file) def overwrite( self, @@ -3063,14 +3085,15 @@ def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List yield from parquet_files_to_data_files(io=io, table_metadata=table_metadata, file_paths=iter(file_paths)) -class _MergingSnapshotProducer(UpdateTableMetadata[U], Generic[U]): +class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]): commit_uuid: uuid.UUID + _io: FileIO _operation: Operation _snapshot_id: int _parent_snapshot_id: Optional[int] _added_data_files: List[DataFile] + _manifest_num_counter: itertools.count[int] _deleted_data_files: Set[DataFile] - _manifest_counter: itertools.count[int] def __init__( self, @@ -3092,13 +3115,13 @@ def __init__( self._added_data_files = [] self._deleted_data_files = set() self.snapshot_properties = snapshot_properties - self._manifest_counter = itertools.count(0) + self._manifest_num_counter = itertools.count(0) - def append_data_file(self, data_file: DataFile) -> _MergingSnapshotProducer[U]: + def append_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]: self._added_data_files.append(data_file) return self - def delete_data_file(self, data_file: DataFile) -> _MergingSnapshotProducer[U]: + def delete_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]: self._deleted_data_files.add(data_file) return self @@ -3108,23 +3131,22 @@ def _deleted_entries(self) -> List[ManifestEntry]: ... @abstractmethod def _existing_manifests(self) -> List[ManifestFile]: ... 
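For orientation (not part of the patch itself): a minimal usage sketch of the merge-append path that `Transaction.append` now selects when `commit.manifest-merge.enabled` is set. The catalog name and table identifier are placeholders, and the property values mirror the tests added further down in this diff.

```python
import pyarrow as pa

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")  # assumes a catalog named "default" is configured
df = pa.table({"n": pa.array([1, 2, 3], type=pa.int64())})

tbl = catalog.create_table(
    "default.merge_demo",  # placeholder identifier
    schema=df.schema,
    properties={
        # Opt in to manifest merging; PyIceberg otherwise stays on the fast-append path.
        "commit.manifest-merge.enabled": "true",
        "commit.manifest.min-count-to-merge": "2",
    },
)

# With merging enabled, each append goes through merge_append() instead of fast_append(),
# and small manifests are compacted once the minimum count is reached.
for _ in range(5):
    tbl.append(df)
```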
+ def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile]: + """To perform any post-processing on the manifests before writing them to the new snapshot.""" + return manifests + def _manifests(self) -> List[ManifestFile]: def _write_added_manifest() -> List[ManifestFile]: if self._added_data_files: - output_file_location = _new_manifest_path( - location=self._transaction.table_metadata.location, - num=next(self._manifest_counter), - commit_uuid=self.commit_uuid, - ) with write_manifest( format_version=self._transaction.table_metadata.format_version, spec=self._transaction.table_metadata.spec(), schema=self._transaction.table_metadata.schema(), - output_file=self._io.new_output(output_file_location), + output_file=self.new_manifest_output(), snapshot_id=self._snapshot_id, ) as writer: for data_file in self._added_data_files: - writer.add_entry( + writer.add( ManifestEntry( status=ManifestEntryStatus.ADDED, snapshot_id=self._snapshot_id, @@ -3141,17 +3163,11 @@ def _write_delete_manifest() -> List[ManifestFile]: # Check if we need to mark the files as deleted deleted_entries = self._deleted_entries() if len(deleted_entries) > 0: - output_file_location = _new_manifest_path( - location=self._transaction.table_metadata.location, - num=next(self._manifest_counter), - commit_uuid=self.commit_uuid, - ) - with write_manifest( format_version=self._transaction.table_metadata.format_version, spec=self._transaction.table_metadata.spec(), schema=self._transaction.table_metadata.schema(), - output_file=self._io.new_output(output_file_location), + output_file=self.new_manifest_output(), snapshot_id=self._snapshot_id, ) as writer: for delete_entry in deleted_entries: @@ -3166,7 +3182,7 @@ def _write_delete_manifest() -> List[ManifestFile]: delete_manifests = executor.submit(_write_delete_manifest) existing_manifests = executor.submit(self._existing_manifests) - return added_manifests.result() + delete_manifests.result() + existing_manifests.result() + return self._process_manifests(added_manifests.result() + delete_manifests.result() + existing_manifests.result()) def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary: ssc = SnapshotSummaryCollector() @@ -3245,8 +3261,36 @@ def _commit(self) -> UpdatesAndRequirements: (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),), ) + @property + def snapshot_id(self) -> int: + return self._snapshot_id + + def spec(self, spec_id: int) -> PartitionSpec: + return self._transaction.table_metadata.specs()[spec_id] + + def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter: + return write_manifest( + format_version=self._transaction.table_metadata.format_version, + spec=spec, + schema=self._transaction.table_metadata.schema(), + output_file=self.new_manifest_output(), + snapshot_id=self._snapshot_id, + ) + + def new_manifest_output(self) -> OutputFile: + return self._io.new_output( + _new_manifest_path( + location=self._transaction.table_metadata.location, + num=next(self._manifest_num_counter), + commit_uuid=self.commit_uuid, + ) + ) + + def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: bool = True) -> List[ManifestEntry]: + return manifest.fetch_manifest_entry(io=self._io, discard_deleted=discard_deleted) + -class DeleteFiles(_MergingSnapshotProducer["DeleteFiles"]): +class DeleteFiles(_SnapshotProducer["DeleteFiles"]): """Will delete manifest entries from the current snapshot based on the predicate. 
This will produce a DELETE snapshot: @@ -3347,16 +3391,11 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> # Rewrite the manifest if len(existing_entries) > 0: - output_file_location = _new_manifest_path( - location=self._transaction.table_metadata.location, - num=next(self._manifest_counter), - commit_uuid=self.commit_uuid, - ) with write_manifest( format_version=self._transaction.table_metadata.format_version, spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id], schema=self._transaction.table_metadata.schema(), - output_file=self._io.new_output(output_file_location), + output_file=self.new_manifest_output(), snapshot_id=self._snapshot_id, ) as writer: for existing_entry in existing_entries: @@ -3388,7 +3427,7 @@ def files_affected(self) -> bool: return len(self._deleted_entries()) > 0 -class FastAppendFiles(_MergingSnapshotProducer["FastAppendFiles"]): +class FastAppendFiles(_SnapshotProducer["FastAppendFiles"]): def _existing_manifests(self) -> List[ManifestFile]: """To determine if there are any existing manifest files. @@ -3417,7 +3456,56 @@ def _deleted_entries(self) -> List[ManifestEntry]: return [] -class OverwriteFiles(_MergingSnapshotProducer["OverwriteFiles"]): +class MergeAppendFiles(FastAppendFiles): + _target_size_bytes: int + _min_count_to_merge: int + _merge_enabled: bool + + def __init__( + self, + operation: Operation, + transaction: Transaction, + io: FileIO, + commit_uuid: Optional[uuid.UUID] = None, + snapshot_properties: Dict[str, str] = EMPTY_DICT, + ) -> None: + super().__init__(operation, transaction, io, commit_uuid, snapshot_properties) + self._target_size_bytes = PropertyUtil.property_as_int( + self._transaction.table_metadata.properties, + TableProperties.MANIFEST_TARGET_SIZE_BYTES, + TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT, + ) # type: ignore + self._min_count_to_merge = PropertyUtil.property_as_int( + self._transaction.table_metadata.properties, + TableProperties.MANIFEST_MIN_MERGE_COUNT, + TableProperties.MANIFEST_MIN_MERGE_COUNT_DEFAULT, + ) # type: ignore + self._merge_enabled = PropertyUtil.property_as_bool( + self._transaction.table_metadata.properties, + TableProperties.MANIFEST_MERGE_ENABLED, + TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT, + ) + + def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile]: + """To perform any post-processing on the manifests before writing them to the new snapshot. + + In MergeAppendFiles, we merge manifests based on the target size and the minimum count to merge + if automatic merge is enabled. + """ + unmerged_data_manifests = [manifest for manifest in manifests if manifest.content == ManifestContent.DATA] + unmerged_deletes_manifests = [manifest for manifest in manifests if manifest.content == ManifestContent.DELETES] + + data_manifest_merge_manager = _ManifestMergeManager( + target_size_bytes=self._target_size_bytes, + min_count_to_merge=self._min_count_to_merge, + merge_enabled=self._merge_enabled, + snapshot_producer=self, + ) + + return data_manifest_merge_manager.merge_manifests(unmerged_data_manifests) + unmerged_deletes_manifests + + +class OverwriteFiles(_SnapshotProducer["OverwriteFiles"]): """Overwrites data from the table. This will produce an OVERWRITE snapshot. Data and delete files were added and removed in a logical overwrite operation. 
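As a standalone sketch of the entry-status rules applied when old manifests are rewritten during a merge (implemented further below in `_ManifestMergeManager._create_manifest` via the new `add`/`delete`/`existing` writer methods): the helper name `merged_status` is ours, not part of the patch.

```python
from typing import Optional

from pyiceberg.manifest import ManifestEntryStatus


def merged_status(
    status: ManifestEntryStatus, entry_snapshot_id: Optional[int], current_snapshot_id: int
) -> Optional[ManifestEntryStatus]:
    """Status an entry receives in the merged manifest, or None if it is dropped."""
    if status == ManifestEntryStatus.DELETED:
        # Deletes are carried over only when they were produced by the committing snapshot.
        return ManifestEntryStatus.DELETED if entry_snapshot_id == current_snapshot_id else None
    if status == ManifestEntryStatus.ADDED and entry_snapshot_id == current_snapshot_id:
        # Files added by this snapshot stay ADDED in the merged manifest.
        return ManifestEntryStatus.ADDED
    # Every other non-deleted entry is rewritten as EXISTING.
    return ManifestEntryStatus.EXISTING
```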
@@ -3435,18 +3523,13 @@ def _existing_manifests(self) -> List[ManifestFile]: if len(found_deleted_data_files) == 0: existing_files.append(manifest_file) else: - # We have to rewrite the - output_file_location = _new_manifest_path( - location=self._transaction.table_metadata.location, - num=next(self._manifest_counter), - commit_uuid=self.commit_uuid, - ) + # We have to rewrite the manifest file without the deleted data files if any(entry.data_file not in found_deleted_data_files for entry in entries): with write_manifest( format_version=self._transaction.table_metadata.format_version, spec=self._transaction.table_metadata.spec(), schema=self._transaction.table_metadata.schema(), - output_file=self._io.new_output(output_file_location), + output_file=self.new_manifest_output(), snapshot_id=self._snapshot_id, ) as writer: [ @@ -3514,6 +3597,11 @@ def fast_append(self) -> FastAppendFiles: operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties ) + def merge_append(self) -> MergeAppendFiles: + return MergeAppendFiles( + operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties + ) + def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> OverwriteFiles: return OverwriteFiles( commit_uuid=commit_uuid, @@ -4421,3 +4509,84 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T table_partitions: List[TablePartition] = _get_table_partitions(arrow_table, spec, schema, slice_instructions) return table_partitions + + +class _ManifestMergeManager(Generic[U]): + _target_size_bytes: int + _min_count_to_merge: int + _merge_enabled: bool + _snapshot_producer: _SnapshotProducer[U] + + def __init__( + self, target_size_bytes: int, min_count_to_merge: int, merge_enabled: bool, snapshot_producer: _SnapshotProducer[U] + ) -> None: + self._target_size_bytes = target_size_bytes + self._min_count_to_merge = min_count_to_merge + self._merge_enabled = merge_enabled + self._snapshot_producer = snapshot_producer + + def _group_by_spec(self, manifests: List[ManifestFile]) -> Dict[int, List[ManifestFile]]: + groups = defaultdict(list) + for manifest in manifests: + groups[manifest.partition_spec_id].append(manifest) + return groups + + def _create_manifest(self, spec_id: int, manifest_bin: List[ManifestFile]) -> ManifestFile: + with self._snapshot_producer.new_manifest_writer(spec=self._snapshot_producer.spec(spec_id)) as writer: + for manifest in manifest_bin: + for entry in self._snapshot_producer.fetch_manifest_entry(manifest=manifest, discard_deleted=False): + if entry.status == ManifestEntryStatus.DELETED and entry.snapshot_id == self._snapshot_producer.snapshot_id: + # only files deleted by this snapshot should be added to the new manifest + writer.delete(entry) + elif entry.status == ManifestEntryStatus.ADDED and entry.snapshot_id == self._snapshot_producer.snapshot_id: + # added entries from this snapshot are still added, otherwise they should be existing + writer.add(entry) + elif entry.status != ManifestEntryStatus.DELETED: + # add all non-deleted files from the old manifest as existing files + writer.existing(entry) + + return writer.to_manifest_file() + + def _merge_group(self, first_manifest: ManifestFile, spec_id: int, manifests: List[ManifestFile]) -> List[ManifestFile]: + packer: ListPacker[ManifestFile] = ListPacker(target_weight=self._target_size_bytes, lookback=1, largest_bin_first=False) + bins: List[List[ManifestFile]] = 
packer.pack_end(manifests, lambda m: m.manifest_length) + + def merge_bin(manifest_bin: List[ManifestFile]) -> List[ManifestFile]: + output_manifests = [] + if len(manifest_bin) == 1: + output_manifests.append(manifest_bin[0]) + elif first_manifest in manifest_bin and len(manifest_bin) < self._min_count_to_merge: + # if the bin has the first manifest (the new data files or an appended manifest file) then only + # merge it if the number of manifests is above the minimum count. this is applied only to bins + # with an in-memory manifest so that large manifests don't prevent merging older groups. + output_manifests.extend(manifest_bin) + else: + output_manifests.append(self._create_manifest(spec_id, manifest_bin)) + + return output_manifests + + executor = ExecutorFactory.get_or_create() + futures = [executor.submit(merge_bin, b) for b in bins] + + # for consistent ordering, we need to maintain future order + futures_index = {f: i for i, f in enumerate(futures)} + completed_futures: SortedList[Future[List[ManifestFile]]] = SortedList(iterable=[], key=lambda f: futures_index[f]) + for future in concurrent.futures.as_completed(futures): + completed_futures.add(future) + + bin_results: List[List[ManifestFile]] = [f.result() for f in completed_futures if f.result()] + + return [manifest for bin_result in bin_results for manifest in bin_result] + + def merge_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile]: + if not self._merge_enabled or len(manifests) == 0: + return manifests + + first_manifest = manifests[0] + groups = self._group_by_spec(manifests) + + merged_manifests = [] + for spec_id in reversed(groups.keys()): + merged_manifests.extend(self._merge_group(first_manifest, spec_id, groups[spec_id])) + + return merged_manifests diff --git a/pyiceberg/utils/bin_packing.py b/pyiceberg/utils/bin_packing.py index ddebde13e2..0291619685 100644 --- a/pyiceberg/utils/bin_packing.py +++ b/pyiceberg/utils/bin_packing.py @@ -104,3 +104,29 @@ def remove_bin(self) -> Bin[T]: return bin_ else: return self.bins.pop(0) + + +class ListPacker(Generic[T]): + _target_weight: int + _lookback: int + _largest_bin_first: bool + + def __init__(self, target_weight: int, lookback: int, largest_bin_first: bool) -> None: + self._target_weight = target_weight + self._lookback = lookback + self._largest_bin_first = largest_bin_first + + def pack(self, items: List[T], weight_func: Callable[[T], int]) -> List[List[T]]: + return list( + PackingIterator( + items=items, + target_weight=self._target_weight, + lookback=self._lookback, + weight_func=weight_func, + largest_bin_first=self._largest_bin_first, + ) + ) + + def pack_end(self, items: List[T], weight_func: Callable[[T], int]) -> List[List[T]]: + packed = self.pack(items=list(reversed(items)), weight_func=weight_func) + return [list(reversed(bin_items)) for bin_items in reversed(packed)] diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py index 9251d717f8..f887b1ea3b 100644 --- a/tests/catalog/test_sql.py +++ b/tests/catalog/test_sql.py @@ -1549,3 +1549,35 @@ def test_table_exists(catalog: SqlCatalog, table_schema_simple: Schema, table_id # Act and Assert for a non-existing table assert catalog.table_exists(("non", "exist")) is False + + +@pytest.mark.parametrize( + "catalog", + [ + lazy_fixture("catalog_memory"), + lazy_fixture("catalog_sqlite"), + ], +) +@pytest.mark.parametrize("format_version", [1, 2]) +def test_merge_manifests_local_file_system(catalog: SqlCatalog, arrow_table_with_null: pa.Table, format_version: int) -> None: + # To 
catch manifest file name collision bug during merge: + # https://github.com/apache/iceberg-python/pull/363#discussion_r1660691918 + catalog.create_namespace_if_not_exists("default") + try: + catalog.drop_table("default.test_merge_manifest") + except NoSuchTableError: + pass + tbl = catalog.create_table( + "default.test_merge_manifest", + arrow_table_with_null.schema, + properties={ + "commit.manifest-merge.enabled": "true", + "commit.manifest.min-count-to-merge": "2", + "format-version": format_version, + }, + ) + + for _ in range(5): + tbl.append(arrow_table_with_null) + + assert len(tbl.scan().to_arrow()) == 5 * len(arrow_table_with_null) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 028aa20dd0..2542fbdb38 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -1031,3 +1031,174 @@ def test_write_all_timestamp_precision(mocker: MockerFixture, session_catalog: C ]) assert written_arrow_table.schema == expected_schema_in_all_us assert written_arrow_table == input_arrow_table.cast(expected_schema_in_all_us) + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_merge_manifests(session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int) -> None: + tbl_a = _create_table( + session_catalog, + "default.merge_manifest_a", + {"commit.manifest-merge.enabled": "true", "commit.manifest.min-count-to-merge": "1", "format-version": format_version}, + [], + ) + tbl_b = _create_table( + session_catalog, + "default.merge_manifest_b", + { + "commit.manifest-merge.enabled": "true", + "commit.manifest.min-count-to-merge": "1", + "commit.manifest.target-size-bytes": "1", + "format-version": format_version, + }, + [], + ) + tbl_c = _create_table( + session_catalog, + "default.merge_manifest_c", + {"commit.manifest.min-count-to-merge": "1", "format-version": format_version}, + [], + ) + + # tbl_a should merge all manifests into 1 + tbl_a.append(arrow_table_with_null) + tbl_a.append(arrow_table_with_null) + tbl_a.append(arrow_table_with_null) + + # tbl_b should not merge any manifests because the target size is too small + tbl_b.append(arrow_table_with_null) + tbl_b.append(arrow_table_with_null) + tbl_b.append(arrow_table_with_null) + + # tbl_c should not merge any manifests because merging is disabled + tbl_c.append(arrow_table_with_null) + tbl_c.append(arrow_table_with_null) + tbl_c.append(arrow_table_with_null) + + assert len(tbl_a.current_snapshot().manifests(tbl_a.io)) == 1 # type: ignore + assert len(tbl_b.current_snapshot().manifests(tbl_b.io)) == 3 # type: ignore + assert len(tbl_c.current_snapshot().manifests(tbl_c.io)) == 3 # type: ignore + + # tbl_a and tbl_c should contain the same data + assert tbl_a.scan().to_arrow().equals(tbl_c.scan().to_arrow()) + # tbl_b and tbl_c should contain the same data + assert tbl_b.scan().to_arrow().equals(tbl_c.scan().to_arrow()) + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_merge_manifests_file_content(session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int) -> None: + tbl_a = _create_table( + session_catalog, + "default.merge_manifest_a", + {"commit.manifest-merge.enabled": "true", "commit.manifest.min-count-to-merge": "1", "format-version": format_version}, + [], + ) + + # tbl_a should merge all manifests into 1 + tbl_a.append(arrow_table_with_null) + + tbl_a_first_entries = tbl_a.inspect.entries().to_pydict() + 
first_snapshot_id = tbl_a_first_entries["snapshot_id"][0] + first_data_file_path = tbl_a_first_entries["data_file"][0]["file_path"] + + tbl_a.append(arrow_table_with_null) + tbl_a.append(arrow_table_with_null) + + assert len(tbl_a.current_snapshot().manifests(tbl_a.io)) == 1 # type: ignore + + # verify the sequence number of tbl_a's only manifest file + tbl_a_manifest = tbl_a.current_snapshot().manifests(tbl_a.io)[0] # type: ignore + assert tbl_a_manifest.sequence_number == (3 if format_version == 2 else 0) + assert tbl_a_manifest.min_sequence_number == (1 if format_version == 2 else 0) + + # verify the manifest entries of tbl_a, in which the manifests are merged + tbl_a_entries = tbl_a.inspect.entries().to_pydict() + assert tbl_a_entries["status"] == [1, 0, 0] + assert tbl_a_entries["sequence_number"] == [3, 2, 1] if format_version == 2 else [0, 0, 0] + assert tbl_a_entries["file_sequence_number"] == [3, 2, 1] if format_version == 2 else [0, 0, 0] + for i in range(3): + tbl_a_data_file = tbl_a_entries["data_file"][i] + assert tbl_a_data_file["column_sizes"] == [ + (1, 49), + (2, 78), + (3, 128), + (4, 94), + (5, 118), + (6, 94), + (7, 118), + (8, 118), + (9, 118), + (10, 94), + (11, 78), + (12, 109), + ] + assert tbl_a_data_file["content"] == 0 + assert tbl_a_data_file["equality_ids"] is None + assert tbl_a_data_file["file_format"] == "PARQUET" + assert tbl_a_data_file["file_path"].startswith("s3://warehouse/default/merge_manifest_a/data/") + if tbl_a_data_file["file_path"] == first_data_file_path: + # verify that the snapshot id recorded should be the one where the file was added + assert tbl_a_entries["snapshot_id"][i] == first_snapshot_id + assert tbl_a_data_file["key_metadata"] is None + assert tbl_a_data_file["lower_bounds"] == [ + (1, b"\x00"), + (2, b"a"), + (3, b"aaaaaaaaaaaaaaaa"), + (4, b"\x01\x00\x00\x00"), + (5, b"\x01\x00\x00\x00\x00\x00\x00\x00"), + (6, b"\x00\x00\x00\x80"), + (7, b"\x00\x00\x00\x00\x00\x00\x00\x80"), + (8, b"\x00\x9bj\xca8\xf1\x05\x00"), + (9, b"\x00\x9bj\xca8\xf1\x05\x00"), + (10, b"\x9eK\x00\x00"), + (11, b"\x01"), + (12, b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" b"\x00\x00\x00\x00"), + ] + assert tbl_a_data_file["nan_value_counts"] == [] + assert tbl_a_data_file["null_value_counts"] == [ + (1, 1), + (2, 1), + (3, 1), + (4, 1), + (5, 1), + (6, 1), + (7, 1), + (8, 1), + (9, 1), + (10, 1), + (11, 1), + (12, 1), + ] + assert tbl_a_data_file["partition"] == {} + assert tbl_a_data_file["record_count"] == 3 + assert tbl_a_data_file["sort_order_id"] is None + assert tbl_a_data_file["split_offsets"] == [4] + assert tbl_a_data_file["upper_bounds"] == [ + (1, b"\x01"), + (2, b"z"), + (3, b"zzzzzzzzzzzzzzz{"), + (4, b"\t\x00\x00\x00"), + (5, b"\t\x00\x00\x00\x00\x00\x00\x00"), + (6, b"fff?"), + (7, b"\xcd\xcc\xcc\xcc\xcc\xcc\xec?"), + (8, b"\x00\xbb\r\xab\xdb\xf5\x05\x00"), + (9, b"\x00\xbb\r\xab\xdb\xf5\x05\x00"), + (10, b"\xd9K\x00\x00"), + (11, b"\x12"), + (12, b"\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11" b"\x11\x11\x11\x11"), + ] + assert tbl_a_data_file["value_counts"] == [ + (1, 3), + (2, 3), + (3, 3), + (4, 3), + (5, 3), + (6, 3), + (7, 3), + (8, 3), + (9, 3), + (10, 3), + (11, 3), + (12, 3), + ] diff --git a/tests/utils/test_bin_packing.py b/tests/utils/test_bin_packing.py index 054ea79556..3bfacdf481 100644 --- a/tests/utils/test_bin_packing.py +++ b/tests/utils/test_bin_packing.py @@ -20,7 +20,9 @@ import pytest -from pyiceberg.utils.bin_packing import PackingIterator +from pyiceberg.utils.bin_packing import ListPacker, PackingIterator + 
+INT_MAX = 2147483647 @pytest.mark.parametrize( @@ -83,4 +85,46 @@ def test_bin_packing_lookback( def weight_func(x: int) -> int: return x + packer: ListPacker[int] = ListPacker(target_weight, lookback, largest_bin_first) + assert list(PackingIterator(splits, target_weight, lookback, weight_func, largest_bin_first)) == expected_lists + assert list(packer.pack(splits, weight_func)) == expected_lists + + +@pytest.mark.parametrize( + "splits, target_weight, lookback, largest_bin_first, expected_lists", + [ + # Single Lookback Tests + ([1, 2, 3, 4, 5], 3, 1, False, [[1, 2], [3], [4], [5]]), + ([1, 2, 3, 4, 5], 4, 1, False, [[1, 2], [3], [4], [5]]), + ([1, 2, 3, 4, 5], 5, 1, False, [[1], [2, 3], [4], [5]]), + ([1, 2, 3, 4, 5], 6, 1, False, [[1, 2, 3], [4], [5]]), + ([1, 2, 3, 4, 5], 7, 1, False, [[1, 2], [3, 4], [5]]), + ([1, 2, 3, 4, 5], 8, 1, False, [[1, 2], [3, 4], [5]]), + ([1, 2, 3, 4, 5], 9, 1, False, [[1, 2, 3], [4, 5]]), + ([1, 2, 3, 4, 5], 11, 1, False, [[1, 2, 3], [4, 5]]), + ([1, 2, 3, 4, 5], 12, 1, False, [[1, 2], [3, 4, 5]]), + ([1, 2, 3, 4, 5], 14, 1, False, [[1], [2, 3, 4, 5]]), + ([1, 2, 3, 4, 5], 15, 1, False, [[1, 2, 3, 4, 5]]), + # Unlimited Lookback Tests + ([1, 2, 3, 4, 5], 3, INT_MAX, False, [[1, 2], [3], [4], [5]]), + ([1, 2, 3, 4, 5], 4, INT_MAX, False, [[2], [1, 3], [4], [5]]), + ([1, 2, 3, 4, 5], 5, INT_MAX, False, [[2, 3], [1, 4], [5]]), + ([1, 2, 3, 4, 5], 6, INT_MAX, False, [[3], [2, 4], [1, 5]]), + ([1, 2, 3, 4, 5], 7, INT_MAX, False, [[1], [3, 4], [2, 5]]), + ([1, 2, 3, 4, 5], 8, INT_MAX, False, [[1, 2, 4], [3, 5]]), + ([1, 2, 3, 4, 5], 9, INT_MAX, False, [[1, 2, 3], [4, 5]]), + ([1, 2, 3, 4, 5], 10, INT_MAX, False, [[2, 3], [1, 4, 5]]), + ([1, 2, 3, 4, 5], 11, INT_MAX, False, [[1, 3], [2, 4, 5]]), + ([1, 2, 3, 4, 5], 12, INT_MAX, False, [[1, 2], [3, 4, 5]]), + ([1, 2, 3, 4, 5], 13, INT_MAX, False, [[2], [1, 3, 4, 5]]), + ([1, 2, 3, 4, 5], 14, INT_MAX, False, [[1], [2, 3, 4, 5]]), + ([1, 2, 3, 4, 5], 15, INT_MAX, False, [[1, 2, 3, 4, 5]]), + ], +) +def test_reverse_bin_packing_lookback( + splits: List[int], target_weight: int, lookback: int, largest_bin_first: bool, expected_lists: List[List[int]] +) -> None: + packer: ListPacker[int] = ListPacker(target_weight, lookback, largest_bin_first) + result = packer.pack_end(splits, lambda x: x) + assert result == expected_lists
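A small worked example (not part of the patch) of the new `ListPacker`: `pack` fills bins from the front of the list, while `pack_end` packs from the back, so the leftover partial bin ends up first. This tends to keep the newest manifest, which sits at the head of the list, in the smallest bin, matching the merge manager's special handling of the first manifest.

```python
from pyiceberg.utils.bin_packing import ListPacker

packer: ListPacker[int] = ListPacker(target_weight=7, lookback=1, largest_bin_first=False)

# Forward packing fills bins starting from the front of the list...
assert packer.pack([1, 2, 3, 4, 5], lambda x: x) == [[1, 2, 3], [4], [5]]

# ...while pack_end packs from the back, leaving the partial bin at the front
# (this matches the target_weight=7 row in the reverse bin-packing test above).
assert packer.pack_end([1, 2, 3, 4, 5], lambda x: x) == [[1, 2], [3, 4], [5]]
```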