Skip to content

Commit

Permalink
fix merge conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
HonahX committed Jul 10, 2024
1 parent 914d6ef commit 71a5fe0
Showing 1 changed file with 9 additions and 20 deletions.
29 changes: 9 additions & 20 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3094,7 +3094,6 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
_added_data_files: List[DataFile]
_manifest_num_counter: itertools.count[int]
_deleted_data_files: Set[DataFile]
_manifest_counter: itertools.count[int]

def __init__(
self,
Expand All @@ -3117,7 +3116,6 @@ def __init__(
self._deleted_data_files = set()
self.snapshot_properties = snapshot_properties
self._manifest_num_counter = itertools.count(0)
self._manifest_counter = itertools.count(0)

def append_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]:
self._added_data_files.append(data_file)
Expand Down Expand Up @@ -3292,7 +3290,7 @@ def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: bool = T
return manifest.fetch_manifest_entry(io=self._io, discard_deleted=discard_deleted)


class DeleteFiles(_MergingSnapshotProducer["DeleteFiles"]):
class DeleteFiles(_SnapshotProducer["DeleteFiles"]):
"""Will delete manifest entries from the current snapshot based on the predicate.
This will produce a DELETE snapshot:
Expand Down Expand Up @@ -3393,16 +3391,11 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->

# Rewrite the manifest
if len(existing_entries) > 0:
output_file_location = _new_manifest_path(
location=self._transaction.table_metadata.location,
num=next(self._manifest_counter),
commit_uuid=self.commit_uuid,
)
with write_manifest(
format_version=self._transaction.table_metadata.format_version,
spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
schema=self._transaction.table_metadata.schema(),
output_file=self._io.new_output(output_file_location),
output_file=self.new_manifest_output(),
snapshot_id=self._snapshot_id,
) as writer:
for existing_entry in existing_entries:
Expand Down Expand Up @@ -3433,7 +3426,8 @@ def files_affected(self) -> bool:
"""Indicate if any manifest-entries can be dropped."""
return len(self._deleted_entries()) > 0

class FastAppendFiles(_SnapshotProducer):

class FastAppendFiles(_SnapshotProducer["FastAppendFiles"]):
def _existing_manifests(self) -> List[ManifestFile]:
"""To determine if there are any existing manifest files.
Expand Down Expand Up @@ -3529,18 +3523,13 @@ def _existing_manifests(self) -> List[ManifestFile]:
if len(found_deleted_data_files) == 0:
existing_files.append(manifest_file)
else:
# We have to rewrite the
output_file_location = _new_manifest_path(
location=self._transaction.table_metadata.location,
num=next(self._manifest_counter),
commit_uuid=self.commit_uuid,
)
# We have to rewrite the manifest file without the deleted data files
if any(entry.data_file not in found_deleted_data_files for entry in entries):
with write_manifest(
format_version=self._transaction.table_metadata.format_version,
spec=self._transaction.table_metadata.spec(),
schema=self._transaction.table_metadata.schema(),
output_file=self._io.new_output(output_file_location),
output_file=self.new_manifest_output(),
snapshot_id=self._snapshot_id,
) as writer:
[
Expand Down Expand Up @@ -4522,14 +4511,14 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T
return table_partitions


class _ManifestMergeManager:
class _ManifestMergeManager(Generic[U]):
_target_size_bytes: int
_min_count_to_merge: int
_merge_enabled: bool
_snapshot_producer: _SnapshotProducer
_snapshot_producer: _SnapshotProducer[U]

def __init__(
self, target_size_bytes: int, min_count_to_merge: int, merge_enabled: bool, snapshot_producer: _SnapshotProducer
self, target_size_bytes: int, min_count_to_merge: int, merge_enabled: bool, snapshot_producer: _SnapshotProducer[U]
) -> None:
self._target_size_bytes = target_size_bytes
self._min_count_to_merge = min_count_to_merge
Expand Down

0 comments on commit 71a5fe0

Please sign in to comment.