diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index e367aa586c..391562e67b 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -27,8 +27,10 @@ import concurrent.futures import fnmatch +import functools import itertools import logging +import operator import os import re import uuid @@ -2174,7 +2176,10 @@ def _partition_value(self, partition_field: PartitionField, schema: Schema) -> A raise ValueError( f"Cannot infer partition value from parquet metadata as there are more than one partition values for Partition Field: {partition_field.name}. {lower_value=}, {upper_value=}" ) - return lower_value + + source_field = schema.find_field(partition_field.source_id) + transform = partition_field.transform.transform(source_field.field_type) + return transform(lower_value) def partition(self, partition_spec: PartitionSpec, schema: Schema) -> Record: return Record(**{field.name: self._partition_value(field, schema) for field in partition_spec.fields}) @@ -2558,38 +2563,8 @@ class _TablePartition: arrow_table_partition: pa.Table -def _get_table_partitions( - arrow_table: pa.Table, - partition_spec: PartitionSpec, - schema: Schema, - slice_instructions: list[dict[str, Any]], -) -> list[_TablePartition]: - sorted_slice_instructions = sorted(slice_instructions, key=lambda x: x["offset"]) - - partition_fields = partition_spec.fields - - offsets = [inst["offset"] for inst in sorted_slice_instructions] - projected_and_filtered = { - partition_field.source_id: arrow_table[schema.find_field(name_or_id=partition_field.source_id).name] - .take(offsets) - .to_pylist() - for partition_field in partition_fields - } - - table_partitions = [] - for idx, inst in enumerate(sorted_slice_instructions): - partition_slice = arrow_table.slice(**inst) - fieldvalues = [ - PartitionFieldValue(partition_field, projected_and_filtered[partition_field.source_id][idx]) - for partition_field in partition_fields - ] - partition_key = PartitionKey(raw_partition_field_values=fieldvalues, partition_spec=partition_spec, schema=schema) - table_partitions.append(_TablePartition(partition_key=partition_key, arrow_table_partition=partition_slice)) - return table_partitions - - def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.Table) -> List[_TablePartition]: - """Based on the iceberg table partition spec, slice the arrow table into partitions with their keys. + """Based on the iceberg table partition spec, filter the arrow table into partitions with their keys. Example: Input: @@ -2598,54 +2573,50 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T 'n_legs': [2, 2, 2, 4, 4, 4, 4, 5, 100], 'animal': ["Flamingo", "Parrot", "Parrot", "Dog", "Horse", "Horse", "Horse","Brittle stars", "Centipede"]}. The algorithm: - Firstly we group the rows into partitions by sorting with sort order [('n_legs', 'descending'), ('year', 'descending')] - and null_placement of "at_end". - This gives the same table as raw input. - Then we sort_indices using reverse order of [('n_legs', 'descending'), ('year', 'descending')] - and null_placement : "at_start". - This gives: - [8, 7, 4, 5, 6, 3, 1, 2, 0] - Based on this we get partition groups of indices: - [{'offset': 8, 'length': 1}, {'offset': 7, 'length': 1}, {'offset': 4, 'length': 3}, {'offset': 3, 'length': 1}, {'offset': 1, 'length': 2}, {'offset': 0, 'length': 1}] - We then retrieve the partition keys by offsets. - And slice the arrow table by offsets and lengths of each partition. 
+        - We determine the set of unique partition keys
+        - Then we produce a set of partitions by filtering on each of the combinations
+        - We combine the chunks to create a copy to avoid GIL congestion on the original table
     """
-    partition_columns: List[Tuple[PartitionField, NestedField]] = [
-        (partition_field, schema.find_field(partition_field.source_id)) for partition_field in spec.fields
-    ]
-    partition_values_table = pa.table(
-        {
-            str(partition.field_id): partition.transform.pyarrow_transform(field.field_type)(arrow_table[field.name])
-            for partition, field in partition_columns
-        }
-    )
+    # Assign unique names to columns where the partition transform has been applied
+    # to avoid conflicts
+    partition_fields = [f"_partition_{field.name}" for field in spec.fields]
+
+    for partition, name in zip(spec.fields, partition_fields):
+        source_field = schema.find_field(partition.source_id)
+        arrow_table = arrow_table.append_column(
+            name, partition.transform.pyarrow_transform(source_field.field_type)(arrow_table[source_field.name])
+        )
+
+    unique_partition_fields = arrow_table.select(partition_fields).group_by(partition_fields).aggregate([])
+
+    table_partitions = []
+    # TODO: As a next step, we could also play around with yielding instead of materializing the full list
+    for unique_partition in unique_partition_fields.to_pylist():
+        partition_key = PartitionKey(
+            field_values=[
+                PartitionFieldValue(field=field, value=unique_partition[name])
+                for field, name in zip(spec.fields, partition_fields)
+            ],
+            partition_spec=spec,
+            schema=schema,
+        )
+        filtered_table = arrow_table.filter(
+            functools.reduce(
+                operator.and_,
+                [
+                    pc.field(partition_field_name) == unique_partition[partition_field_name]
+                    if unique_partition[partition_field_name] is not None
+                    else pc.field(partition_field_name).is_null()
+                    for field, partition_field_name in zip(spec.fields, partition_fields)
+                ],
+            )
+        )
+        filtered_table = filtered_table.drop_columns(partition_fields)
-    # Sort by partitions
-    sort_indices = pa.compute.sort_indices(
-        partition_values_table,
-        sort_keys=[(col, "ascending") for col in partition_values_table.column_names],
-        null_placement="at_end",
-    ).to_pylist()
-    arrow_table = arrow_table.take(sort_indices)
-
-    # Get slice_instructions to group by partitions
-    partition_values_table = partition_values_table.take(sort_indices)
-    reversed_indices = pa.compute.sort_indices(
-        partition_values_table,
-        sort_keys=[(col, "descending") for col in partition_values_table.column_names],
-        null_placement="at_start",
-    ).to_pylist()
-    slice_instructions: List[Dict[str, Any]] = []
-    last = len(reversed_indices)
-    reversed_indices_size = len(reversed_indices)
-    ptr = 0
-    while ptr < reversed_indices_size:
-        group_size = last - reversed_indices[ptr]
-        offset = reversed_indices[ptr]
-        slice_instructions.append({"offset": offset, "length": group_size})
-        last = reversed_indices[ptr]
-        ptr = ptr + group_size
-
-    table_partitions: List[_TablePartition] = _get_table_partitions(arrow_table, spec, schema, slice_instructions)
+        # The combine_chunks call may seem counter-intuitive, but it returns
+        # fresh buffers that don't interfere with each other when they are written out to files
+        table_partitions.append(
+            _TablePartition(partition_key=partition_key, arrow_table_partition=filtered_table.combine_chunks())
+        )
 
     return table_partitions
diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py
index 95cbe16ecb..01606a3414 100644
--- a/pyiceberg/partitioning.py
+++ b/pyiceberg/partitioning.py
@@ -29,6 +29,7 @@
     Optional,
     Tuple,
     TypeVar,
+    Union,
 )
 
 from urllib.parse import quote_plus
@@ -393,14 +394,14 @@ class PartitionFieldValue:
 
 
 @dataclass(frozen=True)
 class PartitionKey:
-    raw_partition_field_values: List[PartitionFieldValue]
+    field_values: List[PartitionFieldValue]
     partition_spec: PartitionSpec
     schema: Schema
 
     @cached_property
     def partition(self) -> Record:  # partition key transformed with iceberg internal representation as input
         iceberg_typed_key_values = {}
-        for raw_partition_field_value in self.raw_partition_field_values:
+        for raw_partition_field_value in self.field_values:
             partition_fields = self.partition_spec.source_id_to_fields_map[raw_partition_field_value.field.source_id]
             if len(partition_fields) != 1:
                 raise ValueError(f"Cannot have redundant partitions: {partition_fields}")
@@ -427,25 +428,45 @@ def partition_record_value(partition_field: PartitionField, value: Any, schema:
         the final partition record value.
     """
     iceberg_type = schema.find_field(name_or_id=partition_field.source_id).field_type
