
Commit

Thanks Kevin!
Fokko committed Jan 21, 2025
1 parent 1b29d06 commit cafd39d
Showing 3 changed files with 10 additions and 20 deletions.
16 changes: 4 additions & 12 deletions pyiceberg/io/pyarrow.py
@@ -2545,7 +2545,7 @@ class _TablePartition:


def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.Table) -> List[_TablePartition]:
"""Based on the iceberg table partition spec, slice the arrow table into partitions with their keys.
"""Based on the iceberg table partition spec, filter the arrow table into partitions with their keys.
Example:
Input:
@@ -2554,17 +2554,9 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T
'n_legs': [2, 2, 2, 4, 4, 4, 4, 5, 100],
'animal': ["Flamingo", "Parrot", "Parrot", "Dog", "Horse", "Horse", "Horse","Brittle stars", "Centipede"]}.
The algorithm:
- Firstly we group the rows into partitions by sorting with sort order [('n_legs', 'descending'), ('year', 'descending')]
- and null_placement of "at_end".
- This gives the same table as raw input.
- Then we sort_indices using reverse order of [('n_legs', 'descending'), ('year', 'descending')]
- and null_placement : "at_start".
- This gives:
- [8, 7, 4, 5, 6, 3, 1, 2, 0]
- Based on this we get partition groups of indices:
- [{'offset': 8, 'length': 1}, {'offset': 7, 'length': 1}, {'offset': 4, 'length': 3}, {'offset': 3, 'length': 1}, {'offset': 1, 'length': 2}, {'offset': 0, 'length': 1}]
- We then retrieve the partition keys by offsets.
- And slice the arrow table by offsets and lengths of each partition.
+ - We determine the set of unique partition keys
+ - Then we produce a set of partitions by filtering on each of the combinations
+ - We combine the chunks to create a copy to avoid GIL congestion on the original table
"""
# Assign unique names to columns where the partition transform has been applied
# to avoid conflicts
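The rewritten docstring summarizes the new approach at a high level. A rough sketch of the same unique-keys-then-filter idea in plain PyArrow, on a simplified table with a hypothetical n_legs partition column (an illustration of the technique, not pyiceberg's actual implementation):

import pyarrow as pa
import pyarrow.compute as pc

# Simplified stand-in for the docstring's example table.
table = pa.table({
    "n_legs": [2, 2, 2, 4, 4, 4, 4, 5, 100],
    "animal": ["Flamingo", "Parrot", "Parrot", "Dog", "Horse", "Horse",
               "Horse", "Brittle stars", "Centipede"],
})

# 1. Determine the set of unique partition keys.
unique_keys = table.group_by(["n_legs"]).aggregate([]).to_pylist()

# 2. Produce one partition per key combination by filtering on it.
# 3. combine_chunks() gives each partition its own contiguous copy,
#    which is what the docstring means by avoiding contention on the
#    original table.
partitions = {
    key["n_legs"]: table.filter(pc.equal(table["n_legs"], key["n_legs"])).combine_chunks()
    for key in unique_keys
}

assert sum(p.num_rows for p in partitions.values()) == table.num_rows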
13 changes: 5 additions & 8 deletions pyiceberg/partitioning.py
@@ -414,7 +414,9 @@ def partition_record_value(partition_field: PartitionField, value: Any, schema:
the final partition record value.
"""
iceberg_type = schema.find_field(name_or_id=partition_field.source_id).field_type
- iceberg_typed_value = _to_partition_representation(iceberg_type, value)
+ if not isinstance(value, int):
+     # When adding files, it can be that we still need to convert from logical types to physical types
+     iceberg_typed_value = _to_partition_representation(iceberg_type, value)
transformed_value = partition_field.transform.transform(iceberg_type)(iceberg_typed_value)
return transformed_value
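The added isinstance guard means an int is taken to already be the physical representation, while other logical values such as datetime are converted first, and the partition transform is then applied. A standalone sketch of that flow for a day-partitioned timestamp (helper name and the inline day transform are illustrative, not pyiceberg's API):

from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
MICROS_PER_DAY = 24 * 60 * 60 * 1_000_000

def day_partition_value(value):
    # An int is assumed to already be physical: microseconds since the epoch.
    # A datetime is a logical value that still needs converting first.
    if not isinstance(value, int):
        value = (value - EPOCH) // timedelta(microseconds=1)
    # Iceberg's day transform buckets timestamps into days since the epoch.
    return value // MICROS_PER_DAY

ts = datetime(2025, 1, 21, 12, 30, tzinfo=timezone.utc)
micros = (ts - EPOCH) // timedelta(microseconds=1)
assert day_partition_value(ts) == day_partition_value(micros)  # both branches agree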

@@ -426,13 +426,8 @@ def _to_partition_representation(type: IcebergType, value: Any) -> Any:

@_to_partition_representation.register(TimestampType)
@_to_partition_representation.register(TimestamptzType)
- def _(type: IcebergType, value: Optional[Union[datetime, int]]) -> Optional[int]:
-     if value is None:
-         return None
-     elif isinstance(value, int):
-         return value
-     else:
-         return datetime_to_micros(value)
+ def _(type: IcebergType, value: Optional[datetime]) -> Optional[int]:
+     return datetime_to_micros(value) if value is not None else None


@_to_partition_representation.register(DateType)
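For context, the @_to_partition_representation.register(...) decorators follow the standard-library functools.singledispatch registration pattern: the conversion is chosen by the runtime type of the first argument. A minimal sketch with stand-in type classes (not pyiceberg's real IcebergType hierarchy) shows the mechanics:

from datetime import date, datetime, timedelta, timezone
from functools import singledispatch
from typing import Any, Optional

EPOCH_DATETIME = datetime(1970, 1, 1, tzinfo=timezone.utc)
EPOCH_DATE = date(1970, 1, 1)

# Stand-in markers; the real code dispatches on pyiceberg's IcebergType classes.
class IcebergType: ...
class TimestampType(IcebergType): ...
class DateType(IcebergType): ...

@singledispatch
def to_partition_representation(type_: IcebergType, value: Any) -> Any:
    return value  # Default: assume the value is already physical.

@to_partition_representation.register(TimestampType)
def _(type_: IcebergType, value: Optional[datetime]) -> Optional[int]:
    # Timestamps are represented as microseconds since the epoch.
    return (value - EPOCH_DATETIME) // timedelta(microseconds=1) if value is not None else None

@to_partition_representation.register(DateType)
def _(type_: IcebergType, value: Optional[date]) -> Optional[int]:
    # Dates are represented as days since the epoch.
    return (value - EPOCH_DATE).days if value is not None else None

print(to_partition_representation(TimestampType(), datetime(1970, 1, 2, tzinfo=timezone.utc)))  # 86400000000
print(to_partition_representation(DateType(), date(1970, 1, 11)))                               # 10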
1 change: 1 addition & 0 deletions pyproject.toml
@@ -1220,6 +1220,7 @@ markers = [
"adls: marks a test as requiring access to adls compliant storage (use with --adls.account-name, --adls.account-key, and --adls.endpoint args)",
"integration: marks integration tests against Apache Spark",
"gcs: marks a test as requiring access to gcs compliant storage (use with --gs.token, --gs.project, and --gs.endpoint)",
"benchmark: collection of tests to validate read/write performance before and after a change"
]

# Turns a warning into an error
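For context, markers listed under pytest's markers option declare custom marks so pytest does not warn about them; tests opt in with the decorator and are selected with -m. An illustrative use (the test below is hypothetical, not part of this commit):

import pytest

@pytest.mark.benchmark
def test_arrow_write_read_roundtrip_throughput(tmp_path):
    # Hypothetical benchmark-marked test: run it with `pytest -m benchmark`,
    # or exclude the whole collection with `pytest -m "not benchmark"`.
    ...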

