     Schema,
     SchemaVisitorPerPrimitiveType,
     SchemaWithPartnerVisitor,
+    _check_schema_compatible,
     pre_order_visit,
     promote,
     prune_columns,
@@ -1397,7 +1398,7 @@ def list(self, list_type: ListType, list_array: Optional[pa.Array], value_array:
                 # This can be removed once this has been fixed:
                 # https://github.com/apache/arrow/issues/38809
                 list_array = pa.LargeListArray.from_arrays(list_array.offsets, value_array)
-
+            value_array = self._cast_if_needed(list_type.element_field, value_array)
             arrow_field = pa.large_list(self._construct_field(list_type.element_field, value_array.type))
             return list_array.cast(arrow_field)
         else:
@@ -1407,6 +1408,8 @@ def map(
         self, map_type: MapType, map_array: Optional[pa.Array], key_result: Optional[pa.Array], value_result: Optional[pa.Array]
     ) -> Optional[pa.Array]:
         if isinstance(map_array, pa.MapArray) and key_result is not None and value_result is not None:
+            key_result = self._cast_if_needed(map_type.key_field, key_result)
+            value_result = self._cast_if_needed(map_type.value_field, value_result)
             arrow_field = pa.map_(
                 self._construct_field(map_type.key_field, key_result.type),
                 self._construct_field(map_type.value_field, value_result.type),
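These `_cast_if_needed` calls cover the case where a table field has been promoted (for example `int` to `long`) while the file still stores the narrower type, so the element/value arrays need an explicit cast before the list or map is rebuilt. A minimal standalone sketch of that kind of cast in plain PyArrow (the types and values here are invented for illustration):

import pyarrow as pa

# A large_list<int32> array, as it might be read from an older data file.
values = pa.array([1, 2, 3], type=pa.int32())
offsets = pa.array([0, 2, 3], type=pa.int64())
narrow = pa.LargeListArray.from_arrays(offsets, values)

# After the element field is promoted to long, the array is cast so the
# rebuilt list carries the promoted element type.
wide = narrow.cast(pa.large_list(pa.int64()))
print(wide.type)  # large_list<item: int64>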
@@ -1539,9 +1542,16 @@ def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, trunc

         expected_physical_type = _primitive_to_physical(iceberg_type)
         if expected_physical_type != physical_type_string:
-            raise ValueError(
-                f"Unexpected physical type {physical_type_string} for {iceberg_type}, expected {expected_physical_type}"
-            )
+            # Allow promotable physical types
+            # INT32 -> INT64 and FLOAT -> DOUBLE are safe type casts
+            if (physical_type_string == "INT32" and expected_physical_type == "INT64") or (
+                physical_type_string == "FLOAT" and expected_physical_type == "DOUBLE"
+            ):
+                pass
+            else:
+                raise ValueError(
+                    f"Unexpected physical type {physical_type_string} for {iceberg_type}, expected {expected_physical_type}"
+                )

         self.primitive_type = iceberg_type

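The relaxed physical-type check boils down to a small allow-list. A condensed restatement, with the helper name invented here purely for illustration (the diff keeps the logic inline):

# Stored physical types that may safely be read as a wider expected type.
_SAFE_PHYSICAL_PROMOTIONS = {("INT32", "INT64"), ("FLOAT", "DOUBLE")}

def _physical_type_is_acceptable(stored: str, expected: str) -> bool:
    # Accept an exact match or one of the promotable (stored, expected) pairs.
    return stored == expected or (stored, expected) in _SAFE_PHYSICAL_PROMOTIONS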
@@ -1886,16 +1896,6 @@ def data_file_statistics_from_parquet_metadata(
             set the mode for column metrics collection
         parquet_column_mapping (Dict[str, int]): The mapping of the parquet file name to the field ID
     """
-    if parquet_metadata.num_columns != len(stats_columns):
-        raise ValueError(
-            f"Number of columns in statistics configuration ({len(stats_columns)}) is different from the number of columns in pyarrow table ({parquet_metadata.num_columns})"
-        )
-
-    if parquet_metadata.num_columns != len(parquet_column_mapping):
-        raise ValueError(
-            f"Number of columns in column mapping ({len(parquet_column_mapping)}) is different from the number of columns in pyarrow table ({parquet_metadata.num_columns})"
-        )
-
     column_sizes: Dict[int, int] = {}
     value_counts: Dict[int, int] = {}
     split_offsets: List[int] = []
@@ -1988,8 +1988,7 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT
     )

     def write_parquet(task: WriteTask) -> DataFile:
-        table_schema = task.schema
-
+        table_schema = table_metadata.schema()
         # if schema needs to be transformed, use the transformed schema and adjust the arrow table accordingly
         # otherwise use the original schema
         if (sanitized_schema := sanitize_column_names(table_schema)) != table_schema:
@@ -2001,7 +2000,7 @@ def write_parquet(task: WriteTask) -> DataFile:
         batches = [
             _to_requested_schema(
                 requested_schema=file_schema,
-                file_schema=table_schema,
+                file_schema=task.schema,
                 batch=batch,
                 downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
                 include_field_ids=True,
@@ -2060,47 +2059,30 @@ def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[List[
     return bin_packed_record_batches


-def _check_schema_compatible(table_schema: Schema, other_schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False) -> None:
+def _check_pyarrow_schema_compatible(
+    requested_schema: Schema, provided_schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False
+) -> None:
     """
-    Check if the `table_schema` is compatible with `other_schema`.
+    Check if the `requested_schema` is compatible with `provided_schema`.

     Two schemas are considered compatible when they are equal in terms of the Iceberg Schema type.

     Raises:
         ValueError: If the schemas are not compatible.
     """
-    name_mapping = table_schema.name_mapping
+    name_mapping = requested_schema.name_mapping
     try:
-        task_schema = pyarrow_to_schema(
-            other_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+        provided_schema = pyarrow_to_schema(
+            provided_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
         )
     except ValueError as e:
-        other_schema = _pyarrow_to_schema_without_ids(other_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
-        additional_names = set(other_schema.column_names) - set(table_schema.column_names)
+        provided_schema = _pyarrow_to_schema_without_ids(provided_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+        additional_names = set(provided_schema._name_to_id.keys()) - set(requested_schema._name_to_id.keys())
         raise ValueError(
             f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
         ) from e

-    if table_schema.as_struct() != task_schema.as_struct():
-        from rich.console import Console
-        from rich.table import Table as RichTable
-
-        console = Console(record=True)
-
-        rich_table = RichTable(show_header=True, header_style="bold")
-        rich_table.add_column("")
-        rich_table.add_column("Table field")
-        rich_table.add_column("Dataframe field")
-
-        for lhs in table_schema.fields:
-            try:
-                rhs = task_schema.find_field(lhs.field_id)
-                rich_table.add_row("✅" if lhs == rhs else "❌", str(lhs), str(rhs))
-            except ValueError:
-                rich_table.add_row("❌", str(lhs), "Missing")
-
-        console.print(rich_table)
-        raise ValueError(f"Mismatch in fields:\n{console.export_text()}")
+    _check_schema_compatible(requested_schema, provided_schema)


 def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_paths: Iterator[str]) -> Iterator[DataFile]:
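A rough usage sketch of the renamed helper, with invented field IDs and column names (`_check_pyarrow_schema_compatible` is an internal, underscore-prefixed function, so this is purely illustrative):

import pyarrow as pa

from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField, StringType

iceberg_schema = Schema(
    NestedField(1, "id", LongType(), required=False),
    NestedField(2, "name", StringType(), required=False),
)

# Arrow schema that resolves cleanly through the table's name mapping: no error.
_check_pyarrow_schema_compatible(iceberg_schema, pa.schema([("id", pa.int64()), ("name", pa.string())]))

# Arrow schema with a column the table does not know about: raises ValueError
# with the "Update the schema first (hint, use union_by_name)" message.
try:
    _check_pyarrow_schema_compatible(iceberg_schema, pa.schema([("id", pa.int64()), ("extra", pa.string())]))
except ValueError as err:
    print(err)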
@@ -2114,7 +2096,7 @@ def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_
                 f"Cannot add file {file_path} because it has field IDs. `add_files` only supports addition of files without field_ids"
             )
         schema = table_metadata.schema()
-        _check_schema_compatible(schema, parquet_metadata.schema.to_arrow_schema())
+        _check_pyarrow_schema_compatible(schema, parquet_metadata.schema.to_arrow_schema())

         statistics = data_file_statistics_from_parquet_metadata(
             parquet_metadata=parquet_metadata,
@@ -2195,7 +2177,7 @@ def _dataframe_to_data_files(
     Returns:
         An iterable that supplies datafiles that represent the table.
     """
-    from pyiceberg.table import PropertyUtil, TableProperties, WriteTask
+    from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, PropertyUtil, TableProperties, WriteTask

     counter = counter or itertools.count(0)
     write_uuid = write_uuid or uuid.uuid4()
@@ -2204,13 +2186,16 @@ def _dataframe_to_data_files(
         property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
         default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
     )
+    name_mapping = table_metadata.schema().name_mapping
+    downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
+    task_schema = pyarrow_to_schema(df.schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)

     if table_metadata.spec().is_unpartitioned():
         yield from write_file(
             io=io,
             table_metadata=table_metadata,
             tasks=iter([
-                WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=table_metadata.schema())
+                WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=task_schema)
                 for batches in bin_pack_arrow_table(df, target_file_size)
             ]),
         )
@@ -2225,7 +2210,7 @@ def _dataframe_to_data_files(
                     task_id=next(counter),
                     record_batches=batches,
                     partition_key=partition.partition_key,
-                    schema=table_metadata.schema(),
+                    schema=task_schema,
                 )
                 for partition in partitions
                 for batches in bin_pack_arrow_table(partition.arrow_table_partition, target_file_size)
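The write path now derives one `task_schema` up front by resolving the dataframe's Arrow schema against the table's name mapping, so every `WriteTask` (partitioned or not) carries field IDs taken from the table metadata. A small isolated sketch of that resolution step (schema and column names are illustrative):

import pyarrow as pa

from pyiceberg.io.pyarrow import pyarrow_to_schema
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField

# Illustrative table schema; its name mapping is what carries the field IDs.
table_schema = Schema(NestedField(1, "id", LongType(), required=False))
name_mapping = table_schema.name_mapping

# The dataframe's Arrow schema is resolved against that mapping, so the
# resulting task schema reuses the table's field IDs ("id" -> 1 here).
task_schema = pyarrow_to_schema(pa.schema([("id", pa.int64())]), name_mapping=name_mapping)
print(task_schema)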