diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 74692f85b8..61299bca7f 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -31,6 +31,7 @@ import logging import os import re +import uuid from abc import ABC, abstractmethod from concurrent.futures import Future from copy import copy @@ -126,7 +127,6 @@ visit, visit_with_partner, ) -from pyiceberg.table import PropertyUtil, TableProperties, WriteTask from pyiceberg.table.metadata import TableMetadata from pyiceberg.table.name_mapping import NameMapping from pyiceberg.transforms import TruncateTransform @@ -159,7 +159,7 @@ from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string if TYPE_CHECKING: - from pyiceberg.table import FileScanTask + from pyiceberg.table import FileScanTask, WriteTask logger = logging.getLogger(__name__) @@ -1443,6 +1443,8 @@ class PyArrowStatisticsCollector(PreOrderSchemaVisitor[List[StatisticsCollector] _default_mode: str def __init__(self, schema: Schema, properties: Dict[str, str]): + from pyiceberg.table import TableProperties + self._schema = schema self._properties = properties self._default_mode = self._properties.get( @@ -1478,6 +1480,8 @@ def map( return k + v def primitive(self, primitive: PrimitiveType) -> List[StatisticsCollector]: + from pyiceberg.table import TableProperties + column_name = self._schema.find_column_name(self._field_id) if column_name is None: return [] @@ -1774,7 +1778,9 @@ def data_file_statistics_from_parquet_metadata( ) -def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: +def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterable["WriteTask"]) -> Iterator[DataFile]: + from pyiceberg.table import PropertyUtil, TableProperties + schema = table_metadata.schema() arrow_file_schema = schema.as_arrow() parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties) @@ -1875,6 +1881,8 @@ def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_ def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]: + from pyiceberg.table import PropertyUtil, TableProperties + for key_pattern in [ TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, TableProperties.PARQUET_PAGE_ROW_LIMIT, @@ -1912,3 +1920,55 @@ def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]: default=TableProperties.PARQUET_PAGE_ROW_LIMIT_DEFAULT, ), } + + +def _dataframe_to_data_files( + table_metadata: TableMetadata, + df: pa.Table, + io: FileIO, + write_uuid: Optional[uuid.UUID] = None, + counter: Optional[itertools.count[int]] = None, +) -> Iterable[DataFile]: + """Convert a PyArrow table into a DataFile. + + Returns: + An iterable that supplies datafiles that represent the table. + """ + from pyiceberg.table import PropertyUtil, TableProperties, WriteTask + + counter = counter or itertools.count(0) + write_uuid = write_uuid or uuid.uuid4() + target_file_size: int = PropertyUtil.property_as_int( # type: ignore # The property is set with non-None value. 
+ properties=table_metadata.properties, + property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, + default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, + ) + + if table_metadata.spec().is_unpartitioned(): + yield from write_file( + io=io, + table_metadata=table_metadata, + tasks=iter([ + WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=table_metadata.schema()) + for batches in bin_pack_arrow_table(df, target_file_size) + ]), + ) + else: + from pyiceberg.table import determine_partitions + + partitions = determine_partitions(spec=table_metadata.spec(), schema=table_metadata.schema(), arrow_table=df) + yield from write_file( + io=io, + table_metadata=table_metadata, + tasks=iter([ + WriteTask( + write_uuid=write_uuid, + task_id=next(counter), + record_batches=batches, + partition_key=partition.partition_key, + schema=table_metadata.schema(), + ) + for partition in partitions + for batches in bin_pack_arrow_table(partition.arrow_table_partition, target_file_size) + ]), + ) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index d292e5242b..3f50736f63 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -68,6 +68,7 @@ manifest_evaluator, ) from pyiceberg.io import FileIO, load_file_io +from pyiceberg.io.pyarrow import _dataframe_to_data_files, project_table from pyiceberg.manifest import ( POSITIONAL_DELETE_SCHEMA, DataFile, @@ -307,19 +308,14 @@ def _apply(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequ return self - - def _scan( - self, - row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE - ) -> DataScan: - """Minimal data scan the table with the current state of the transaction""" + def _scan(self, row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE) -> DataScan: + """Minimal data scan the table with the current state of the transaction.""" return DataScan( table_metadata=self.table_metadata, io=self._table.io, row_filter=row_filter, ) - def upgrade_table_version(self, format_version: TableVersion) -> Transaction: """Set the table to a certain version. @@ -461,17 +457,42 @@ def overwrite( update_snapshot.append_data_file(data_file) def delete(self, delete_filter: BooleanExpression, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: - with self.update_snapshot(snapshot_properties=snapshot_properties) as snapshot: - with snapshot.delete() as delete: - delete.delete(delete_filter) - - # Check if there are any files that require an actual rewrite of a data file - if delete.rewrites_needed: - files = self._scan().plan_files() - - for file in files: - + with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot: + delete_snapshot.delete_by_predicate(delete_filter) # type: ignore + + # Check if there are any files that require an actual rewrite of a data file + if delete_snapshot.rewrites_needed: # type: ignore + with self.update_snapshot(snapshot_properties=snapshot_properties).overwrite() as overwrite_snapshot: + # Potential optimization is where we check if the files actually contain relevant data. 
+ files = self._scan(row_filter=delete_filter).plan_files() + + counter = itertools.count(0) + + # This will load the Parquet file into memory, including: + # - Filter out the rows based on the delete filter + # - Projecting it to the current schema + # - Applying the positional deletes if they are there + # When writing + # - Apply the latest partition-spec + # - And sort order when added + for original_file in files: + df = project_table( + tasks=[original_file], + table_metadata=self._table.metadata, + io=self._table.io, + row_filter=delete_filter, + projected_schema=self.table_metadata.schema(), + ) + for data_file in _dataframe_to_data_files( + io=self._table.io, + df=df, + table_metadata=self._table.metadata, + write_uuid=overwrite_snapshot.commit_uuid, + counter=counter, + ): + overwrite_snapshot.append_data_file(data_file) + overwrite_snapshot.delete_data_file(original_file.file) def add_files(self, file_paths: List[str]) -> None: """ @@ -1310,6 +1331,9 @@ def current_snapshot(self) -> Optional[Snapshot]: return self.snapshot_by_id(self.metadata.current_snapshot_id) return None + def snapshots(self) -> List[Snapshot]: + return self.metadata.snapshots + def snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]: """Get the snapshot of this table with the given id, or None if there is no matching snapshot.""" return self.metadata.snapshot_by_id(snapshot_id) @@ -1373,7 +1397,7 @@ def overwrite( def delete(self, delete_filter: BooleanExpression = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: """ - Shorthand for deleting rows from the table + Shorthand for deleting rows from the table. Args: delete_filter: The predicate that used to remove rows @@ -1382,8 +1406,6 @@ def delete(self, delete_filter: BooleanExpression = ALWAYS_TRUE, snapshot_proper with self.transaction() as tx: tx.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties) - - def add_files(self, file_paths: List[str]) -> None: """ Shorthand API for adding files as data files to the table. @@ -1671,7 +1693,6 @@ def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_ent class DataScan(TableScan): - def _build_partition_projection(self, spec_id: int) -> BooleanExpression: project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id]) return project(self.row_filter) @@ -1925,7 +1946,8 @@ def union_by_name(self, new_schema: Union[Schema, "pa.Schema"]) -> UpdateSchema: visit_with_partner( Catalog._convert_schema_if_needed(new_schema), -1, - UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive), # type: ignore + UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive), + # type: ignore PartnerIdByNameAccessor(partner_schema=self._schema, case_sensitive=self._case_sensitive), ) return self @@ -2724,52 +2746,6 @@ def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: int, return f'{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro' -def _dataframe_to_data_files( - table_metadata: TableMetadata, df: pa.Table, io: FileIO, write_uuid: Optional[uuid.UUID] = None -) -> Iterable[DataFile]: - """Convert a PyArrow table into a DataFile. - - Returns: - An iterable that supplies datafiles that represent the table. 
- """ - from pyiceberg.io.pyarrow import bin_pack_arrow_table, write_file - - counter = itertools.count(0) - write_uuid = write_uuid or uuid.uuid4() - target_file_size: int = PropertyUtil.property_as_int( # type: ignore # The property is set with non-None value. - properties=table_metadata.properties, - property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, - default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, - ) - - if len(table_metadata.spec().fields) > 0: - partitions = _determine_partitions(spec=table_metadata.spec(), schema=table_metadata.schema(), arrow_table=df) - yield from write_file( - io=io, - table_metadata=table_metadata, - tasks=iter([ - WriteTask( - write_uuid=write_uuid, - task_id=next(counter), - record_batches=batches, - partition_key=partition.partition_key, - schema=table_metadata.schema(), - ) - for partition in partitions - for batches in bin_pack_arrow_table(partition.arrow_table_partition, target_file_size) - ]), - ) - else: - yield from write_file( - io=io, - table_metadata=table_metadata, - tasks=iter([ - WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=table_metadata.schema()) - for batches in bin_pack_arrow_table(df, target_file_size) - ]), - ) - - def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List[str], io: FileIO) -> Iterable[DataFile]: """Convert a list files into DataFiles. @@ -2787,6 +2763,8 @@ class _MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]): _snapshot_id: int _parent_snapshot_id: Optional[int] _added_data_files: List[DataFile] + _deleted_data_files: Set[DataFile] + _manifest_counter: itertools.count[int] def __init__( self, @@ -2806,12 +2784,18 @@ def __init__( snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.current_snapshot()) else None ) self._added_data_files = [] + self._deleted_data_files = set() self.snapshot_properties = snapshot_properties + self._manifest_counter = itertools.count(0) def append_data_file(self, data_file: DataFile) -> _MergingSnapshotProducer: self._added_data_files.append(data_file) return self + def delete_data_file(self, data_file: DataFile) -> _MergingSnapshotProducer: + self._deleted_data_files.add(data_file) + return self + @abstractmethod def _deleted_entries(self) -> List[ManifestEntry]: ... 
@@ -2822,7 +2806,9 @@ def _manifests(self) -> List[ManifestFile]: def _write_added_manifest() -> List[ManifestFile]: if self._added_data_files: output_file_location = _new_manifest_path( - location=self._transaction.table_metadata.location, num=0, commit_uuid=self.commit_uuid + location=self._transaction.table_metadata.location, + num=next(self._manifest_counter), + commit_uuid=self.commit_uuid, ) with write_manifest( format_version=self._transaction.table_metadata.format_version, @@ -2850,7 +2836,9 @@ def _write_delete_manifest() -> List[ManifestFile]: deleted_entries = self._deleted_entries() if len(deleted_entries) > 0: output_file_location = _new_manifest_path( - location=self._transaction.table_metadata.location, num=1, commit_uuid=self.commit_uuid + location=self._transaction.table_metadata.location, + num=next(self._manifest_counter), + commit_uuid=self.commit_uuid, ) with write_manifest( @@ -2941,13 +2929,20 @@ def _commit(self) -> UpdatesAndRequirements: ), ( AssertTableUUID(uuid=self._transaction.table_metadata.table_uuid), - AssertRefSnapshotId(snapshot_id=self._parent_snapshot_id, ref="main"), + # AssertRefSnapshotId(snapshot_id=self._parent_snapshot_id, ref="main"), ), ) -class MetadataDeleteFiles(_MergingSnapshotProducer): - """Will delete manifest entries from the current snapshot based on the predicate""" +class DeleteFiles(_MergingSnapshotProducer): + """Will delete manifest entries from the current snapshot based on the predicate. + + This will produce a DELETE snapshot: + Data files were removed and their contents logically deleted and/or delete + files were added to delete rows. + + From the specification + """ _predicate: BooleanExpression @@ -2986,7 +2981,7 @@ def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool] return lambda data_file: expression_evaluator(partition_schema, partition_expr, case_sensitive=True)(data_file.partition) - def delete(self, predicate: BooleanExpression) -> None: + def delete_by_predicate(self, predicate: BooleanExpression) -> None: self._predicate = Or(self._predicate, predicate) @cached_property @@ -3010,7 +3005,7 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> total_deleted_entries = [] partial_rewrites_needed = False if snapshot := self._transaction.table_metadata.current_snapshot(): - for num, manifest_file in enumerate(snapshot.manifests(io=self._io)): + for manifest_file in snapshot.manifests(io=self._io): if not manifest_evaluators[manifest_file.partition_spec_id](manifest_file): # If the manifest isn't relevant, we can just keep it in the manifest-list existing_manifests.append(manifest_file) @@ -3018,7 +3013,7 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> # It is relevant, let's check out the content deleted_entries = [] existing_entries = [] - for entry in manifest_file.fetch_manifest_entry(io=self._io): + for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True): if strict_metrics_evaluator(entry.data_file) == ROWS_MUST_MATCH: deleted_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.DELETED)) elif inclusive_metrics_evaluator(entry.data_file) == ROWS_CANNOT_MATCH: @@ -3033,7 +3028,9 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> # Rewrite the manifest if len(existing_entries) > 0: output_file_location = _new_manifest_path( - location=self._transaction.table_metadata.location, num=num, commit_uuid=self.commit_uuid + 
location=self._transaction.table_metadata.location, + num=next(self._manifest_counter), + commit_uuid=self.commit_uuid, ) with write_manifest( format_version=self._transaction.table_metadata.format_version, @@ -3057,7 +3054,7 @@ def _deleted_entries(self) -> List[ManifestEntry]: return self._compute_deletes[1] def rewrites_needed(self) -> bool: - """Indicates if data files need to be rewritten""" + """Indicate if data files need to be rewritten.""" return self._compute_deletes[2] @@ -3091,13 +3088,50 @@ def _deleted_entries(self) -> List[ManifestEntry]: class OverwriteFiles(_MergingSnapshotProducer): + """Overwrites data from the table. This will produce an OVERWRITE snapshot. + + Data and delete files were added and removed in a logical overwrite operation. + """ + def _existing_manifests(self) -> List[ManifestFile]: - """To determine if there are any existing manifest files. + """Determine if there are any existing manifest files.""" + existing_files = [] - In the of a full overwrite, all the previous manifests are - considered deleted. - """ - return [] + if snapshot := self._transaction.table_metadata.current_snapshot(): + for manifest_file in snapshot.manifests(io=self._io): + entries = manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True) + found_deletes = [_ for entry in entries if entry in self._deleted_data_files] + + if len(found_deletes) == 0: + existing_files.append(manifest_file) + else: + output_file_location = _new_manifest_path( + location=self._transaction.table_metadata.location, + num=next(self._manifest_counter), + commit_uuid=self.commit_uuid, + ) + with write_manifest( + format_version=self._transaction.table_metadata.format_version, + spec=self._transaction.table_metadata.spec(), + schema=self._transaction.table_metadata.schema(), + output_file=self._io.new_output(output_file_location), + snapshot_id=self._snapshot_id, + ) as writer: + [ + writer.add_entry( + ManifestEntry( + status=ManifestEntryStatus.EXISTING, + snapshot_id=entry.snapshot_id, + data_sequence_number=entry.data_sequence_number, + file_sequence_number=entry.file_sequence_number, + data_file=entry.data_file, + ) + ) + for entry in entries + if entry not in found_deletes + ] + existing_files.append(writer.to_manifest_file()) + return existing_files def _deleted_entries(self) -> List[ManifestEntry]: """To determine if we need to record any deleted entries. @@ -3124,7 +3158,7 @@ def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]: data_file=entry.data_file, ) for entry in manifest.fetch_manifest_entry(self._io, discard_deleted=True) - if entry.data_file.content == DataFileContent.DATA + if entry.data_file.content == DataFileContent.DATA and entry.data_file in self._deleted_data_files ] list_of_entries = executor.map(_get_entries, previous_snapshot.manifests(self._io)) @@ -3158,8 +3192,8 @@ def overwrite(self) -> OverwriteFiles: snapshot_properties=self._snapshot_properties, ) - def delete(self) -> MetadataDeleteFiles: - return MetadataDeleteFiles( + def delete(self) -> DeleteFiles: + return DeleteFiles( operation=Operation.DELETE, transaction=self._transaction, io=self._io, @@ -3642,7 +3676,7 @@ def _get_table_partitions( return table_partitions -def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.Table) -> List[TablePartition]: +def determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.Table) -> List[TablePartition]: """Based on the iceberg table partition spec, slice the arrow table into partitions with their keys. 
Example: @@ -3651,7 +3685,7 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T {'year': [2020, 2022, 2022, 2021, 2022, 2022, 2022, 2019, 2021], 'n_legs': [2, 2, 2, 4, 4, 4, 4, 5, 100], 'animal': ["Flamingo", "Parrot", "Parrot", "Dog", "Horse", "Horse", "Horse","Brittle stars", "Centipede"]}. - The algrithm: + The algorithm: Firstly we group the rows into partitions by sorting with sort order [('n_legs', 'descending'), ('year', 'descending')] and null_placement of "at_end". This gives the same table as raw input. diff --git a/tests/catalog/integration_test_glue.py b/tests/catalog/integration_test_glue.py index 5cd60225c4..46884b6769 100644 --- a/tests/catalog/integration_test_glue.py +++ b/tests/catalog/integration_test_glue.py @@ -33,9 +33,8 @@ NoSuchTableError, TableAlreadyExistsError, ) -from pyiceberg.io.pyarrow import schema_to_pyarrow +from pyiceberg.io.pyarrow import _dataframe_to_data_files, schema_to_pyarrow from pyiceberg.schema import Schema -from pyiceberg.table import _dataframe_to_data_files from pyiceberg.types import IntegerType from tests.conftest import clean_up, get_bucket_name, get_s3_path diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py index 99b8550602..1cea5a1e12 100644 --- a/tests/catalog/test_sql.py +++ b/tests/catalog/test_sql.py @@ -36,10 +36,9 @@ TableAlreadyExistsError, ) from pyiceberg.io import FSSPEC_FILE_IO, PY_IO_IMPL -from pyiceberg.io.pyarrow import schema_to_pyarrow +from pyiceberg.io.pyarrow import _dataframe_to_data_files, schema_to_pyarrow from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC from pyiceberg.schema import Schema -from pyiceberg.table import _dataframe_to_data_files from pyiceberg.table.snapshots import Operation from pyiceberg.table.sorting import ( NullOrder, diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py index b615065a19..3face11215 100644 --- a/tests/integration/test_deletes.py +++ b/tests/integration/test_deletes.py @@ -56,10 +56,7 @@ def test_partition_deletes(test_deletes_table: DataFrame, session_catalog: RestC identifier = 'default.table_partitioned_delete' tbl = session_catalog.load_table(identifier) - - with tbl.transaction() as txn: - with txn.update_snapshot().delete() as delete: - delete.delete(EqualTo("number_partitioned", 10)) + tbl.delete(EqualTo("number_partitioned", 10)) assert tbl.scan().to_arrow().to_pydict() == {'number_partitioned': [11, 11], 'number': [20, 30]} @@ -68,9 +65,6 @@ def test_deletes(test_deletes_table: DataFrame, session_catalog: RestCatalog) -> identifier = 'default.table_partitioned_delete' tbl = session_catalog.load_table(identifier) + tbl.delete(EqualTo("number", 30)) - with tbl.transaction() as txn: - with txn.update_snapshot().delete() as delete: - delete.delete(EqualTo("number", 30)) - - assert tbl.scan().to_arrow().to_pydict() == {'number_partitioned': [11, 11], 'number': [20, 30]} + assert tbl.scan().to_arrow().to_pydict() == {'number_partitioned': [11, 11], 'number': [20, 20]} diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index e1526d2a5e..65e92521b7 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -36,7 +36,8 @@ from pyiceberg.catalog.hive import HiveCatalog from pyiceberg.catalog.sql import SqlCatalog from pyiceberg.exceptions import NoSuchTableError -from pyiceberg.table import TableProperties, _dataframe_to_data_files +from pyiceberg.io.pyarrow import 
_dataframe_to_data_files +from pyiceberg.table import TableProperties from tests.conftest import TEST_DATA_WITH_NULL from utils import _create_table diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 2bc78f3197..feccb99d5c 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -64,9 +64,9 @@ UpdateSchema, _apply_table_update, _check_schema_compatible, - _determine_partitions, _match_deletes_to_data_file, _TableMetadataUpdateContext, + determine_partitions, update_table_metadata, ) from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadataUtil, TableMetadataV2, _generate_snapshot_id @@ -1166,7 +1166,7 @@ def test_partition_for_demo() -> None: PartitionField(source_id=2, field_id=1002, transform=IdentityTransform(), name="n_legs_identity"), PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="year_identity"), ) - result = _determine_partitions(partition_spec, test_schema, arrow_table) + result = determine_partitions(partition_spec, test_schema, arrow_table) assert {table_partition.partition_key.partition for table_partition in result} == { Record(n_legs_identity=2, year_identity=2020), Record(n_legs_identity=100, year_identity=2021), @@ -1216,7 +1216,7 @@ def test_identity_partition_on_multi_columns() -> None: } arrow_table = pa.Table.from_pydict(test_data, schema=test_pa_schema) - result = _determine_partitions(partition_spec, test_schema, arrow_table) + result = determine_partitions(partition_spec, test_schema, arrow_table) assert {table_partition.partition_key.partition for table_partition in result} == expected concatenated_arrow_table = pa.concat_tables([table_partition.arrow_table_partition for table_partition in result]) diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index e85ecce506..827de7ccc3 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -313,10 +313,6 @@ def test_invalid_operation() -> None: update_snapshot_summaries(summary=Summary(Operation.REPLACE)) assert "Operation not implemented: Operation.REPLACE" in str(e.value) - with pytest.raises(ValueError) as e: - update_snapshot_summaries(summary=Summary(Operation.DELETE)) - assert "Operation not implemented: Operation.DELETE" in str(e.value) - def test_invalid_type() -> None: with pytest.raises(ValueError) as e:
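
# Usage sketch (illustrative; not part of the applied patch): the low-level helper that the
# updated tests above now import from pyiceberg.io.pyarrow. Assumes an existing table whose
# schema matches the Arrow data; the catalog name, table identifier, and sample rows are
# placeholders, and fast_append() is the pre-existing append path rather than something this
# patch adds.
import pyarrow as pa

from pyiceberg.catalog import load_catalog
from pyiceberg.io.pyarrow import _dataframe_to_data_files

catalog = load_catalog("default")               # placeholder catalog name
tbl = catalog.load_table("default.some_table")  # placeholder identifier

df = pa.Table.from_pydict({"number_partitioned": [10, 11], "number": [20, 30]})

# _dataframe_to_data_files bin-packs the Arrow table into Parquet DataFiles against the current
# table metadata (grouped per partition when the spec is partitioned); the resulting files are
# then registered on a snapshot update, roughly as Transaction.overwrite and the rewrite branch
# of Transaction.delete do above.
with tbl.transaction() as txn:
    with txn.update_snapshot().fast_append() as update:
        for data_file in _dataframe_to_data_files(
            table_metadata=tbl.metadata,
            df=df,
            io=tbl.io,
            write_uuid=update.commit_uuid,
        ):
            update.append_data_file(data_file)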