Skip to content
Draft
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
8d60727
Revert "Bump Snapshot versions (#1907)"
Fokko Apr 28, 2025
54ae850
Revert "CI: Use Java 1.9.0-SNAPSHOT for testing (#1899)"
Fokko Apr 28, 2025
778260b
Bump to Iceberg 1.9.0
Fokko Apr 28, 2025
2f60f66
WIP
Fokko May 2, 2025
e5d13f9
Merge branch 'main' of github.com:apache/iceberg-python into fd-rust
Fokko May 3, 2025
cc06390
Cleanup
Fokko May 3, 2025
f2247f1
WIP
Fokko May 12, 2025
f116bab
Merge branch 'main' of github.com:apache/iceberg-python into fd-rust
Fokko May 13, 2025
cb9414f
WIP
Fokko May 13, 2025
f31dd84
Merge branch 'main' of github.com:apache/iceberg-python into fd-rust
Fokko May 14, 2025
5f5955a
Delegate Avro parsing to Iceberg-Rust
Fokko May 14, 2025
2749705
Import
Fokko May 14, 2025
fc72d64
Merge branch 'main' of github.com:apache/iceberg-python into fd-rust
Fokko Jun 4, 2025
3ea005c
Merge branch 'main' of github.com:apache/iceberg-python into fd-rust
Fokko Jun 10, 2025
8bf84fe
Fixes the partition field :)
Fokko Jun 26, 2025
84cb503
Merge branch 'main' of github.com:apache/iceberg-python into fd-rust
Fokko Jul 16, 2025
84cfe0a
Poetry
Fokko Jul 26, 2025
219f46b
Merge branch 'main' of github.com:apache/iceberg-python into fd-rust
Fokko Jul 26, 2025
b48f8f1
Merge branch 'main' of github.com:apache/iceberg-python into fd-rust
Fokko Jul 30, 2025
9b4a8fa
WIP
Fokko Jul 30, 2025
96acdc0
Merge branch 'main' of github.com:apache/iceberg-python into fd-rust
Fokko Jul 30, 2025
dfe097f
WIP
Fokko Jul 31, 2025
543fc56
WIP
Fokko Jul 31, 2025
cdc8d85
WIP
Fokko Jul 31, 2025
974e2e3
Avro: Fix tests and add missing `content` header
Fokko Jul 31, 2025
531e19c
Merge branch 'main' of github.com:apache/iceberg-python into fd-rust
Fokko Aug 7, 2025
68fafeb
WIP
Fokko Aug 7, 2025
137a9de
So clean
Fokko Aug 7, 2025
bb2afab
Cleanup
Fokko Aug 7, 2025
2d0f7dc
Merge branch 'main' of github.com:apache/iceberg-python into fd-rust
Fokko Aug 7, 2025
482c3d5
Fix
Fokko Aug 7, 2025
d1c3a92
Merge branch 'main' of github.com:apache/iceberg-python into fd-rust
Fokko Sep 1, 2025
5777fd4
WIP
Fokko Sep 1, 2025
77d874e
Merge branch 'main' of github.com:apache/iceberg-python into fd-rust
Fokko Sep 17, 2025
41edeb2
Cleanup
Fokko Sep 17, 2025
cc4150b
Cleanup
Fokko Sep 17, 2025
50874da
Bump to 0.7.0rc1
Fokko Sep 18, 2025
23cb193
Bind to Datafusion 48.0.0
Fokko Sep 18, 2025
e31ebda
Fix some tests
Fokko Sep 22, 2025
f6a59ea
Merge branch 'main' of github.com:apache/iceberg-python into fd-rust
Fokko Sep 22, 2025
8b9345a
Merge branch 'main' of github.com:apache/iceberg-python into fd-rust
Fokko Sep 22, 2025
706cee5
Disable zstd for now
Fokko Sep 22, 2025
2b084b7
Fix renames of `tpep_pickup_datetime` →`tpep_pickup_day`
Fokko Sep 22, 2025
be81f2e
Fix test
Fokko Sep 22, 2025
f406558
Fix more tests
Fokko Sep 22, 2025
9a61e63
Merge branch 'main' of github.com:apache/iceberg-python into fd-rust
Fokko Sep 22, 2025
0871c0d
Merge branch 'main' of github.com:apache/iceberg-python into fd-rust
Fokko Sep 23, 2025
eb7bda8
Skip the test for now
Fokko Sep 23, 2025
79ce919
Oops
Fokko Sep 23, 2025
14f9093
Add skip
Fokko Sep 23, 2025
fa8424c
Merge branch 'main' of github.com:apache/iceberg-python into fd-rust
Fokko Sep 23, 2025
3b40383
WIP
Fokko Sep 24, 2025
6a7d88a
Merge branch 'main' of github.com:apache/iceberg-python into fd-rust
Fokko Sep 24, 2025
aad8075
Merge branch 'main' of github.com:apache/iceberg-python into fd-rust
Fokko Sep 26, 2025
a68997a
WIP
Fokko Sep 26, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,015 changes: 1,092 additions & 923 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,7 @@ def purge_table(self, identifier: Union[str, Identifier]) -> None:
manifest_lists_to_delete = set()
manifests_to_delete: List[ManifestFile] = []
for snapshot in metadata.snapshots:
manifests_to_delete += snapshot.manifests(io)
manifests_to_delete += snapshot.manifests(table, io)
manifest_lists_to_delete.add(snapshot.manifest_list)