-    iceberg_typed_value = _to_partition_representation(iceberg_type, value)
-    transformed_value = partition_field.transform.transform(iceberg_type)(iceberg_typed_value)
-    return transformed_value
+    return _to_partition_representation(iceberg_type, value)
 
 
 @singledispatch
 def _to_partition_representation(type: IcebergType, value: Any) -> Any:
+    """Strip the logical type down to its physical representation.
+
+    It can be that the value has already been transformed into its physical type,
+    in which case the original value is returned. Keep in mind that the
+    bucket transform will always return an int, but an identity transform
+    can return a date that still needs to be transformed into an int (days
+    since epoch).
+    """
     return TypeError(f"Unsupported partition field type: {type}")
 
 
 @_to_partition_representation.register(TimestampType)
 @_to_partition_representation.register(TimestamptzType)
-def _(type: IcebergType, value: Optional[datetime]) -> Optional[int]:
-    return datetime_to_micros(value) if value is not None else None
+def _(type: IcebergType, value: Optional[Union[int, datetime]]) -> Optional[int]:
+    if value is None:
+        return None
+    elif isinstance(value, int):
+        return value
+    elif isinstance(value, datetime):
+        return datetime_to_micros(value)
+    else:
+        raise ValueError(f"Unknown type: {value}")
 
 
 @_to_partition_representation.register(DateType)
-def _(type: IcebergType, value: Optional[date]) -> Optional[int]:
-    return date_to_days(value) if value is not None else None
+def _(type: IcebergType, value: Optional[Union[int, date]]) -> Optional[int]:
+    if value is None:
+        return None
+    elif isinstance(value, int):
+        return value
+    elif isinstance(value, date):
+        return date_to_days(value)
+    else:
+        raise ValueError(f"Unknown type: {value}")
 
 
 @_to_partition_representation.register(TimeType)
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 057c02f260..5e13ab85cf 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -453,8 +453,10 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
         with self._append_snapshot_producer(snapshot_properties) as append_files:
             # skip writing data files if the dataframe is empty
             if df.shape[0] > 0:
