diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index e4d9896096..172d21c375 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -2545,7 +2545,7 @@ class _TablePartition:
 
 
 def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.Table) -> List[_TablePartition]:
-    """Based on the iceberg table partition spec, slice the arrow table into partitions with their keys.
+    """Based on the iceberg table partition spec, filter the arrow table into partitions with their keys.
 
     Example:
     Input:
@@ -2554,17 +2554,9 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T
         'n_legs': [2, 2, 2, 4, 4, 4, 4, 5, 100],
         'animal': ["Flamingo", "Parrot", "Parrot", "Dog", "Horse", "Horse", "Horse","Brittle stars", "Centipede"]}.
     The algorithm:
-    Firstly we group the rows into partitions by sorting with sort order [('n_legs', 'descending'), ('year', 'descending')]
-    and null_placement of "at_end".
-    This gives the same table as raw input.
-    Then we sort_indices using reverse order of [('n_legs', 'descending'), ('year', 'descending')]
-    and null_placement : "at_start".
-    This gives:
-    [8, 7, 4, 5, 6, 3, 1, 2, 0]
-    Based on this we get partition groups of indices:
-    [{'offset': 8, 'length': 1}, {'offset': 7, 'length': 1}, {'offset': 4, 'length': 3}, {'offset': 3, 'length': 1}, {'offset': 1, 'length': 2}, {'offset': 0, 'length': 1}]
-    We then retrieve the partition keys by offsets.
-    And slice the arrow table by offsets and lengths of each partition.
+    - We determine the set of unique partition keys
+    - Then we produce one partition per key by filtering the table on that combination of values
+    - We combine the chunks of each partition into a contiguous copy, avoiding GIL congestion on the original table
     """
     # Assign unique names to columns where the partition transform has been applied
     # to avoid conflicts
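The rewritten docstring describes the new strategy only at a high level. Below is a minimal sketch of the same filter-based approach against a plain pyarrow table. It is illustrative only: the helper name `split_by_partition_values` and the use of raw column names are assumptions, and the real `_determine_partitions` first applies the partition transforms to dedicated columns and returns `_TablePartition` objects carrying a `PartitionKey` rather than bare tables.

```python
from typing import List

import pyarrow as pa
import pyarrow.compute as pc


def split_by_partition_values(table: pa.Table, partition_cols: List[str]) -> List[pa.Table]:
    """Split a table into one sub-table per unique combination of partition values."""
    # Determine the set of unique partition keys: a group_by with no aggregations
    # yields the distinct key combinations.
    unique_keys = table.select(partition_cols).group_by(partition_cols).aggregate([])

    partitions = []
    for key in unique_keys.to_pylist():
        # Build a filter expression for this combination, treating None as "is null".
        predicate = None
        for col, val in key.items():
            clause = pc.field(col).is_null() if val is None else (pc.field(col) == val)
            predicate = clause if predicate is None else predicate & clause

        # Filter the matching rows and combine the chunks so each partition is a
        # contiguous copy, independent of the original table's buffers.
        partitions.append(table.filter(predicate).combine_chunks())

    return partitions


# Using the table from the docstring example:
# table = pa.table({"year": [...], "n_legs": [...], "animal": [...]})
# parts = split_by_partition_values(table, ["n_legs", "year"])
```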
""" iceberg_type = schema.find_field(name_or_id=partition_field.source_id).field_type - iceberg_typed_value = _to_partition_representation(iceberg_type, value) + if not isinstance(value, int): + # When adding files, it can be that we still need to convert from logical types to physical types + iceberg_typed_value = _to_partition_representation(iceberg_type, value) transformed_value = partition_field.transform.transform(iceberg_type)(iceberg_typed_value) return transformed_value @@ -426,13 +428,8 @@ def _to_partition_representation(type: IcebergType, value: Any) -> Any: @_to_partition_representation.register(TimestampType) @_to_partition_representation.register(TimestamptzType) -def _(type: IcebergType, value: Optional[Union[datetime, int]]) -> Optional[int]: - if value is None: - return None - elif isinstance(value, int): - return value - else: - return datetime_to_micros(value) +def _(type: IcebergType, value: Optional[datetime]) -> Optional[int]: + return datetime_to_micros(value) if value is not None else None @_to_partition_representation.register(DateType) diff --git a/pyproject.toml b/pyproject.toml index dcdb5e7156..c71818e7ff 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1220,6 +1220,7 @@ markers = [ "adls: marks a test as requiring access to adls compliant storage (use with --adls.account-name, --adls.account-key, and --adls.endpoint args)", "integration: marks integration tests against Apache Spark", "gcs: marks a test as requiring access to gcs compliant storage (use with --gs.token, --gs.project, and --gs.endpoint)", + "benchmark: collection of tests to validate read/write performance before and after a change" ] # Turns a warning into an error