API table.scan does not conform to Iceberg spec for identity partition columns #1401
Comments
Hi @rkuhlercadent, thanks for reporting this issue. Can you provide some pseudocode showing where you're seeing it? |
Here is a Python script that will demonstrate the issue.
Current output from the test script. Note the null partition_id values.
|
Thanks for providing the test! I added a few print statements:

import os
import datetime
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.io.pyarrow import data_file_statistics_from_parquet_metadata, compute_statistics_plan
from pyiceberg.io.pyarrow import parquet_path_to_id_mapping
from pyiceberg.schema import Schema
from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
from pyiceberg.table import TableProperties
from pyiceberg.typedef import Record
from pyiceberg.types import StringType, IntegerType, NestedField
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import IdentityTransform
from pyiceberg.table.name_mapping import create_mapping_from_schema
import pyarrow as pa
import pyarrow.parquet as pq


def demonstrate_identity_partition_scan_issue():
    # we have petabytes of parquet data in hive format on s3 already that we are cataloging in iceberg format.
    # note that these parquet files do NOT have the partition columns in them which is standard for hive format.
    # the partition values must be taken from the iceberg metadata for the identity partition columns as
    # specified in the iceberg spec: https://iceberg.apache.org/spec/#column-projection
    # "Values for field ids which are not present in a data file must be resolved according the following rules:
    # Return the value from partition metadata if an Identity Transform exists for the field and the partition
    # value is present in the partition struct on data_file object in the manifest. This allows for metadata
    # only migrations of Hive tables."
    warehouse_path = os.path.dirname(os.path.realpath(__file__))
    namespace_name = "IDENTITY_PARTITION_SCAN_ISSUE_NAMESPACE"
    table_name = "IDENTITY_PARTITION_SCAN_ISSUE"
    catalog = get_iceberg_catalog(warehouse_path)
    drop_catalog_entities_for_test(catalog, namespace_name)
    # create sample hive files
    sample_hive_parquet_file = create_sample_hive_parquet_file(warehouse_path, namespace_name, table_name, 202412)
    # catalog existing hive data in iceberg
    catalog.create_namespace(namespace_name)
    table = create_iceberg_table(catalog, namespace_name, table_name)
    print("Hive parquet data:\n", pq.read_table(sample_hive_parquet_file.get("location")))
    print()
    add_data_file(table, sample_hive_parquet_file, table.metadata.default_spec_id)
    # the partition_id columns should have values from the metadata not null in this output
    # this same iceberg metadata correctly returns the partition_id column values in spark, athena, and snowflake
    print("Table partitions:\n", table.inspect.partitions().to_pandas())
    print()
    print("Table scan:\n", table.scan().to_arrow())
    print()


def get_iceberg_catalog(warehouse_path):
    # using sqlite catalog on local filesystem for demo
    catalog = SqlCatalog(
        "default",
        **{
            "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
            "warehouse": f"file://{warehouse_path}",
        })
    return catalog


def drop_catalog_entities_for_test(catalog, namespace_name):
    if namespace_name in [n[0] for n in catalog.list_namespaces()]:
        for _, table_name in catalog.list_tables(namespace_name):
            catalog.drop_table(f"{namespace_name}.{table_name}")
        catalog.drop_namespace(namespace_name)


def create_sample_hive_parquet_file(warehouse_path, namespace_name, table_name, partition_id):
    location = f"{warehouse_path}/{namespace_name}.db/{table_name}/data/partition_id={partition_id}/data.parquet"
    os.makedirs(os.path.dirname(location), exist_ok=True)
    name = datetime.datetime.strptime(str(partition_id), "%Y%m").strftime("%B %Y")
    names = pa.array([name], type=pa.string())
    pq.write_table(pa.table([names], names=["name"]), location)
    return {
        "location": location,
        "file_size": os.path.getsize(location),
        "partition_id": partition_id
    }


def create_iceberg_table(catalog, namespace_name, table_name):
    print("creating iceberg table")
    schema = Schema(
        NestedField(field_id=1, name="partition_id", field_type=IntegerType(), required=False),
        NestedField(field_id=2, name="name", field_type=StringType(), required=False))
    partition_spec = PartitionSpec(
        PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="partition_id"))
    table = catalog.create_table(
        f"{namespace_name}.{table_name}",
        schema=schema,
        partition_spec=partition_spec,
        properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json()})
    return table


def add_data_file(table, hive_data_file, spec_id):
    print("adding data file")
    parquet_metadata = pq.read_metadata(hive_data_file.get("location"))
    stats_columns = compute_statistics_plan(table.schema(), table.metadata.properties)
    statistics = data_file_statistics_from_parquet_metadata(
        parquet_metadata=parquet_metadata,
        stats_columns=stats_columns,
        parquet_column_mapping=parquet_path_to_id_mapping(table.schema()))
    data_file = DataFile(
        content=DataFileContent.DATA,
        file_path=hive_data_file.get("location"),
        file_format=FileFormat.PARQUET,
        partition=Record(partition_id=hive_data_file.get("partition_id")),
        file_size_in_bytes=hive_data_file.get("file_size"),
        sort_order_id=None,
        spec_id=spec_id,
        equality_ids=None,
        key_metadata=None,
        **statistics.to_serialized_dict())
    with table.transaction() as tx:
        with tx.update_snapshot().overwrite() as update_snapshot:
            update_snapshot.append_data_file(data_file)


if __name__ == "__main__":
    demonstrate_identity_partition_scan_issue()

And here's the output:
The issue is the IdentityTransform partition column. This issue also occurs for the |
Here's the code path for the Arrow table scan:
iceberg-python/pyiceberg/table/__init__.py Lines 1436 to 1438 in bfc0d9a
iceberg-python/pyiceberg/io/pyarrow.py Lines 1416 to 1476 in bfc0d9a
We likely need to post-process according to the column projection rules. |
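To make the post-processing idea concrete, here is a minimal sketch of the one projection rule quoted in this issue: if a field id is missing from the data file and an identity transform exists for it, take the value from the data file's partition struct. The dictionaries and the `resolve_missing_field` helper are hypothetical stand-ins, not pyiceberg classes or APIs, and any other fallback rules in the spec are not modeled; the sketch simply returns null (None) when the identity rule does not apply.

```python
from typing import Any, Dict, List

# Hypothetical, simplified stand-ins for Iceberg metadata (not pyiceberg types).
IDENTITY = "identity"


def resolve_missing_field(
    field_id: int,
    partition_fields: List[Dict[str, Any]],
    partition_values: Dict[str, Any],
) -> Any:
    """Return a value for a field id that is absent from the data file.

    Only the identity-partition rule is modeled. Anything else falls
    through to None, which is what a scan would show as null.
    """
    for pf in partition_fields:
        # The rule applies only when the partition field is an identity
        # transform of the missing source column.
        if pf["source_id"] == field_id and pf["transform"] == IDENTITY:
            # The value must also be present in the data file's partition struct.
            if pf["name"] in partition_values:
                return partition_values[pf["name"]]
    return None


# Mirrors the demo table: field 1 (partition_id) is identity-partitioned.
spec = [{"source_id": 1, "name": "partition_id", "transform": IDENTITY}]
print(resolve_missing_field(1, spec, {"partition_id": 202412}))
```

With the demo's metadata, the missing `partition_id` column would resolve to the partition value rather than null.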
@rkuhlercadent thanks a bunch for reporting this issue! Would you like to help contribute this fix? |
If it's available, I'd like to give it a go! |
@gabeiglio assigned to you since I realized OP checked "I cannot contribute a fix for this bug at this time" |
I'm open to feedback, but having investigated this issue, I'm inclined to think the fix needs to be in _task_to_record_batches. By comparing the projected schema against the file projection schema we could:
I'm still figuring out how to do step three (and whether it's possible). Apologies for the slow pace, but I've been a bit short on time. @kevinjqliu does this make sense? This is my first contribution to the project, so I might be missing some context. |
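The schema comparison mentioned above can be sketched as a set difference on field ids. This is an illustration only: schemas are modeled as plain `{field_id: name}` dictionaries, `missing_field_ids` is a hypothetical helper, and the file schema is assumed to have already been resolved to field ids (for example through the table's name mapping). It does not reflect how `_task_to_record_batches` is actually written.

```python
from typing import Dict, List


def missing_field_ids(projected: Dict[int, str], file_schema: Dict[int, str]) -> List[int]:
    """Field ids requested by the scan that the data file does not contain."""
    # Compare by field id, not by column name.
    return [field_id for field_id in projected if field_id not in file_schema]


# Mirrors the demo: the table has partition_id (1) and name (2),
# while the Hive-style Parquet file only contains name.
projected_schema = {1: "partition_id", 2: "name"}
file_schema = {2: "name"}

missing = missing_field_ids(projected_schema, file_schema)
print(missing)
```

Each id in `missing` is a candidate for resolution from partition metadata.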
That makes sense to me. I think we generally need a place to replicate the column projection logic according to the spec.
iceberg-python/pyiceberg/io/pyarrow.py Line 1246 in a97d13c
Yes, the issue occurs when the table schema has fields that are not present in the file schema.
We don't need this extra check, since the table/file schema mismatch will tell us which columns are missing. Also, we'd always want to check by field ID.
Yes, we'd want to append whatever the value is to the data file records. Luckily Arrow is columnar, so there won't be much penalty. |
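As a rough illustration of why a columnar layout makes this cheap, the sketch below models a record batch as a plain dictionary of column lists (a stand-in for an Arrow batch, not pyarrow itself) and appends a constant partition column. The `append_constant_column` helper is hypothetical. The point is that existing columns are reused as-is and only one new column is built.

```python
from typing import Any, Dict, List


def append_constant_column(
    batch: Dict[str, List[Any]], name: str, value: Any
) -> Dict[str, List[Any]]:
    """Return a new batch with an extra column filled with a single value."""
    # Row count is taken from any existing column (0 for an empty batch).
    num_rows = len(next(iter(batch.values()))) if batch else 0
    out = dict(batch)  # shallow copy: existing column lists are not copied
    out[name] = [value] * num_rows
    return out


# Mirrors the demo file: one row with only the "name" column.
file_batch = {"name": ["December 2024"]}
result = append_constant_column(file_batch, "partition_id", 202412)
print(result)
```

The original `file_batch` is left untouched, and the `name` column in `result` is the same list object as in the input.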
Apache Iceberg version
0.8.0 (latest release)
Please describe the bug 🐞
Per the Iceberg spec, partition columns with identity transforms should get their values from the metadata if not present in the data file. However, table.scan returns null values instead.
https://iceberg.apache.org/spec/#column-projection
"Values for field ids which are not present in a data file must be resolved according the following rules:
Return the value from partition metadata if an Identity Transform exists for the field and the partition value is present in the partition struct on data_file object in the manifest. This allows for metadata only migrations of Hive tables."
Willingness to contribute