manifest_paths_to_delete = {manifest.manifest_path for manifest in manifests_to_delete}
Expand Down
101 changes: 83 additions & 18 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from enum import Enum
from types import TracebackType
from typing import (
TYPE_CHECKING,
Any,
Dict,
Iterator,
Expand Down Expand Up @@ -57,6 +58,9 @@
StructType,
)

if TYPE_CHECKING:
from pyiceberg.table.metadata import TableMetadata

UNASSIGNED_SEQ = -1
DEFAULT_BLOCK_SIZE = 67108864 # 64 * 1024 * 1024
DEFAULT_READ_VERSION: Literal[2] = 2
Expand Down Expand Up @@ -704,25 +708,85 @@ def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List
Returns:
An Iterator of manifest entries.
"""
input_file = io.new_input(self.manifest_path)
with AvroFile[ManifestEntry](
input_file,
MANIFEST_ENTRY_SCHEMAS[DEFAULT_READ_VERSION],
read_types={-1: ManifestEntry, 2: DataFile},
read_enums={0: ManifestEntryStatus, 101: FileFormat, 134: DataFileContent},
) as reader:
return [
_inherit_from_manifest(entry, self)
for entry in reader
if not discard_deleted or entry.status != ManifestEntryStatus.DELETED
]


@cached(cache=LRUCache(maxsize=128), key=lambda io, manifest_list: hashkey(manifest_list))
def _manifests(io: FileIO, manifest_list: str) -> Tuple[ManifestFile, ...]:
from pyiceberg_core import manifest

bs = io.new_input(self.manifest_path).open().read()
manifest = manifest.read_manifest_entries(bs)

# TODO: Don't convert the types
# but this is the easiest for now until we
# have the write part in there as well
def _convert_entry(entry: Any) -> ManifestEntry:
data_file = DataFile(
DataFileContent(entry.data_file.content),
entry.data_file.file_path,
# FileFormat(entry.data_file.file_format),
FileFormat.PARQUET,
entry.data_file.partition,
entry.data_file.record_count,
entry.data_file.file_size_in_bytes,
entry.data_file.column_sizes,
entry.data_file.value_counts,
entry.data_file.null_value_counts,
entry.data_file.nan_value_counts,
entry.data_file.lower_bounds,
entry.data_file.upper_bounds,
entry.data_file.key_metadata,
entry.data_file.split_offsets,
entry.data_file.equality_ids,
entry.data_file.sort_order_id,
)

return ManifestEntry(
ManifestEntryStatus(entry.status),
entry.snapshot_id,
entry.sequence_number,
entry.file_sequence_number,
data_file,
)

return [
_inherit_from_manifest(_convert_entry(entry), self)
for entry in manifest.entries()
if not discard_deleted or entry.status != ManifestEntryStatus.DELETED
]


@cached(cache=LRUCache(maxsize=128), key=lambda io, manifest_list, table: hashkey(manifest_list))
def _manifests(io: FileIO, manifest_list: str, table: "TableMetadata") -> Tuple[ManifestFile, ...]:
"""Read and cache manifests from the given manifest list, returning a tuple to prevent modification."""
file = io.new_input(manifest_list)
return tuple(read_manifest_list(file))
bs = io.new_input(manifest_list).open().read()
from pyiceberg_core import manifest

def partition_spec(spec_id: int) -> str:
spec = table.specs()[spec_id]
partition_type = spec.partition_type(table.schema())
struct = Schema(*partition_type.fields).as_struct()
payload = struct.model_dump_json()
return payload

cb = manifest.PartitionSpecProviderCallbackHolder(partition_spec)

return tuple(
ManifestFile(
manifest.manifest_path,
manifest.manifest_length,
manifest.partition_spec_id,
manifest.content,
manifest.sequence_number,
manifest.min_sequence_number,
manifest.added_snapshot_id,
manifest.added_files_count,
manifest.existing_files_count,
manifest.deleted_files_count,
manifest.added_rows_count,
manifest.existing_rows_count,
manifest.deleted_rows_count,
manifest.partitions,
manifest.key_metadata,
)
for manifest in manifest.read_manifest_list(bs, cb).entries()
)


def read_manifest_list(input_file: InputFile) -> Iterator[ManifestFile]:
Expand Down Expand Up @@ -1093,6 +1157,7 @@ def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id
"parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null",
"sequence-number": str(sequence_number),
"format-version": "2",
"content": "data",
},
)
self._commit_snapshot_id = snapshot_id
Expand Down
2 changes: 1 addition & 1 deletion pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1780,7 +1780,7 @@ def plan_files(self) -> Iterable[FileScanTask]:

manifests = [
manifest_file
for manifest_file in snapshot.manifests(self.io)
for manifest_file in snapshot.manifests(self.io, self.table_metadata)
if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
]

Expand Down
4 changes: 2 additions & 2 deletions pyiceberg/table/snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,9 @@ def __str__(self) -> str:
result_str = f"{operation}id={self.snapshot_id}{parent_id}{schema_id}"
return result_str

def manifests(self, io: FileIO) -> List[ManifestFile]:
def manifests(self, io: FileIO, table: TableMetadata) -> List[ManifestFile]:
"""Return the manifests for the given snapshot."""
return list(_manifests(io, self.manifest_list))
return list(_manifests(io, self.manifest_list, table))


class MetadataLogEntry(IcebergBaseModel):
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ psycopg2-binary = { version = ">=2.9.6", optional = true }
sqlalchemy = { version = "^2.0.18", optional = true }
getdaft = { version = ">=0.2.12", optional = true }
cachetools = "^5.5.0"
pyiceberg-core = { version = "^0.4.0", optional = true }
pyiceberg-core = { file = "/Users/fokko.driesprong/work/iceberg-rust/bindings/python/dist/pyiceberg_core-0.4.0-cp39-abi3-macosx_11_0_arm64.whl" }
polars = { version = "^1.21.0", optional = true }
thrift-sasl = { version = ">=0.4.3", optional = true }
kerberos = {version = "^1.3.1", optional = true}
Expand Down
29 changes: 27 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,12 @@
)
from pyiceberg.io.fsspec import FsspecFileIO
from pyiceberg.manifest import DataFile, FileFormat
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Accessor, Schema
from pyiceberg.serializers import ToOutputFile
from pyiceberg.table import FileScanTask, Table
from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import (
BinaryType,
BooleanType,
Expand Down Expand Up @@ -1847,15 +1849,38 @@ def simple_map() -> MapType:


@pytest.fixture(scope="session")
def generated_manifest_entry_file(avro_schema_manifest_entry: Dict[str, Any]) -> Generator[str, None, None]:
def test_schema() -> Schema:
return Schema(NestedField(1, "VendorID", IntegerType(), False), NestedField(2, "tpep_pickup_datetime", IntegerType(), False))


@pytest.fixture(scope="session")
def test_partition_spec() -> Schema:
return PartitionSpec(
PartitionField(1, 1000, IdentityTransform(), "VendorID"),
PartitionField(2, 1001, IdentityTransform(), "tpep_pickup_datetime"),
)


@pytest.fixture(scope="session")
def generated_manifest_entry_file(
avro_schema_manifest_entry: Dict[str, Any], test_schema: Schema, test_partition_spec: PartitionSpec
) -> Generator[str, None, None]:
from fastavro import parse_schema, writer

parsed_schema = parse_schema(avro_schema_manifest_entry)

with TemporaryDirectory() as tmpdir:
tmp_avro_file = tmpdir + "/manifest.avro"
with open(tmp_avro_file, "wb") as out:
writer(out, parsed_schema, manifest_entry_records)
writer(
out,
parsed_schema,
manifest_entry_records,
metadata={
"schema": test_schema.model_dump_json(),
"partition-spec": test_partition_spec.fields,
},
)
yield tmp_avro_file


Expand Down
5 changes: 1 addition & 4 deletions tests/utils/test_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ def test_write_empty_manifest() -> None:

@pytest.mark.parametrize("format_version", [1, 2])
def test_write_manifest(
generated_manifest_file_file_v1: str, generated_manifest_file_file_v2: str, format_version: TableVersion
generated_manifest_file_file_v1: str, generated_manifest_file_file_v2: str, format_version: TableVersion, test_schema: Schema
) -> None:
io = load_file_io()
snapshot = Snapshot(
Expand All @@ -370,9 +370,6 @@ def test_write_manifest(
)
demo_manifest_file = snapshot.manifests(io)[0]
manifest_entries = demo_manifest_file.fetch_manifest_entry(io)
test_schema = Schema(
NestedField(1, "VendorID", IntegerType(), False), NestedField(2, "tpep_pickup_datetime", IntegerType(), False)
)
test_spec = PartitionSpec(
PartitionField(source_id=1, field_id=1, transform=IdentityTransform(), name="VendorID"),
PartitionField(source_id=2, field_id=2, transform=IdentityTransform(), name="tpep_pickup_datetime"),
Expand Down
Loading