Skip to content

Commit

Permalink
Merge branch 'main' of github.com:apache/iceberg-python into fd-fix-o…
Browse files Browse the repository at this point in the history
…verflowing-buffer
  • Loading branch information
Fokko committed Jan 22, 2025
2 parents 993c382 + 666a926 commit 4cc2c5b
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 1 deletion.
27 changes: 26 additions & 1 deletion pyiceberg/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.

import base64
import datetime as py_datetime
import struct
from abc import ABC, abstractmethod
from enum import IntEnum
Expand Down Expand Up @@ -298,7 +299,31 @@ def can_transform(self, source: IcebergType) -> bool:
)

def transform(self, source: IcebergType, bucket: bool = True) -> Callable[[Optional[Any]], Optional[int]]:
if isinstance(source, (IntegerType, LongType, DateType, TimeType, TimestampType, TimestamptzType)):
if isinstance(source, TimeType):

def hash_func(v: Any) -> int:
if isinstance(v, py_datetime.time):
v = datetime.time_to_micros(v)

return mmh3.hash(struct.pack("<q", v))

elif isinstance(source, DateType):

def hash_func(v: Any) -> int:
if isinstance(v, py_datetime.date):
v = datetime.date_to_days(v)

return mmh3.hash(struct.pack("<q", v))

elif isinstance(source, (TimestampType, TimestamptzType)):

def hash_func(v: Any) -> int:
if isinstance(v, py_datetime.datetime):
v = datetime.datetime_to_micros(v)

return mmh3.hash(struct.pack("<q", v))

elif isinstance(source, (IntegerType, LongType)):

def hash_func(v: Any) -> int:
return mmh3.hash(struct.pack("<q", v))
Expand Down
40 changes: 40 additions & 0 deletions tests/table/test_partitioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,32 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import datetime
from decimal import Decimal
from typing import Any
from uuid import UUID

import pytest

from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.transforms import BucketTransform, IdentityTransform, TruncateTransform
from pyiceberg.typedef import Record
from pyiceberg.types import (
BinaryType,
DateType,
DecimalType,
FixedType,
IntegerType,
LongType,
NestedField,
PrimitiveType,
StringType,
StructType,
TimestampType,
TimestamptzType,
TimeType,
UUIDType,
)


Expand Down Expand Up @@ -153,6 +170,29 @@ def test_partition_type(table_schema_simple: Schema) -> None:
)


@pytest.mark.parametrize(
"source_type, value",
[
(IntegerType(), 22),
(LongType(), 22),
(DecimalType(5, 9), Decimal(19.25)),
(DateType(), datetime.date(1925, 5, 22)),
(TimeType(), datetime.time(19, 25, 00)),
(TimestampType(), datetime.datetime(19, 5, 1, 22, 1, 1)),
(TimestamptzType(), datetime.datetime(19, 5, 1, 22, 1, 1, tzinfo=datetime.timezone.utc)),
(StringType(), "abc"),
(UUIDType(), UUID("12345678-1234-5678-1234-567812345678").bytes),
(FixedType(5), 'b"\x8e\xd1\x87\x01"'),
(BinaryType(), b"\x8e\xd1\x87\x01"),
],
)
def test_bucketing_function(source_type: PrimitiveType, value: Any) -> None:
bucket = BucketTransform(2) # type: ignore
import pyarrow as pa

assert bucket.transform(source_type)(value) == bucket.pyarrow_transform(source_type)(pa.array([value])).to_pylist()[0]


def test_deserialize_partition_field_v2() -> None:
json_partition_spec = """{"source-id": 1, "field-id": 1000, "transform": "truncate[19]", "name": "str_truncate"}"""

Expand Down

0 comments on commit 4cc2c5b

Please sign in to comment.