From bb9a4d1cba9c0a300f01dc8cbba5b8265d0493ba Mon Sep 17 00:00:00 2001
From: Maksym Shalenyi
Date: Mon, 10 Jun 2024 23:48:11 -0700
Subject: [PATCH] adding add_files_overwrite method

use delete instead of overwrite

check history too

---
 mkdocs/docs/api.md          |  5 +++
 pyiceberg/table/__init__.py | 65 ++++++++++++++++++++++++++++++-------
 2 files changed, 58 insertions(+), 12 deletions(-)

diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 7386d0297a..5c6fa90c18 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -860,7 +860,12 @@ file_paths = [
 
 tbl.add_files(file_paths=file_paths)
 
+# or if you want to overwrite
+
+tbl.add_files_overwrite(file_paths=file_paths)
+
 # A new snapshot is committed to the table with manifests pointing to the existing parquet files
+
 ```
 
 
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 0cbe4630e4..76afa1b55f 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -664,6 +664,27 @@ def add_files(self, file_paths: List[str], snapshot_properties: Dict[str, str] =
             for data_file in data_files:
                 update_snapshot.append_data_file(data_file)
 
+    def add_files_overwrite(self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+        """
+        Shorthand API for adding files as data files and overwriting the table.
+
+        Args:
+            file_paths: The list of full file paths to be added as data files to the table
+            snapshot_properties: Custom properties to be added to the snapshot summary
+
+        Raises:
+            FileNotFoundError: If the file does not exist.
+        """
+        if self._table.name_mapping() is None:
+            self.set_properties(**{TableProperties.DEFAULT_NAME_MAPPING: self._table.schema().name_mapping.model_dump_json()})
+        self.delete(delete_filter=ALWAYS_TRUE, snapshot_properties=snapshot_properties)
+        with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
+            data_files = _parquet_files_to_data_files(
+                table_metadata=self._table.metadata, file_paths=file_paths, io=self._table.io
+            )
+            for data_file in data_files:
+                update_snapshot.append_data_file(data_file)
+
     def update_spec(self) -> UpdateSpec:
         """Create a new UpdateSpec to update the partitioning of the table.
 
@@ -1613,6 +1634,20 @@ def add_files(self, file_paths: List[str], snapshot_properties: Dict[str, str] =
         with self.transaction() as tx:
             tx.add_files(file_paths=file_paths, snapshot_properties=snapshot_properties)
 
+    def add_files_overwrite(self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+        """
+        Shorthand API for adding files as data files and overwriting the table.
+
+        Args:
+            file_paths: The list of full file paths to be added as data files to the table
+            snapshot_properties: Custom properties to be added to the snapshot summary
+
+        Raises:
+            FileNotFoundError: If the file does not exist.
+        """
+        with self.transaction() as tx:
+            tx.add_files_overwrite(file_paths=file_paths, snapshot_properties=snapshot_properties)
+
     def update_spec(self, case_sensitive: bool = True) -> UpdateSpec:
         return UpdateSpec(Transaction(self, autocommit=True), case_sensitive=case_sensitive)
 
@@ -3567,9 +3602,9 @@ def merge_append(self) -> MergeAppendFiles:
     def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> OverwriteFiles:
         return OverwriteFiles(
             commit_uuid=commit_uuid,
-            operation=Operation.OVERWRITE
-            if self._transaction.table_metadata.current_snapshot() is not None
-            else Operation.APPEND,
+            operation=(
+                Operation.OVERWRITE if self._transaction.table_metadata.current_snapshot() is not None else Operation.APPEND
+            ),
             transaction=self._transaction,
             io=self._io,
             snapshot_properties=self._snapshot_properties,
@@ -3959,12 +3994,16 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
                     "null_value_count": null_value_counts.get(field.field_id),
                     "nan_value_count": nan_value_counts.get(field.field_id),
                     # Makes them readable
-                    "lower_bound": from_bytes(field.field_type, lower_bound)
-                    if (lower_bound := lower_bounds.get(field.field_id))
-                    else None,
-                    "upper_bound": from_bytes(field.field_type, upper_bound)
-                    if (upper_bound := upper_bounds.get(field.field_id))
-                    else None,
+                    "lower_bound": (
+                        from_bytes(field.field_type, lower_bound)
+                        if (lower_bound := lower_bounds.get(field.field_id))
+                        else None
+                    ),
+                    "upper_bound": (
+                        from_bytes(field.field_type, upper_bound)
+                        if (upper_bound := upper_bounds.get(field.field_id))
+                        else None
+                    ),
                 }
                 for field in self.tbl.metadata.schema().fields
             }
@@ -4199,9 +4238,11 @@ def _partition_summaries_to_rows(
                 "added_delete_files_count": manifest.added_files_count if is_delete_file else 0,
                 "existing_delete_files_count": manifest.existing_files_count if is_delete_file else 0,
                 "deleted_delete_files_count": manifest.deleted_files_count if is_delete_file else 0,
-                "partition_summaries": _partition_summaries_to_rows(specs[manifest.partition_spec_id], manifest.partitions)
-                if manifest.partitions
-                else [],
+                "partition_summaries": (
+                    _partition_summaries_to_rows(specs[manifest.partition_spec_id], manifest.partitions)
+                    if manifest.partitions
+                    else []
+                ),
             })
 
         return pa.Table.from_pylist(