From bf63c0391684e6987cbdc8928c9278f6d27c34a0 Mon Sep 17 00:00:00 2001 From: HonahX Date: Mon, 3 Jun 2024 00:26:18 -0700 Subject: [PATCH] fix snapshot inheritance --- pyiceberg/manifest.py | 67 +++++++++++++++++++++++++++++++++++++ pyiceberg/table/__init__.py | 40 +++++++++------------- 2 files changed, 83 insertions(+), 24 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index defe5958c5..027af2a2aa 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -402,6 +402,46 @@ class ManifestEntry(Record): def __init__(self, *data: Any, **named_data: Any) -> None: super().__init__(*data, **{"struct": MANIFEST_ENTRY_SCHEMAS_STRUCT[DEFAULT_READ_VERSION], **named_data}) + def _wrap( + self, + new_status: ManifestEntryStatus, + new_snapshot_id: int, + new_data_sequence_number: Optional[int], + new_file_sequence_number: Optional[int], + new_file: DataFile, + ) -> ManifestEntry: + self.status = new_status + self.snapshot_id = new_snapshot_id + self.data_sequence_number = new_data_sequence_number + self.file_sequence_number = new_file_sequence_number + self.data_file = new_file + return self + + def _wrap_append(self, new_snapshot_id: int, new_data_sequence_number: Optional[int], new_file: DataFile) -> ManifestEntry: + return self._wrap(ManifestEntryStatus.ADDED, new_snapshot_id, new_data_sequence_number, None, new_file) + + def _wrap_delete( + self, + new_snapshot_id: int, + new_data_sequence_number: Optional[int], + new_file_sequence_number: Optional[int], + new_file: DataFile, + ) -> ManifestEntry: + return self._wrap( + ManifestEntryStatus.DELETED, new_snapshot_id, new_data_sequence_number, new_file_sequence_number, new_file + ) + + def _wrap_existing( + self, + new_snapshot_id: int, + new_data_sequence_number: Optional[int], + new_file_sequence_number: Optional[int], + new_file: DataFile, + ) -> ManifestEntry: + return self._wrap( + ManifestEntryStatus.EXISTING, new_snapshot_id, new_data_sequence_number, new_file_sequence_number, new_file + ) + PARTITION_FIELD_SUMMARY_TYPE = StructType( NestedField(509, "contains_null", BooleanType(), required=True), @@ -654,6 +694,7 @@ class ManifestWriter(ABC): _deleted_rows: int _min_data_sequence_number: Optional[int] _partitions: List[Record] + _reused_entry_wrapper: ManifestEntry def __init__( self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int, meta: Dict[str, str] = EMPTY_DICT @@ -673,6 +714,7 @@ def __init__( self._deleted_rows = 0 self._min_data_sequence_number = None self._partitions = [] + self._reused_entry_wrapper = ManifestEntry() def __enter__(self) -> ManifestWriter: """Open the writer.""" @@ -763,6 +805,31 @@ def add_entry(self, entry: ManifestEntry) -> ManifestWriter: self._writer.write_block([self.prepare_entry(entry)]) return self + def add(self, entry: ManifestEntry) -> ManifestWriter: + if entry.data_sequence_number is not None and entry.data_sequence_number >= 0: + self.add_entry( + self._reused_entry_wrapper._wrap_append(self._snapshot_id, entry.data_sequence_number, entry.data_file) + ) + else: + self.add_entry(self._reused_entry_wrapper._wrap_append(self._snapshot_id, None, entry.data_file)) + return self + + def delete(self, entry: ManifestEntry) -> ManifestWriter: + self.add_entry( + self._reused_entry_wrapper._wrap_delete( + self._snapshot_id, entry.data_sequence_number, entry.file_sequence_number, entry.data_file + ) + ) + return self + + def existing(self, entry: ManifestEntry) -> ManifestWriter: + self.add_entry( + self._reused_entry_wrapper._wrap_existing( + self._snapshot_id, entry.data_sequence_number, entry.file_sequence_number, entry.data_file + ) + ) + return self + class ManifestWriterV1(ManifestWriter): def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int): diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 851e589a87..3af95d4c78 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -430,12 +430,12 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) def merge_append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: """ - Shorthand API for appending a PyArrow table to a table transaction. + Shorthand API for appending a PyArrow table to a table transaction. - Args: - df: The Arrow dataframe that will be appended to overwrite the table - snapshot_properties: Custom properties to be added to the snapshot summary - """ + Args: + df: The Arrow dataframe that will be appended to overwrite the table + snapshot_properties: Custom properties to be added to the snapshot summary + """ try: import pyarrow as pa except ModuleNotFoundError as e: @@ -461,11 +461,11 @@ def merge_append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY # skip writing data files if the dataframe is empty if df.shape[0] > 0: data_files = _dataframe_to_data_files( - table_metadata=self._table.metadata, write_uuid=update_snapshot.commit_uuid, df=df, - io=self._table.io + table_metadata=self._table.metadata, write_uuid=update_snapshot.commit_uuid, df=df, io=self._table.io ) for data_file in data_files: update_snapshot.append_data_file(data_file) + def overwrite( self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT ) -> None: @@ -1392,12 +1392,12 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) def merge_append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: """ - Shorthand API for appending a PyArrow table to the table. + Shorthand API for appending a PyArrow table to the table. - Args: - df: The Arrow dataframe that will be appended to overwrite the table - snapshot_properties: Custom properties to be added to the snapshot summary - """ + Args: + df: The Arrow dataframe that will be appended to overwrite the table + snapshot_properties: Custom properties to be added to the snapshot summary + """ with self.transaction() as tx: tx.merge_append(df=df, snapshot_properties=snapshot_properties) @@ -3919,26 +3919,18 @@ def _group_by_spec( def _create_manifest(self, spec_id: int, manifest_bin: List[ManifestFile]) -> ManifestFile: with self._snapshot_producer.new_manifest_writer(spec=self._snapshot_producer.spec(spec_id)) as writer: for manifest in manifest_bin: - for entry in self._snapshot_producer.fetch_manifest_entry(manifest): + for entry in self._snapshot_producer.fetch_manifest_entry(manifest=manifest, discard_deleted=False): if entry.status == ManifestEntryStatus.DELETED: # suppress deletes from previous snapshots. only files deleted by this snapshot # should be added to the new manifest if entry.snapshot_id == self._snapshot_producer.snapshot_id: - writer.add_entry(entry) + writer.delete(entry) elif entry.status == ManifestEntryStatus.ADDED and entry.snapshot_id == self._snapshot_producer.snapshot_id: # adds from this snapshot are still adds, otherwise they should be existing - writer.add_entry(entry) + writer.add(entry) else: # add all files from the old manifest as existing files - writer.add_entry( - ManifestEntry( - status=ManifestEntryStatus.EXISTING, - snapshot_id=entry.snapshot_id, - data_sequence_number=entry.data_sequence_number, - file_sequence_number=entry.file_sequence_number, - data_file=entry.data_file, - ) - ) + writer.existing(entry) return writer.to_manifest_file()