-                data_files = _dataframe_to_data_files(
-                    table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
+                data_files = list(
+                    _dataframe_to_data_files(
+                        table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
+                    )
                 )
                 for data_file in data_files:
                     append_files.append_data_file(data_file)
diff --git a/pyproject.toml b/pyproject.toml
index dcdb5e7156..c71818e7ff 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1220,6 +1220,7 @@ markers = [
     "adls: marks a test as requiring access to adls compliant storage (use with --adls.account-name, --adls.account-key, and --adls.endpoint args)",
     "integration: marks integration tests against Apache Spark",
     "gcs: marks a test as requiring access to gcs compliant storage (use with --gs.token, --gs.project, and --gs.endpoint)",
+    "benchmark: collection of tests to validate read/write performance before and after a change"
 ]
 
 # Turns a warning into an error
diff --git a/tests/benchmark/test_benchmark.py b/tests/benchmark/test_benchmark.py
new file mode 100644
index 0000000000..7bb34ef7c1
--- /dev/null
+++ b/tests/benchmark/test_benchmark.py
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import statistics
+import timeit
+import urllib.request
+
+import pyarrow as pa
+import pyarrow.parquet as pq
+import pytest
+
+from pyiceberg.transforms import DayTransform
+
+
+@pytest.fixture(scope="session")
+def taxi_dataset(tmp_path_factory: pytest.TempPathFactory) -> pa.Table:
+    """Downloads the Taxi dataset to disk and reads it into an Arrow table."""
+    taxi_dataset = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-01.parquet"
+    taxi_dataset_dest = tmp_path_factory.mktemp("taxi_dataset") / "yellow_tripdata_2022-01.parquet"
+    urllib.request.urlretrieve(taxi_dataset, taxi_dataset_dest)
+
+    return pq.read_table(taxi_dataset_dest)
+
+
+@pytest.mark.benchmark
+def test_partitioned_write(tmp_path_factory: pytest.TempPathFactory, taxi_dataset: pa.Table) -> None:
+    """Tests writing to a partitioned table with something that would be close to a production-like situation."""
+    from pyiceberg.catalog.sql import SqlCatalog
+
+    warehouse_path = str(tmp_path_factory.mktemp("warehouse"))
+    catalog = SqlCatalog(
+        "default",
+        uri=f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
+        warehouse=f"file://{warehouse_path}",
+    )
+
+    catalog.create_namespace("default")
+
+    tbl = catalog.create_table("default.taxi_partitioned", schema=taxi_dataset.schema)
+
+    with tbl.update_spec() as spec:
+        spec.add_field("tpep_pickup_datetime", DayTransform())
+
+    # Profiling can sometimes be handy as well
+    # with cProfile.Profile() as pr:
+    #     tbl.append(taxi_dataset)
+    #
+    #     pr.print_stats(sort=True)
+
+    runs = []
+    for run in range(5):
+        start_time = timeit.default_timer()
+        tbl.append(taxi_dataset)
+        elapsed = timeit.default_timer() - start_time
+
+        print(f"Run {run} took: {elapsed}")
+        runs.append(elapsed)
+
+    print(f"Average runtime of {round(statistics.mean(runs), 2)} seconds")
diff --git a/tests/integration/test_partitioning_key.py b/tests/integration/test_partitioning_key.py
index
3955259d33..04d6f6d25e 100644 --- a/tests/integration/test_partitioning_key.py +++ b/tests/integration/test_partitioning_key.py @@ -15,9 +15,7 @@ # specific language governing permissions and limitations # under the License. # pylint:disable=redefined-outer-name -import uuid -from datetime import date, datetime, timedelta, timezone -from decimal import Decimal +from datetime import datetime from typing import Any, List import pytest @@ -28,13 +26,7 @@ from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec from pyiceberg.schema import Schema, make_compatible_name from pyiceberg.transforms import ( - BucketTransform, - DayTransform, - HourTransform, - IdentityTransform, MonthTransform, - TruncateTransform, - YearTransform, ) from pyiceberg.typedef import Record from pyiceberg.types import ( @@ -80,291 +72,291 @@ @pytest.mark.parametrize( "partition_fields, partition_values, expected_partition_record, expected_hive_partition_path_slice, spark_create_table_sql_for_justification, spark_data_insert_sql_for_justification", [ - # # Identity Transform - ( - [PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="boolean_field")], - [False], - Record(boolean_field=False), - "boolean_field=false", - f"""CREATE TABLE {identifier} ( - boolean_field boolean, - string_field string - ) - USING iceberg - PARTITIONED BY ( - identity(boolean_field) -- Partitioning by 'boolean_field' - ) - """, - f"""INSERT INTO {identifier} - VALUES - (false, 'Boolean field set to false'); - """, - ), - ( - [PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="string_field")], - ["sample_string"], - Record(string_field="sample_string"), - "string_field=sample_string", - f"""CREATE TABLE {identifier} ( - string_field string, - another_string_field string - ) - USING iceberg - PARTITIONED BY ( - identity(string_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - ('sample_string', 'Another string value') - """, - ), - ( - [PartitionField(source_id=4, field_id=1001, transform=IdentityTransform(), name="int_field")], - [42], - Record(int_field=42), - "int_field=42", - f"""CREATE TABLE {identifier} ( - int_field int, - string_field string - ) - USING iceberg - PARTITIONED BY ( - identity(int_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - (42, 'Associated string value for int 42') - """, - ), - ( - [PartitionField(source_id=5, field_id=1001, transform=IdentityTransform(), name="long_field")], - [1234567890123456789], - Record(long_field=1234567890123456789), - "long_field=1234567890123456789", - f"""CREATE TABLE {identifier} ( - long_field bigint, - string_field string - ) - USING iceberg - PARTITIONED BY ( - identity(long_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - (1234567890123456789, 'Associated string value for long 1234567890123456789') - """, - ), - ( - [PartitionField(source_id=6, field_id=1001, transform=IdentityTransform(), name="float_field")], - [3.14], - Record(float_field=3.14), - "float_field=3.14", - # spark writes differently as pyiceberg, Record[float_field=3.140000104904175], path:float_field=3.14 (Record has difference) - # so justification (compare expected value with spark behavior) would fail. 
- None, - None, - # f"""CREATE TABLE {identifier} ( - # float_field float, - # string_field string - # ) - # USING iceberg - # PARTITIONED BY ( - # identity(float_field) - # ) - # """, - # f"""INSERT INTO {identifier} - # VALUES - # (3.14, 'Associated string value for float 3.14') - # """ - ), - ( - [PartitionField(source_id=7, field_id=1001, transform=IdentityTransform(), name="double_field")], - [6.282], - Record(double_field=6.282), - "double_field=6.282", - # spark writes differently as pyiceberg, Record[double_field=6.2820000648498535] path:double_field=6.282 (Record has difference) - # so justification (compare expected value with spark behavior) would fail. - None, - None, - # f"""CREATE TABLE {identifier} ( - # double_field double, - # string_field string - # ) - # USING iceberg - # PARTITIONED BY ( - # identity(double_field) - # ) - # """, - # f"""INSERT INTO {identifier} - # VALUES - # (6.282, 'Associated string value for double 6.282') - # """ - ), - ( - [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], - [datetime(2023, 1, 1, 12, 0, 1, 999)], - Record(timestamp_field=1672574401000999), - "timestamp_field=2023-01-01T12%3A00%3A01.000999", - f"""CREATE TABLE {identifier} ( - timestamp_field timestamp_ntz, - string_field string - ) - USING iceberg - PARTITIONED BY ( - identity(timestamp_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01 12:00:01.000999' AS TIMESTAMP_NTZ), 'Associated string value for timestamp 2023-01-01T12:00:00') - """, - ), - ( - [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], - [datetime(2023, 1, 1, 12, 0, 1)], - Record(timestamp_field=1672574401000000), - "timestamp_field=2023-01-01T12%3A00%3A01", - f"""CREATE TABLE {identifier} ( - timestamp_field timestamp_ntz, - string_field string - ) - USING iceberg - PARTITIONED BY ( - identity(timestamp_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01 12:00:01' AS TIMESTAMP_NTZ), 'Associated string value for timestamp 2023-01-01T12:00:00') - """, - ), - ( - [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], - [datetime(2023, 1, 1, 12, 0, 0)], - Record(timestamp_field=1672574400000000), - "timestamp_field=2023-01-01T12%3A00%3A00", - # Spark writes differently as pyiceberg, so justification (compare expected value with spark behavior) would fail - # AssertionError: assert 'timestamp_field=2023-01-01T12%3A00%3A00' in 's3://warehouse/default/test_table/data/timestamp_field=2023-01-01T12%3A00/00000-5-f9dca69a-9fb7-4830-9ef6-62d3d7afc09e-00001.parquet' - # TLDR: CAST('2023-01-01 12:00:00' AS TIMESTAMP_NTZ) becomes 2023-01-01T12:00 in the hive partition path when spark writes it (without the seconds). 
- None, - None, - # f"""CREATE TABLE {identifier} ( - # timestamp_field timestamp_ntz, - # string_field string - # ) - # USING iceberg - # PARTITIONED BY ( - # identity(timestamp_field) - # ) - # """, - # f"""INSERT INTO {identifier} - # VALUES - # (CAST('2023-01-01 12:00:00' AS TIMESTAMP_NTZ), 'Associated string value for timestamp 2023-01-01T12:00:00') - # """ - ), - ( - [PartitionField(source_id=9, field_id=1001, transform=IdentityTransform(), name="timestamptz_field")], - [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], - Record(timestamptz_field=1672563601000999), - "timestamptz_field=2023-01-01T09%3A00%3A01.000999%2B00%3A00", - # Spark writes differently as pyiceberg, so justification (compare expected value with spark behavior) would fail - # AssertionError: assert 'timestamptz_field=2023-01-01T09%3A00%3A01.000999%2B00%3A00' in 's3://warehouse/default/test_table/data/timestamptz_field=2023-01-01T09%3A00%3A01.000999Z/00000-5-b710fc4d-66b6-47f1-b8ae-6208f8aaa2d4-00001.parquet' - # TLDR: CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP) becomes 2023-01-01T09:00:01.000999Z in the hive partition path when spark writes it (while iceberg: timestamptz_field=2023-01-01T09:00:01.000999+00:00). - None, - None, - # f"""CREATE TABLE {identifier} ( - # timestamptz_field timestamp, - # string_field string - # ) - # USING iceberg - # PARTITIONED BY ( - # identity(timestamptz_field) - # ) - # """, - # f"""INSERT INTO {identifier} - # VALUES - # (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Associated string value for timestamp 2023-01-01 12:00:01.000999+03:00') - # """ - ), - ( - [PartitionField(source_id=10, field_id=1001, transform=IdentityTransform(), name="date_field")], - [date(2023, 1, 1)], - Record(date_field=19358), - "date_field=2023-01-01", - f"""CREATE TABLE {identifier} ( - date_field date, - string_field string - ) - USING iceberg - PARTITIONED BY ( - identity(date_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01' AS DATE), 'Associated string value for date 2023-01-01') - """, - ), - ( - [PartitionField(source_id=14, field_id=1001, transform=IdentityTransform(), name="uuid_field")], - [uuid.UUID("f47ac10b-58cc-4372-a567-0e02b2c3d479")], - Record(uuid_field="f47ac10b-58cc-4372-a567-0e02b2c3d479"), - "uuid_field=f47ac10b-58cc-4372-a567-0e02b2c3d479", - f"""CREATE TABLE {identifier} ( - uuid_field string, - string_field string - ) - USING iceberg - PARTITIONED BY ( - identity(uuid_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - ('f47ac10b-58cc-4372-a567-0e02b2c3d479', 'Associated string value for UUID f47ac10b-58cc-4372-a567-0e02b2c3d479') - """, - ), - ( - [PartitionField(source_id=11, field_id=1001, transform=IdentityTransform(), name="binary_field")], - [b"example"], - Record(binary_field=b"example"), - "binary_field=ZXhhbXBsZQ%3D%3D", - f"""CREATE TABLE {identifier} ( - binary_field binary, - string_field string - ) - USING iceberg - PARTITIONED BY ( - identity(binary_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('example' AS BINARY), 'Associated string value for binary `example`') - """, - ), - ( - [PartitionField(source_id=13, field_id=1001, transform=IdentityTransform(), name="decimal_field")], - [Decimal("123.45")], - Record(decimal_field=Decimal("123.45")), - "decimal_field=123.45", - f"""CREATE TABLE {identifier} ( - decimal_field decimal(5,2), - string_field string - ) - USING iceberg - PARTITIONED BY ( - identity(decimal_field) - ) - """, - f"""INSERT INTO {identifier} - 
VALUES - (123.45, 'Associated string value for decimal 123.45') - """, - ), + # # # Identity Transform + # ( + # [PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="boolean_field")], + # [False], + # Record(boolean_field=False), + # "boolean_field=false", + # f"""CREATE TABLE {identifier} ( + # boolean_field boolean, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(boolean_field) -- Partitioning by 'boolean_field' + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (false, 'Boolean field set to false'); + # """, + # ), + # ( + # [PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="string_field")], + # ["sample_string"], + # Record(string_field="sample_string"), + # "string_field=sample_string", + # f"""CREATE TABLE {identifier} ( + # string_field string, + # another_string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(string_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # ('sample_string', 'Another string value') + # """, + # ), + # ( + # [PartitionField(source_id=4, field_id=1001, transform=IdentityTransform(), name="int_field")], + # [42], + # Record(int_field=42), + # "int_field=42", + # f"""CREATE TABLE {identifier} ( + # int_field int, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(int_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (42, 'Associated string value for int 42') + # """, + # ), + # ( + # [PartitionField(source_id=5, field_id=1001, transform=IdentityTransform(), name="long_field")], + # [1234567890123456789], + # Record(long_field=1234567890123456789), + # "long_field=1234567890123456789", + # f"""CREATE TABLE {identifier} ( + # long_field bigint, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(long_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (1234567890123456789, 'Associated string value for long 1234567890123456789') + # """, + # ), + # ( + # [PartitionField(source_id=6, field_id=1001, transform=IdentityTransform(), name="float_field")], + # [3.14], + # Record(float_field=3.14), + # "float_field=3.14", + # # spark writes differently as pyiceberg, Record[float_field=3.140000104904175], path:float_field=3.14 (Record has difference) + # # so justification (compare expected value with spark behavior) would fail. + # None, + # None, + # # f"""CREATE TABLE {identifier} ( + # # float_field float, + # # string_field string + # # ) + # # USING iceberg + # # PARTITIONED BY ( + # # identity(float_field) + # # ) + # # """, + # # f"""INSERT INTO {identifier} + # # VALUES + # # (3.14, 'Associated string value for float 3.14') + # # """ + # ), + # ( + # [PartitionField(source_id=7, field_id=1001, transform=IdentityTransform(), name="double_field")], + # [6.282], + # Record(double_field=6.282), + # "double_field=6.282", + # # spark writes differently as pyiceberg, Record[double_field=6.2820000648498535] path:double_field=6.282 (Record has difference) + # # so justification (compare expected value with spark behavior) would fail. 
+ # None, + # None, + # # f"""CREATE TABLE {identifier} ( + # # double_field double, + # # string_field string + # # ) + # # USING iceberg + # # PARTITIONED BY ( + # # identity(double_field) + # # ) + # # """, + # # f"""INSERT INTO {identifier} + # # VALUES + # # (6.282, 'Associated string value for double 6.282') + # # """ + # ), + # ( + # [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], + # [datetime(2023, 1, 1, 12, 0, 1, 999)], + # Record(timestamp_field=1672574401000999), + # "timestamp_field=2023-01-01T12%3A00%3A01.000999", + # f"""CREATE TABLE {identifier} ( + # timestamp_field timestamp_ntz, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(timestamp_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01 12:00:01.000999' AS TIMESTAMP_NTZ), 'Associated string value for timestamp 2023-01-01T12:00:00') + # """, + # ), + # ( + # [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], + # [datetime(2023, 1, 1, 12, 0, 1)], + # Record(timestamp_field=1672574401000000), + # "timestamp_field=2023-01-01T12%3A00%3A01", + # f"""CREATE TABLE {identifier} ( + # timestamp_field timestamp_ntz, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(timestamp_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01 12:00:01' AS TIMESTAMP_NTZ), 'Associated string value for timestamp 2023-01-01T12:00:00') + # """, + # ), + # ( + # [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], + # [datetime(2023, 1, 1, 12, 0, 0)], + # Record(timestamp_field=1672574400000000), + # "timestamp_field=2023-01-01T12%3A00%3A00", + # # Spark writes differently as pyiceberg, so justification (compare expected value with spark behavior) would fail + # # AssertionError: assert 'timestamp_field=2023-01-01T12%3A00%3A00' in 's3://warehouse/default/test_table/data/timestamp_field=2023-01-01T12%3A00/00000-5-f9dca69a-9fb7-4830-9ef6-62d3d7afc09e-00001.parquet' + # # TLDR: CAST('2023-01-01 12:00:00' AS TIMESTAMP_NTZ) becomes 2023-01-01T12:00 in the hive partition path when spark writes it (without the seconds). 
+ # None, + # None, + # # f"""CREATE TABLE {identifier} ( + # # timestamp_field timestamp_ntz, + # # string_field string + # # ) + # # USING iceberg + # # PARTITIONED BY ( + # # identity(timestamp_field) + # # ) + # # """, + # # f"""INSERT INTO {identifier} + # # VALUES + # # (CAST('2023-01-01 12:00:00' AS TIMESTAMP_NTZ), 'Associated string value for timestamp 2023-01-01T12:00:00') + # # """ + # ), + # ( + # [PartitionField(source_id=9, field_id=1001, transform=IdentityTransform(), name="timestamptz_field")], + # [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], + # Record(timestamptz_field=1672563601000999), + # "timestamptz_field=2023-01-01T09%3A00%3A01.000999%2B00%3A00", + # # Spark writes differently as pyiceberg, so justification (compare expected value with spark behavior) would fail + # # AssertionError: assert 'timestamptz_field=2023-01-01T09%3A00%3A01.000999%2B00%3A00' in 's3://warehouse/default/test_table/data/timestamptz_field=2023-01-01T09%3A00%3A01.000999Z/00000-5-b710fc4d-66b6-47f1-b8ae-6208f8aaa2d4-00001.parquet' + # # TLDR: CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP) becomes 2023-01-01T09:00:01.000999Z in the hive partition path when spark writes it (while iceberg: timestamptz_field=2023-01-01T09:00:01.000999+00:00). + # None, + # None, + # # f"""CREATE TABLE {identifier} ( + # # timestamptz_field timestamp, + # # string_field string + # # ) + # # USING iceberg + # # PARTITIONED BY ( + # # identity(timestamptz_field) + # # ) + # # """, + # # f"""INSERT INTO {identifier} + # # VALUES + # # (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Associated string value for timestamp 2023-01-01 12:00:01.000999+03:00') + # # """ + # ), + # ( + # [PartitionField(source_id=10, field_id=1001, transform=IdentityTransform(), name="date_field")], + # [date(2023, 1, 1)], + # Record(date_field=19358), + # "date_field=2023-01-01", + # f"""CREATE TABLE {identifier} ( + # date_field date, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(date_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01' AS DATE), 'Associated string value for date 2023-01-01') + # """, + # ), + # ( + # [PartitionField(source_id=14, field_id=1001, transform=IdentityTransform(), name="uuid_field")], + # [uuid.UUID("f47ac10b-58cc-4372-a567-0e02b2c3d479")], + # Record(uuid_field="f47ac10b-58cc-4372-a567-0e02b2c3d479"), + # "uuid_field=f47ac10b-58cc-4372-a567-0e02b2c3d479", + # f"""CREATE TABLE {identifier} ( + # uuid_field string, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(uuid_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # ('f47ac10b-58cc-4372-a567-0e02b2c3d479', 'Associated string value for UUID f47ac10b-58cc-4372-a567-0e02b2c3d479') + # """, + # ), + # ( + # [PartitionField(source_id=11, field_id=1001, transform=IdentityTransform(), name="binary_field")], + # [b"example"], + # Record(binary_field=b"example"), + # "binary_field=ZXhhbXBsZQ%3D%3D", + # f"""CREATE TABLE {identifier} ( + # binary_field binary, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(binary_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('example' AS BINARY), 'Associated string value for binary `example`') + # """, + # ), + # ( + # [PartitionField(source_id=13, field_id=1001, transform=IdentityTransform(), name="decimal_field")], + # [Decimal("123.45")], + # Record(decimal_field=Decimal("123.45")), + # 
"decimal_field=123.45", + # f"""CREATE TABLE {identifier} ( + # decimal_field decimal(5,2), + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(decimal_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (123.45, 'Associated string value for decimal 123.45') + # """, + # ), # # Year Month Day Hour Transform # Month Transform ( @@ -386,362 +378,362 @@ (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP_NTZ), 'Event at 2023-01-01 11:55:59.999999'); """, ), - ( - [PartitionField(source_id=9, field_id=1001, transform=MonthTransform(), name="timestamptz_field_month")], - [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], - Record(timestamptz_field_month=((2023 - 1970) * 12 + 1 - 1)), - "timestamptz_field_month=2023-01", - f"""CREATE TABLE {identifier} ( - timestamptz_field timestamp, - string_field string - ) - USING iceberg - PARTITIONED BY ( - month(timestamptz_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); - """, - ), - ( - [PartitionField(source_id=10, field_id=1001, transform=MonthTransform(), name="date_field_month")], - [date(2023, 1, 1)], - Record(date_field_month=((2023 - 1970) * 12)), - "date_field_month=2023-01", - f"""CREATE TABLE {identifier} ( - date_field date, - string_field string - ) - USING iceberg - PARTITIONED BY ( - month(date_field) -- Partitioning by month from 'date_field' - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); - """, - ), - # Year Transform - ( - [PartitionField(source_id=8, field_id=1001, transform=YearTransform(), name="timestamp_field_year")], - [datetime(2023, 1, 1, 11, 55, 59, 999999)], - Record(timestamp_field_year=(2023 - 1970)), - "timestamp_field_year=2023", - f"""CREATE TABLE {identifier} ( - timestamp_field timestamp, - string_field string - ) - USING iceberg - PARTITIONED BY ( - year(timestamp_field) -- Partitioning by year from 'timestamp_field' - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), 'Event at 2023-01-01 11:55:59.999999'); - """, - ), - ( - [PartitionField(source_id=9, field_id=1001, transform=YearTransform(), name="timestamptz_field_year")], - [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], - Record(timestamptz_field_year=53), - "timestamptz_field_year=2023", - f"""CREATE TABLE {identifier} ( - timestamptz_field timestamp, - string_field string - ) - USING iceberg - PARTITIONED BY ( - year(timestamptz_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); - """, - ), - ( - [PartitionField(source_id=10, field_id=1001, transform=YearTransform(), name="date_field_year")], - [date(2023, 1, 1)], - Record(date_field_year=(2023 - 1970)), - "date_field_year=2023", - f"""CREATE TABLE {identifier} ( - date_field date, - string_field string - ) - USING iceberg - PARTITIONED BY ( - year(date_field) -- Partitioning by year from 'date_field' - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); - """, - ), - # # Day Transform - ( - [PartitionField(source_id=8, field_id=1001, transform=DayTransform(), name="timestamp_field_day")], - [datetime(2023, 1, 1, 11, 55, 59, 999999)], - Record(timestamp_field_day=19358), - "timestamp_field_day=2023-01-01", - f"""CREATE TABLE 
{identifier} ( - timestamp_field timestamp, - string_field string - ) - USING iceberg - PARTITIONED BY ( - day(timestamp_field) -- Partitioning by day from 'timestamp_field' - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); - """, - ), - ( - [PartitionField(source_id=9, field_id=1001, transform=DayTransform(), name="timestamptz_field_day")], - [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], - Record(timestamptz_field_day=19358), - "timestamptz_field_day=2023-01-01", - f"""CREATE TABLE {identifier} ( - timestamptz_field timestamp, - string_field string - ) - USING iceberg - PARTITIONED BY ( - day(timestamptz_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); - """, - ), - ( - [PartitionField(source_id=10, field_id=1001, transform=DayTransform(), name="date_field_day")], - [date(2023, 1, 1)], - Record(date_field_day=19358), - "date_field_day=2023-01-01", - f"""CREATE TABLE {identifier} ( - date_field date, - string_field string - ) - USING iceberg - PARTITIONED BY ( - day(date_field) -- Partitioning by day from 'date_field' - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); - """, - ), - # Hour Transform - ( - [PartitionField(source_id=8, field_id=1001, transform=HourTransform(), name="timestamp_field_hour")], - [datetime(2023, 1, 1, 11, 55, 59, 999999)], - Record(timestamp_field_hour=464603), - "timestamp_field_hour=2023-01-01-11", - f"""CREATE TABLE {identifier} ( - timestamp_field timestamp, - string_field string - ) - USING iceberg - PARTITIONED BY ( - hour(timestamp_field) -- Partitioning by hour from 'timestamp_field' - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), 'Event within the 11th hour of 2023-01-01'); - """, - ), - ( - [PartitionField(source_id=9, field_id=1001, transform=HourTransform(), name="timestamptz_field_hour")], - [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], - Record(timestamptz_field_hour=464601), - "timestamptz_field_hour=2023-01-01-09", - f"""CREATE TABLE {identifier} ( - timestamptz_field timestamp, - string_field string - ) - USING iceberg - PARTITIONED BY ( - hour(timestamptz_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); - """, - ), - # Truncate Transform - ( - [PartitionField(source_id=4, field_id=1001, transform=TruncateTransform(10), name="int_field_trunc")], - [12345], - Record(int_field_trunc=12340), - "int_field_trunc=12340", - f"""CREATE TABLE {identifier} ( - int_field int, - string_field string - ) - USING iceberg - PARTITIONED BY ( - truncate(int_field, 10) -- Truncating 'int_field' integer column to a width of 10 - ) - """, - f"""INSERT INTO {identifier} - VALUES - (12345, 'Sample data for int'); - """, - ), - ( - [PartitionField(source_id=5, field_id=1001, transform=TruncateTransform(2), name="bigint_field_trunc")], - [2**32 + 1], - Record(bigint_field_trunc=2**32), # 4294967296 - "bigint_field_trunc=4294967296", - f"""CREATE TABLE {identifier} ( - bigint_field bigint, - string_field string - ) - USING iceberg - PARTITIONED BY ( - truncate(bigint_field, 2) -- Truncating 'bigint_field' long column to a width of 2 - ) - """, - f"""INSERT INTO {identifier} - VALUES - (4294967297, 'Sample data for long'); - """, 
- ), - ( - [PartitionField(source_id=2, field_id=1001, transform=TruncateTransform(3), name="string_field_trunc")], - ["abcdefg"], - Record(string_field_trunc="abc"), - "string_field_trunc=abc", - f"""CREATE TABLE {identifier} ( - string_field string, - another_string_field string - ) - USING iceberg - PARTITIONED BY ( - truncate(string_field, 3) -- Truncating 'string_field' string column to a length of 3 characters - ) - """, - f"""INSERT INTO {identifier} - VALUES - ('abcdefg', 'Another sample for string'); - """, - ), - ( - [PartitionField(source_id=13, field_id=1001, transform=TruncateTransform(width=5), name="decimal_field_trunc")], - [Decimal("678.93")], - Record(decimal_field_trunc=Decimal("678.90")), - "decimal_field_trunc=678.90", # Assuming truncation width of 1 leads to truncating to 670 - f"""CREATE TABLE {identifier} ( - decimal_field decimal(5,2), - string_field string - ) - USING iceberg - PARTITIONED BY ( - truncate(decimal_field, 2) - ) - """, - f"""INSERT INTO {identifier} - VALUES - (678.90, 'Associated string value for decimal 678.90') - """, - ), - ( - [PartitionField(source_id=11, field_id=1001, transform=TruncateTransform(10), name="binary_field_trunc")], - [b"HELLOICEBERG"], - Record(binary_field_trunc=b"HELLOICEBE"), - "binary_field_trunc=SEVMTE9JQ0VCRQ%3D%3D", - f"""CREATE TABLE {identifier} ( - binary_field binary, - string_field string - ) - USING iceberg - PARTITIONED BY ( - truncate(binary_field, 10) -- Truncating 'binary_field' binary column to a length of 10 bytes - ) - """, - f"""INSERT INTO {identifier} - VALUES - (binary('HELLOICEBERG'), 'Sample data for binary'); - """, - ), - # Bucket Transform - ( - [PartitionField(source_id=4, field_id=1001, transform=BucketTransform(2), name="int_field_bucket")], - [10], - Record(int_field_bucket=0), - "int_field_bucket=0", - f"""CREATE TABLE {identifier} ( - int_field int, - string_field string - ) - USING iceberg - PARTITIONED BY ( - bucket(2, int_field) -- Distributing 'int_field' across 2 buckets - ) - """, - f"""INSERT INTO {identifier} - VALUES - (10, 'Integer with value 10'); - """, - ), - # Test multiple field combinations could generate the Partition record and hive partition path correctly - ( - [ - PartitionField(source_id=8, field_id=1001, transform=YearTransform(), name="timestamp_field_year"), - PartitionField(source_id=10, field_id=1002, transform=DayTransform(), name="date_field_day"), - ], - [ - datetime(2023, 1, 1, 11, 55, 59, 999999), - date(2023, 1, 1), - ], - Record(timestamp_field_year=53, date_field_day=19358), - "timestamp_field_year=2023/date_field_day=2023-01-01", - f"""CREATE TABLE {identifier} ( - timestamp_field timestamp, - date_field date, - string_field string - ) - USING iceberg - PARTITIONED BY ( - year(timestamp_field), - day(date_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), CAST('2023-01-01' AS DATE), 'some data'); - """, - ), - # Test that special characters are URL-encoded - ( - [PartitionField(source_id=15, field_id=1001, transform=IdentityTransform(), name="special#string+field")], - ["special string"], - Record(**{"special#string+field": "special string"}), # type: ignore - "special%23string%2Bfield=special+string", - f"""CREATE TABLE {identifier} ( - `special#string+field` string - ) - USING iceberg - PARTITIONED BY ( - identity(`special#string+field`) - ) - """, - f"""INSERT INTO {identifier} - VALUES - ('special string') - """, - ), + # ( + # [PartitionField(source_id=9, field_id=1001, 
transform=MonthTransform(), name="timestamptz_field_month")], + # [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], + # Record(timestamptz_field_month=((2023 - 1970) * 12 + 1 - 1)), + # "timestamptz_field_month=2023-01", + # f"""CREATE TABLE {identifier} ( + # timestamptz_field timestamp, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # month(timestamptz_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); + # """, + # ), + # ( + # [PartitionField(source_id=10, field_id=1001, transform=MonthTransform(), name="date_field_month")], + # [date(2023, 1, 1)], + # Record(date_field_month=((2023 - 1970) * 12)), + # "date_field_month=2023-01", + # f"""CREATE TABLE {identifier} ( + # date_field date, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # month(date_field) -- Partitioning by month from 'date_field' + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); + # """, + # ), + # # Year Transform + # ( + # [PartitionField(source_id=8, field_id=1001, transform=YearTransform(), name="timestamp_field_year")], + # [datetime(2023, 1, 1, 11, 55, 59, 999999)], + # Record(timestamp_field_year=(2023 - 1970)), + # "timestamp_field_year=2023", + # f"""CREATE TABLE {identifier} ( + # timestamp_field timestamp, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # year(timestamp_field) -- Partitioning by year from 'timestamp_field' + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), 'Event at 2023-01-01 11:55:59.999999'); + # """, + # ), + # ( + # [PartitionField(source_id=9, field_id=1001, transform=YearTransform(), name="timestamptz_field_year")], + # [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], + # Record(timestamptz_field_year=53), + # "timestamptz_field_year=2023", + # f"""CREATE TABLE {identifier} ( + # timestamptz_field timestamp, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # year(timestamptz_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); + # """, + # ), + # ( + # [PartitionField(source_id=10, field_id=1001, transform=YearTransform(), name="date_field_year")], + # [date(2023, 1, 1)], + # Record(date_field_year=(2023 - 1970)), + # "date_field_year=2023", + # f"""CREATE TABLE {identifier} ( + # date_field date, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # year(date_field) -- Partitioning by year from 'date_field' + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); + # """, + # ), + # # # Day Transform + # ( + # [PartitionField(source_id=8, field_id=1001, transform=DayTransform(), name="timestamp_field_day")], + # [datetime(2023, 1, 1, 11, 55, 59, 999999)], + # Record(timestamp_field_day=19358), + # "timestamp_field_day=2023-01-01", + # f"""CREATE TABLE {identifier} ( + # timestamp_field timestamp, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # day(timestamp_field) -- Partitioning by day from 'timestamp_field' + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); + # """, + # ), + # ( + # [PartitionField(source_id=9, 
field_id=1001, transform=DayTransform(), name="timestamptz_field_day")], + # [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], + # Record(timestamptz_field_day=19358), + # "timestamptz_field_day=2023-01-01", + # f"""CREATE TABLE {identifier} ( + # timestamptz_field timestamp, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # day(timestamptz_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); + # """, + # ), + # ( + # [PartitionField(source_id=10, field_id=1001, transform=DayTransform(), name="date_field_day")], + # [date(2023, 1, 1)], + # Record(date_field_day=19358), + # "date_field_day=2023-01-01", + # f"""CREATE TABLE {identifier} ( + # date_field date, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # day(date_field) -- Partitioning by day from 'date_field' + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); + # """, + # ), + # # Hour Transform + # ( + # [PartitionField(source_id=8, field_id=1001, transform=HourTransform(), name="timestamp_field_hour")], + # [datetime(2023, 1, 1, 11, 55, 59, 999999)], + # Record(timestamp_field_hour=464603), + # "timestamp_field_hour=2023-01-01-11", + # f"""CREATE TABLE {identifier} ( + # timestamp_field timestamp, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # hour(timestamp_field) -- Partitioning by hour from 'timestamp_field' + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), 'Event within the 11th hour of 2023-01-01'); + # """, + # ), + # ( + # [PartitionField(source_id=9, field_id=1001, transform=HourTransform(), name="timestamptz_field_hour")], + # [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], + # Record(timestamptz_field_hour=464601), + # "timestamptz_field_hour=2023-01-01-09", + # f"""CREATE TABLE {identifier} ( + # timestamptz_field timestamp, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # hour(timestamptz_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); + # """, + # ), + # # Truncate Transform + # ( + # [PartitionField(source_id=4, field_id=1001, transform=TruncateTransform(10), name="int_field_trunc")], + # [12345], + # Record(int_field_trunc=12340), + # "int_field_trunc=12340", + # f"""CREATE TABLE {identifier} ( + # int_field int, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # truncate(int_field, 10) -- Truncating 'int_field' integer column to a width of 10 + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (12345, 'Sample data for int'); + # """, + # ), + # ( + # [PartitionField(source_id=5, field_id=1001, transform=TruncateTransform(2), name="bigint_field_trunc")], + # [2**32 + 1], + # Record(bigint_field_trunc=2**32), # 4294967296 + # "bigint_field_trunc=4294967296", + # f"""CREATE TABLE {identifier} ( + # bigint_field bigint, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # truncate(bigint_field, 2) -- Truncating 'bigint_field' long column to a width of 2 + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (4294967297, 'Sample data for long'); + # """, + # ), + # ( + # [PartitionField(source_id=2, field_id=1001, transform=TruncateTransform(3), 
name="string_field_trunc")], + # ["abcdefg"], + # Record(string_field_trunc="abc"), + # "string_field_trunc=abc", + # f"""CREATE TABLE {identifier} ( + # string_field string, + # another_string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # truncate(string_field, 3) -- Truncating 'string_field' string column to a length of 3 characters + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # ('abcdefg', 'Another sample for string'); + # """, + # ), + # ( + # [PartitionField(source_id=13, field_id=1001, transform=TruncateTransform(width=5), name="decimal_field_trunc")], + # [Decimal("678.93")], + # Record(decimal_field_trunc=Decimal("678.90")), + # "decimal_field_trunc=678.90", # Assuming truncation width of 1 leads to truncating to 670 + # f"""CREATE TABLE {identifier} ( + # decimal_field decimal(5,2), + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # truncate(decimal_field, 2) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (678.90, 'Associated string value for decimal 678.90') + # """, + # ), + # ( + # [PartitionField(source_id=11, field_id=1001, transform=TruncateTransform(10), name="binary_field_trunc")], + # [b"HELLOICEBERG"], + # Record(binary_field_trunc=b"HELLOICEBE"), + # "binary_field_trunc=SEVMTE9JQ0VCRQ%3D%3D", + # f"""CREATE TABLE {identifier} ( + # binary_field binary, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # truncate(binary_field, 10) -- Truncating 'binary_field' binary column to a length of 10 bytes + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (binary('HELLOICEBERG'), 'Sample data for binary'); + # """, + # ), + # # Bucket Transform + # ( + # [PartitionField(source_id=4, field_id=1001, transform=BucketTransform(2), name="int_field_bucket")], + # [10], + # Record(int_field_bucket=0), + # "int_field_bucket=0", + # f"""CREATE TABLE {identifier} ( + # int_field int, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # bucket(2, int_field) -- Distributing 'int_field' across 2 buckets + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (10, 'Integer with value 10'); + # """, + # ), + # # Test multiple field combinations could generate the Partition record and hive partition path correctly + # ( + # [ + # PartitionField(source_id=8, field_id=1001, transform=YearTransform(), name="timestamp_field_year"), + # PartitionField(source_id=10, field_id=1002, transform=DayTransform(), name="date_field_day"), + # ], + # [ + # datetime(2023, 1, 1, 11, 55, 59, 999999), + # date(2023, 1, 1), + # ], + # Record(timestamp_field_year=53, date_field_day=19358), + # "timestamp_field_year=2023/date_field_day=2023-01-01", + # f"""CREATE TABLE {identifier} ( + # timestamp_field timestamp, + # date_field date, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # year(timestamp_field), + # day(date_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), CAST('2023-01-01' AS DATE), 'some data'); + # """, + # ), + # # Test that special characters are URL-encoded + # ( + # [PartitionField(source_id=15, field_id=1001, transform=IdentityTransform(), name="special#string+field")], + # ["special string"], + # Record(**{"special#string+field": "special string"}), # type: ignore + # "special%23string%2Bfield=special+string", + # f"""CREATE TABLE {identifier} ( + # `special#string+field` string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(`special#string+field`) + # ) + # """, + # 
f"""INSERT INTO {identifier} + # VALUES + # ('special string') + # """, + # ), ], ) @pytest.mark.integration @@ -755,11 +747,14 @@ def test_partition_key( spark_create_table_sql_for_justification: str, spark_data_insert_sql_for_justification: str, ) -> None: - partition_field_values = [PartitionFieldValue(field, value) for field, value in zip(partition_fields, partition_values)] + field_values = [ + PartitionFieldValue(field, field.transform.transform(TABLE_SCHEMA.find_field(field.source_id).field_type)(value)) + for field, value in zip(partition_fields, partition_values) + ] spec = PartitionSpec(*partition_fields) key = PartitionKey( - raw_partition_field_values=partition_field_values, + field_values=field_values, partition_spec=spec, schema=TABLE_SCHEMA, ) diff --git a/tests/table/test_locations.py b/tests/table/test_locations.py index 67911b6271..9234dd07a8 100644 --- a/tests/table/test_locations.py +++ b/tests/table/test_locations.py @@ -27,7 +27,7 @@ PARTITION_FIELD = PartitionField(source_id=1, field_id=1002, transform=IdentityTransform(), name="string_field") PARTITION_KEY = PartitionKey( - raw_partition_field_values=[PartitionFieldValue(PARTITION_FIELD, "example_string")], + field_values=[PartitionFieldValue(PARTITION_FIELD, "example_string")], partition_spec=PartitionSpec(PARTITION_FIELD), schema=Schema(NestedField(field_id=1, name="string_field", field_type=StringType(), required=False)), )