
Commit 4f45b47

test: test different rollover scenarios
1 parent 25ec474 commit 4f45b47

2 files changed: +97 −237 lines

pyiceberg/manifest.py

Lines changed: 69 additions & 99 deletions
@@ -17,41 +17,38 @@
 from __future__ import annotations

 import math
-from abc import ABC
-from abc import abstractmethod
+from abc import ABC, abstractmethod
 from enum import Enum
 from types import TracebackType
-from typing import Any, Generator
-from typing import Callable
-from typing import Dict
-from typing import Iterator
-from typing import List
-from typing import Literal
-from typing import Optional
-from typing import Type
-
-from pyiceberg.avro.file import AvroFile
-from pyiceberg.avro.file import AvroOutputFile
+from typing import (
+    Any,
+    Dict,
+    Iterator,
+    List,
+    Literal,
+    Optional,
+    Type,
+)
+
+from pyiceberg.avro.file import AvroFile, AvroOutputFile
 from pyiceberg.conversions import to_bytes
 from pyiceberg.exceptions import ValidationError
-from pyiceberg.io import FileIO
-from pyiceberg.io import InputFile
-from pyiceberg.io import OutputFile
+from pyiceberg.io import FileIO, InputFile, OutputFile
 from pyiceberg.partitioning import PartitionSpec
 from pyiceberg.schema import Schema
-from pyiceberg.typedef import EMPTY_DICT
-from pyiceberg.typedef import Record
-from pyiceberg.typedef import TableVersion
-from pyiceberg.types import BinaryType
-from pyiceberg.types import BooleanType
-from pyiceberg.types import IntegerType
-from pyiceberg.types import ListType
-from pyiceberg.types import LongType
-from pyiceberg.types import MapType
-from pyiceberg.types import NestedField
-from pyiceberg.types import PrimitiveType
-from pyiceberg.types import StringType
-from pyiceberg.types import StructType
+from pyiceberg.typedef import EMPTY_DICT, Record, TableVersion
+from pyiceberg.types import (
+    BinaryType,
+    BooleanType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    NestedField,
+    PrimitiveType,
+    StringType,
+    StructType,
+)

 UNASSIGNED_SEQ = -1
 DEFAULT_BLOCK_SIZE = 67108864  # 64 * 1024 * 1024
@@ -101,9 +98,7 @@ def __repr__(self) -> str:

 DATA_FILE_TYPE: Dict[int, StructType] = {
     1: StructType(
-        NestedField(
-            field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"
-        ),
+        NestedField(field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"),
         NestedField(
             field_id=101,
             name="file_format",
@@ -118,9 +113,7 @@ def __repr__(self) -> str:
             required=True,
             doc="Partition data tuple, schema based on the partition spec",
         ),
-        NestedField(
-            field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"
-        ),
+        NestedField(field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"),
         NestedField(
             field_id=104,
             name="file_size_in_bytes",
@@ -203,9 +196,7 @@ def __repr__(self) -> str:
             doc="File format name: avro, orc, or parquet",
             initial_default=DataFileContent.DATA,
         ),
-        NestedField(
-            field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"
-        ),
+        NestedField(field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"),
         NestedField(
             field_id=101,
             name="file_format",
@@ -220,9 +211,7 @@ def __repr__(self) -> str:
             required=True,
             doc="Partition data tuple, schema based on the partition spec",
         ),
-        NestedField(
-            field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"
-        ),
+        NestedField(field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"),
         NestedField(
             field_id=104,
             name="file_size_in_bytes",
@@ -305,34 +294,30 @@ def __repr__(self) -> str:


 def data_file_with_partition(partition_type: StructType, format_version: TableVersion) -> StructType:
-    data_file_partition_type = StructType(
-        *[
-            NestedField(
-                field_id=field.field_id,
-                name=field.name,
-                field_type=field.field_type,
-                required=field.required,
-            )
-            for field in partition_type.fields
-        ]
-    )
+    data_file_partition_type = StructType(*[
+        NestedField(
+            field_id=field.field_id,
+            name=field.name,
+            field_type=field.field_type,
+            required=field.required,
+        )
+        for field in partition_type.fields
+    ])

-    return StructType(
-        *[
-            (
-                NestedField(
-                    field_id=102,
-                    name="partition",
-                    field_type=data_file_partition_type,
-                    required=True,
-                    doc="Partition data tuple, schema based on the partition spec",
-                )
-                if field.field_id == 102
-                else field
+    return StructType(*[
+        (
+            NestedField(
+                field_id=102,
+                name="partition",
+                field_type=data_file_partition_type,
+                required=True,
+                doc="Partition data tuple, schema based on the partition spec",
             )
-            for field in DATA_FILE_TYPE[format_version].fields
-        ]
-    )
+            if field.field_id == 102
+            else field
+        )
+        for field in DATA_FILE_TYPE[format_version].fields
+    ])


 class DataFile(Record):
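
Note: the hunk above only re-wraps data_file_with_partition; its behavior is unchanged. As a reference, a minimal usage sketch (the "bucket" field and its id 1000 are made up for illustration, everything else comes from the code above):

from pyiceberg.manifest import data_file_with_partition
from pyiceberg.types import IntegerType, NestedField, StructType

# Hypothetical single-field partition struct; field id and name are example values only.
partition_struct = StructType(
    NestedField(field_id=1000, name="bucket", field_type=IntegerType(), required=False),
)

# Field 102 ("partition") of the format-v2 data-file struct is swapped for that struct;
# every other field of DATA_FILE_TYPE[2] is passed through unchanged.
data_file_struct = data_file_with_partition(partition_struct, format_version=2)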
@@ -413,18 +398,14 @@ def __eq__(self, other: Any) -> bool:
     ),
 }

-MANIFEST_ENTRY_SCHEMAS_STRUCT = {
-    format_version: schema.as_struct() for format_version, schema in MANIFEST_ENTRY_SCHEMAS.items()
-}
+MANIFEST_ENTRY_SCHEMAS_STRUCT = {format_version: schema.as_struct() for format_version, schema in MANIFEST_ENTRY_SCHEMAS.items()}


 def manifest_entry_schema_with_data_file(format_version: TableVersion, data_file: StructType) -> Schema:
-    return Schema(
-        *[
-            NestedField(2, "data_file", data_file, required=True) if field.field_id == 2 else field
-            for field in MANIFEST_ENTRY_SCHEMAS[format_version].fields
-        ]
-    )
+    return Schema(*[
+        NestedField(2, "data_file", data_file, required=True) if field.field_id == 2 else field
+        for field in MANIFEST_ENTRY_SCHEMAS[format_version].fields
+    ])


 class ManifestEntry(Record):
@@ -494,9 +475,7 @@ def update(self, value: Any) -> None:
         self._min = min(self._min, value)


-def construct_partition_summaries(
-    spec: PartitionSpec, schema: Schema, partitions: List[Record]
-) -> List[PartitionFieldSummary]:
+def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partitions: List[Record]) -> List[PartitionFieldSummary]:
     types = [field.field_type for field in spec.partition_type(schema).fields]
     field_stats = [PartitionFieldStats(field_type) for field_type in types]
     for partition_keys in partitions:
@@ -520,9 +499,7 @@ def construct_partition_summaries(
         NestedField(512, "added_rows_count", LongType(), required=False),
         NestedField(513, "existing_rows_count", LongType(), required=False),
         NestedField(514, "deleted_rows_count", LongType(), required=False),
-        NestedField(
-            507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False
-        ),
+        NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False),
         NestedField(519, "key_metadata", BinaryType(), required=False),
     ),
     2: Schema(
@@ -539,16 +516,12 @@ def construct_partition_summaries(
         NestedField(512, "added_rows_count", LongType(), required=True),
         NestedField(513, "existing_rows_count", LongType(), required=True),
         NestedField(514, "deleted_rows_count", LongType(), required=True),
-        NestedField(
-            507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False
-        ),
+        NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False),
         NestedField(519, "key_metadata", BinaryType(), required=False),
     ),
 }

-MANIFEST_LIST_FILE_STRUCTS = {
-    format_version: schema.as_struct() for format_version, schema in MANIFEST_LIST_FILE_SCHEMAS.items()
-}
+MANIFEST_LIST_FILE_STRUCTS = {format_version: schema.as_struct() for format_version, schema in MANIFEST_LIST_FILE_SCHEMAS.items()}


 POSITIONAL_DELETE_SCHEMA = Schema(
@@ -667,16 +640,12 @@ def _inherit_from_manifest(entry: ManifestEntry, manifest: ManifestFile) -> ManifestEntry:

     # in v1 tables, the data sequence number is not persisted and can be safely defaulted to 0
     # in v2 tables, the data sequence number should be inherited iff the entry status is ADDED
-    if entry.data_sequence_number is None and (
-        manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED
-    ):
+    if entry.data_sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED):
         entry.data_sequence_number = manifest.sequence_number

     # in v1 tables, the file sequence number is not persisted and can be safely defaulted to 0
     # in v2 tables, the file sequence number should be inherited iff the entry status is ADDED
-    if entry.file_sequence_number is None and (
-        manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED
-    ):
+    if entry.file_sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED):
         # Only available in V2, always 0 in V1
         entry.file_sequence_number = manifest.sequence_number

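Note: the comments in the hunk above carry the inheritance rule; a tiny sketch of that predicate, not part of this commit, with example sequence numbers chosen only to illustrate:

from pyiceberg.manifest import ManifestEntryStatus

def inherits_sequence_number(manifest_sequence_number: int, status: ManifestEntryStatus) -> bool:
    # Same condition as in _inherit_from_manifest: v1 manifests (sequence number 0) always
    # default a missing sequence number; v2 manifests only do so for newly ADDED entries.
    return manifest_sequence_number == 0 or status == ManifestEntryStatus.ADDED

assert inherits_sequence_number(0, ManifestEntryStatus.EXISTING)      # v1: defaulted to 0
assert inherits_sequence_number(7, ManifestEntryStatus.ADDED)         # v2: new file inherits 7
assert not inherits_sequence_number(7, ManifestEntryStatus.EXISTING)  # v2: keeps its own number
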
@@ -827,7 +796,7 @@ class RollingManifestWriter:
     _current_file_rows: int

     def __init__(
-        self, supplier: Generator[ManifestWriter, None, None], target_file_size_in_bytes, target_number_of_rows
+        self, supplier: Generator[ManifestWriter, None, None], target_file_size_in_bytes: int, target_number_of_rows: int
     ) -> None:
         self._closed = False
         self._manifest_files = []
@@ -838,6 +807,7 @@ def __init__(
         self._current_file_rows = 0

     def __enter__(self) -> RollingManifestWriter:
+        """Open the writer."""
         self._get_current_writer().__enter__()
         return self

@@ -847,6 +817,7 @@ def __exit__(
         exc_value: Optional[BaseException],
         traceback: Optional[TracebackType],
     ) -> None:
+        """Close the writer."""
         self.closed = True
         if self._current_writer:
             self._current_writer.__exit__(exc_type, exc_value, traceback)
@@ -869,7 +840,7 @@ def _should_roll_to_new_file(self) -> bool:
             or len(self._current_writer._output_file) >= self._target_file_size_in_bytes
         )

-    def _close_current_writer(self):
+    def _close_current_writer(self) -> None:
         if self._current_writer:
             self._current_writer.__exit__(None, None, None)
             current_file = self._current_writer.to_manifest_file()
@@ -887,6 +858,7 @@ def add_entry(self, entry: ManifestEntry) -> RollingManifestWriter:
             raise RuntimeError("Cannot add entry to closed manifest writer")
         self._get_current_writer().add_entry(entry)
         self._current_file_rows += entry.data_file.record_count
+
         return self

@@ -1025,9 +997,7 @@ class ManifestListWriterV2(ManifestListWriter):
     _commit_snapshot_id: int
     _sequence_number: int

-    def __init__(
-        self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: int
-    ):
+    def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: int):
         super().__init__(
             format_version=2,
             output_file=output_file,

0 commit comments