-
Notifications
You must be signed in to change notification settings - Fork 208
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
URL-encode partition field names in file locations #1457
Changes from all commits
4b139ee
638a43f
3126952
65e2c39
64e7748
18c7674
3756e4e
f5a35de
f1f5f4c
8a106e6
1bb379b
61cdd08
f32b3aa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -30,7 +30,7 @@ | |||||||
Tuple, | ||||||||
TypeVar, | ||||||||
) | ||||||||
from urllib.parse import quote | ||||||||
from urllib.parse import quote_plus | ||||||||
|
||||||||
from pydantic import ( | ||||||||
BeforeValidator, | ||||||||
|
@@ -234,9 +234,11 @@ def partition_to_path(self, data: Record, schema: Schema) -> str: | |||||||
partition_field = self.fields[pos] | ||||||||
value_str = partition_field.transform.to_human_string(field_types[pos].field_type, value=data[pos]) | ||||||||
|
||||||||
value_str = quote(value_str, safe="") | ||||||||
value_str = quote_plus(value_str, safe="") | ||||||||
value_strs.append(value_str) | ||||||||
field_strs.append(partition_field.name) | ||||||||
|
||||||||
field_str = quote_plus(partition_field.name, safe="") | ||||||||
field_strs.append(field_str) | ||||||||
Comment on lines
+240
to
+241
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit, I would just collapse these:
Suggested change
|
||||||||
|
||||||||
path = "/".join([field_str + "=" + value_str for field_str, value_str in zip(field_strs, value_strs)]) | ||||||||
return path | ||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,7 +18,7 @@ | |
import uuid | ||
from datetime import date, datetime, timedelta, timezone | ||
from decimal import Decimal | ||
from typing import Any, List | ||
from typing import Any, Callable, List, Optional | ||
|
||
import pytest | ||
from pyspark.sql import SparkSession | ||
|
@@ -70,14 +70,15 @@ | |
NestedField(field_id=12, name="fixed_field", field_type=FixedType(16), required=False), | ||
NestedField(field_id=13, name="decimal_field", field_type=DecimalType(5, 2), required=False), | ||
NestedField(field_id=14, name="uuid_field", field_type=UUIDType(), required=False), | ||
NestedField(field_id=15, name="special#string+field", field_type=StringType(), required=False), | ||
) | ||
|
||
|
||
identifier = "default.test_table" | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"partition_fields, partition_values, expected_partition_record, expected_hive_partition_path_slice, spark_create_table_sql_for_justification, spark_data_insert_sql_for_justification", | ||
"partition_fields, partition_values, expected_partition_record, expected_hive_partition_path_slice, spark_create_table_sql_for_justification, spark_data_insert_sql_for_justification, make_compatible_name", | ||
[ | ||
# # Identity Transform | ||
( | ||
|
@@ -98,6 +99,7 @@ | |
VALUES | ||
(false, 'Boolean field set to false'); | ||
""", | ||
None, | ||
), | ||
( | ||
[PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="string_field")], | ||
|
@@ -117,6 +119,7 @@ | |
VALUES | ||
('sample_string', 'Another string value') | ||
""", | ||
None, | ||
), | ||
( | ||
[PartitionField(source_id=4, field_id=1001, transform=IdentityTransform(), name="int_field")], | ||
|
@@ -136,6 +139,7 @@ | |
VALUES | ||
(42, 'Associated string value for int 42') | ||
""", | ||
None, | ||
), | ||
( | ||
[PartitionField(source_id=5, field_id=1001, transform=IdentityTransform(), name="long_field")], | ||
|
@@ -155,6 +159,7 @@ | |
VALUES | ||
(1234567890123456789, 'Associated string value for long 1234567890123456789') | ||
""", | ||
None, | ||
), | ||
( | ||
[PartitionField(source_id=6, field_id=1001, transform=IdentityTransform(), name="float_field")], | ||
|
@@ -178,6 +183,7 @@ | |
# VALUES | ||
# (3.14, 'Associated string value for float 3.14') | ||
# """ | ||
None, | ||
), | ||
( | ||
[PartitionField(source_id=7, field_id=1001, transform=IdentityTransform(), name="double_field")], | ||
|
@@ -201,6 +207,7 @@ | |
# VALUES | ||
# (6.282, 'Associated string value for double 6.282') | ||
# """ | ||
None, | ||
), | ||
( | ||
[PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], | ||
|
@@ -220,6 +227,7 @@ | |
VALUES | ||
(CAST('2023-01-01 12:00:01.000999' AS TIMESTAMP_NTZ), 'Associated string value for timestamp 2023-01-01T12:00:00') | ||
""", | ||
None, | ||
), | ||
( | ||
[PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], | ||
|
@@ -239,6 +247,7 @@ | |
VALUES | ||
(CAST('2023-01-01 12:00:01' AS TIMESTAMP_NTZ), 'Associated string value for timestamp 2023-01-01T12:00:00') | ||
""", | ||
None, | ||
), | ||
( | ||
[PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], | ||
|
@@ -263,6 +272,7 @@ | |
# VALUES | ||
# (CAST('2023-01-01 12:00:00' AS TIMESTAMP_NTZ), 'Associated string value for timestamp 2023-01-01T12:00:00') | ||
# """ | ||
None, | ||
), | ||
( | ||
[PartitionField(source_id=9, field_id=1001, transform=IdentityTransform(), name="timestamptz_field")], | ||
|
@@ -287,6 +297,7 @@ | |
# VALUES | ||
# (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Associated string value for timestamp 2023-01-01 12:00:01.000999+03:00') | ||
# """ | ||
None, | ||
), | ||
( | ||
[PartitionField(source_id=10, field_id=1001, transform=IdentityTransform(), name="date_field")], | ||
|
@@ -306,6 +317,7 @@ | |
VALUES | ||
(CAST('2023-01-01' AS DATE), 'Associated string value for date 2023-01-01') | ||
""", | ||
None, | ||
), | ||
( | ||
[PartitionField(source_id=14, field_id=1001, transform=IdentityTransform(), name="uuid_field")], | ||
|
@@ -325,6 +337,7 @@ | |
VALUES | ||
('f47ac10b-58cc-4372-a567-0e02b2c3d479', 'Associated string value for UUID f47ac10b-58cc-4372-a567-0e02b2c3d479') | ||
""", | ||
None, | ||
), | ||
( | ||
[PartitionField(source_id=11, field_id=1001, transform=IdentityTransform(), name="binary_field")], | ||
|
@@ -344,6 +357,7 @@ | |
VALUES | ||
(CAST('example' AS BINARY), 'Associated string value for binary `example`') | ||
""", | ||
None, | ||
), | ||
( | ||
[PartitionField(source_id=13, field_id=1001, transform=IdentityTransform(), name="decimal_field")], | ||
|
@@ -363,6 +377,7 @@ | |
VALUES | ||
(123.45, 'Associated string value for decimal 123.45') | ||
""", | ||
None, | ||
), | ||
# # Year Month Day Hour Transform | ||
# Month Transform | ||
|
@@ -384,6 +399,7 @@ | |
VALUES | ||
(CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP_NTZ), 'Event at 2023-01-01 11:55:59.999999'); | ||
""", | ||
None, | ||
), | ||
( | ||
[PartitionField(source_id=9, field_id=1001, transform=MonthTransform(), name="timestamptz_field_month")], | ||
|
@@ -403,6 +419,7 @@ | |
VALUES | ||
(CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); | ||
""", | ||
None, | ||
), | ||
( | ||
[PartitionField(source_id=10, field_id=1001, transform=MonthTransform(), name="date_field_month")], | ||
|
@@ -422,6 +439,7 @@ | |
VALUES | ||
(CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); | ||
""", | ||
None, | ||
), | ||
# Year Transform | ||
( | ||
|
@@ -442,6 +460,7 @@ | |
VALUES | ||
(CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), 'Event at 2023-01-01 11:55:59.999999'); | ||
""", | ||
None, | ||
), | ||
( | ||
[PartitionField(source_id=9, field_id=1001, transform=YearTransform(), name="timestamptz_field_year")], | ||
|
@@ -461,6 +480,7 @@ | |
VALUES | ||
(CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); | ||
""", | ||
None, | ||
), | ||
( | ||
[PartitionField(source_id=10, field_id=1001, transform=YearTransform(), name="date_field_year")], | ||
|
@@ -480,6 +500,7 @@ | |
VALUES | ||
(CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); | ||
""", | ||
None, | ||
), | ||
# # Day Transform | ||
( | ||
|
@@ -500,6 +521,7 @@ | |
VALUES | ||
(CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); | ||
""", | ||
None, | ||
), | ||
( | ||
[PartitionField(source_id=9, field_id=1001, transform=DayTransform(), name="timestamptz_field_day")], | ||
|
@@ -519,6 +541,7 @@ | |
VALUES | ||
(CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); | ||
""", | ||
None, | ||
), | ||
( | ||
[PartitionField(source_id=10, field_id=1001, transform=DayTransform(), name="date_field_day")], | ||
|
@@ -538,6 +561,7 @@ | |
VALUES | ||
(CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); | ||
""", | ||
None, | ||
), | ||
# Hour Transform | ||
( | ||
|
@@ -558,6 +582,7 @@ | |
VALUES | ||
(CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), 'Event within the 11th hour of 2023-01-01'); | ||
""", | ||
None, | ||
), | ||
( | ||
[PartitionField(source_id=9, field_id=1001, transform=HourTransform(), name="timestamptz_field_hour")], | ||
|
@@ -577,6 +602,7 @@ | |
VALUES | ||
(CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); | ||
""", | ||
None, | ||
), | ||
# Truncate Transform | ||
( | ||
|
@@ -597,6 +623,7 @@ | |
VALUES | ||
(12345, 'Sample data for int'); | ||
""", | ||
None, | ||
), | ||
( | ||
[PartitionField(source_id=5, field_id=1001, transform=TruncateTransform(2), name="bigint_field_trunc")], | ||
|
@@ -616,6 +643,7 @@ | |
VALUES | ||
(4294967297, 'Sample data for long'); | ||
""", | ||
None, | ||
), | ||
( | ||
[PartitionField(source_id=2, field_id=1001, transform=TruncateTransform(3), name="string_field_trunc")], | ||
|
@@ -635,6 +663,7 @@ | |
VALUES | ||
('abcdefg', 'Another sample for string'); | ||
""", | ||
None, | ||
), | ||
( | ||
[PartitionField(source_id=13, field_id=1001, transform=TruncateTransform(width=5), name="decimal_field_trunc")], | ||
|
@@ -654,6 +683,7 @@ | |
VALUES | ||
(678.90, 'Associated string value for decimal 678.90') | ||
""", | ||
None, | ||
), | ||
( | ||
[PartitionField(source_id=11, field_id=1001, transform=TruncateTransform(10), name="binary_field_trunc")], | ||
|
@@ -673,6 +703,7 @@ | |
VALUES | ||
(binary('HELLOICEBERG'), 'Sample data for binary'); | ||
""", | ||
None, | ||
), | ||
# Bucket Transform | ||
( | ||
|
@@ -693,6 +724,7 @@ | |
VALUES | ||
(10, 'Integer with value 10'); | ||
""", | ||
None, | ||
), | ||
# Test multiple field combinations could generate the Partition record and hive partition path correctly | ||
( | ||
|
@@ -721,6 +753,27 @@ | |
VALUES | ||
(CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), CAST('2023-01-01' AS DATE), 'some data'); | ||
""", | ||
None, | ||
), | ||
# Test that special characters are URL-encoded | ||
( | ||
[PartitionField(source_id=15, field_id=1001, transform=IdentityTransform(), name="special#string+field")], | ||
["special string"], | ||
Record(**{"special#string+field": "special string"}), # type: ignore | ||
"special%23string%2Bfield=special+string", | ||
f"""CREATE TABLE {identifier} ( | ||
`special#string+field` string | ||
) | ||
USING iceberg | ||
PARTITIONED BY ( | ||
identity(`special#string+field`) | ||
) | ||
""", | ||
f"""INSERT INTO {identifier} | ||
VALUES | ||
('special string') | ||
""", | ||
lambda name: name.replace("#", "_x23").replace("+", "_x2B"), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: can we just reuse the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was conflicted about this:
WDYT? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. My concern was around providing context on "why" this lambda is here and implemented the way it is. This comment refers to this function, which is also used in the write path. To me, it makes sense to use the same function to show that we're doing the same thing as what the underlying system is doing. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ah thanks, I missed this function completely 😅 . Now it makes more sense 👍 |
||
), | ||
], | ||
) | ||
|
@@ -734,6 +787,7 @@ def test_partition_key( | |
expected_hive_partition_path_slice: str, | ||
spark_create_table_sql_for_justification: str, | ||
spark_data_insert_sql_for_justification: str, | ||
make_compatible_name: Optional[Callable[[str], str]], | ||
) -> None: | ||
partition_field_values = [PartitionFieldValue(field, value) for field, value in zip(partition_fields, partition_values)] | ||
spec = PartitionSpec(*partition_fields) | ||
|
@@ -768,5 +822,12 @@ def test_partition_key( | |
spark_path_for_justification = ( | ||
snapshot.manifests(iceberg_table.io)[0].fetch_manifest_entry(iceberg_table.io)[0].data_file.file_path | ||
) | ||
assert spark_partition_for_justification == expected_partition_record | ||
# Special characters in partition value are sanitized when written to the data file's partition field | ||
# Use `make_compatible_name` to match the sanitize behavior | ||
sanitized_record = ( | ||
Record(**{make_compatible_name(k): v for k, v in vars(expected_partition_record).items()}) | ||
if make_compatible_name | ||
else expected_partition_record | ||
) | ||
assert spark_partition_for_justification == sanitized_record | ||
assert expected_hive_partition_path_slice in spark_path_for_justification |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,7 +16,8 @@ | |
# under the License. | ||
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec | ||
from pyiceberg.schema import Schema | ||
from pyiceberg.transforms import BucketTransform, TruncateTransform | ||
from pyiceberg.transforms import BucketTransform, IdentityTransform, TruncateTransform | ||
from pyiceberg.typedef import Record | ||
from pyiceberg.types import ( | ||
IntegerType, | ||
NestedField, | ||
|
@@ -118,6 +119,27 @@ def test_deserialize_partition_spec() -> None: | |
) | ||
|
||
|
||
def test_partition_spec_to_path() -> None: | ||
schema = Schema( | ||
NestedField(field_id=1, name="str", field_type=StringType(), required=False), | ||
NestedField(field_id=2, name="other_str", field_type=StringType(), required=False), | ||
NestedField(field_id=3, name="int", field_type=IntegerType(), required=True), | ||
) | ||
|
||
spec = PartitionSpec( | ||
PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="my#str%bucket"), | ||
PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="other str+bucket"), | ||
PartitionField(source_id=3, field_id=1002, transform=BucketTransform(num_buckets=25), name="my!int:bucket"), | ||
spec_id=3, | ||
) | ||
|
||
record = Record(**{"my#str%bucket": "my+str", "other str+bucket": "( )", "my!int:bucket": 10}) # type: ignore | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. mypy complains here and elsewhere but I think it's fine |
||
|
||
# Both partition field names and values should be URL encoded, with spaces mapping to plus signs, to match the Java | ||
# behaviour: https://github.com/apache/iceberg/blob/ca3db931b0f024f0412084751ac85dd4ef2da7e7/api/src/main/java/org/apache/iceberg/PartitionSpec.java#L198-L204 | ||
assert spec.partition_to_path(record, schema) == "my%23str%25bucket=my%2Bstr/other+str%2Bbucket=%28+%29/my%21int%3Abucket=10" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cross-checked with Java implementation (integration tests will do this eventually), in particular WRT to |
||
|
||
|
||
def test_partition_type(table_schema_simple: Schema) -> None: | ||
spec = PartitionSpec( | ||
PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="str_truncate"), | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It defaults to utf-8, so that's good 👍