Skip to content

Commit f558e34

Browse files
committed
test: test different rollover scenarios
1 parent 25ec474 commit f558e34

File tree

2 files changed

+89
-237
lines changed

2 files changed

+89
-237
lines changed

pyiceberg/manifest.py

+61-99
Original file line numberDiff line numberDiff line change
@@ -17,41 +17,30 @@
1717
from __future__ import annotations
1818

1919
import math
20-
from abc import ABC
21-
from abc import abstractmethod
20+
from abc import ABC, abstractmethod
2221
from enum import Enum
2322
from types import TracebackType
24-
from typing import Any, Generator
25-
from typing import Callable
26-
from typing import Dict
27-
from typing import Iterator
28-
from typing import List
29-
from typing import Literal
30-
from typing import Optional
31-
from typing import Type
32-
33-
from pyiceberg.avro.file import AvroFile
34-
from pyiceberg.avro.file import AvroOutputFile
23+
from typing import Any, Dict, Generator, Iterator, List, Literal, Optional, Type
24+
25+
from pyiceberg.avro.file import AvroFile, AvroOutputFile
3526
from pyiceberg.conversions import to_bytes
3627
from pyiceberg.exceptions import ValidationError
37-
from pyiceberg.io import FileIO
38-
from pyiceberg.io import InputFile
39-
from pyiceberg.io import OutputFile
28+
from pyiceberg.io import FileIO, InputFile, OutputFile
4029
from pyiceberg.partitioning import PartitionSpec
4130
from pyiceberg.schema import Schema
42-
from pyiceberg.typedef import EMPTY_DICT
43-
from pyiceberg.typedef import Record
44-
from pyiceberg.typedef import TableVersion
45-
from pyiceberg.types import BinaryType
46-
from pyiceberg.types import BooleanType
47-
from pyiceberg.types import IntegerType
48-
from pyiceberg.types import ListType
49-
from pyiceberg.types import LongType
50-
from pyiceberg.types import MapType
51-
from pyiceberg.types import NestedField
52-
from pyiceberg.types import PrimitiveType
53-
from pyiceberg.types import StringType
54-
from pyiceberg.types import StructType
31+
from pyiceberg.typedef import EMPTY_DICT, Record, TableVersion
32+
from pyiceberg.types import (
33+
BinaryType,
34+
BooleanType,
35+
IntegerType,
36+
ListType,
37+
LongType,
38+
MapType,
39+
NestedField,
40+
PrimitiveType,
41+
StringType,
42+
StructType,
43+
)
5544

5645
UNASSIGNED_SEQ = -1
5746
DEFAULT_BLOCK_SIZE = 67108864 # 64 * 1024 * 1024
@@ -101,9 +90,7 @@ def __repr__(self) -> str:
10190

10291
DATA_FILE_TYPE: Dict[int, StructType] = {
10392
1: StructType(
104-
NestedField(
105-
field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"
106-
),
93+
NestedField(field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"),
10794
NestedField(
10895
field_id=101,
10996
name="file_format",
@@ -118,9 +105,7 @@ def __repr__(self) -> str:
118105
required=True,
119106
doc="Partition data tuple, schema based on the partition spec",
120107
),
121-
NestedField(
122-
field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"
123-
),
108+
NestedField(field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"),
124109
NestedField(
125110
field_id=104,
126111
name="file_size_in_bytes",
@@ -203,9 +188,7 @@ def __repr__(self) -> str:
203188
doc="File format name: avro, orc, or parquet",
204189
initial_default=DataFileContent.DATA,
205190
),
206-
NestedField(
207-
field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"
208-
),
191+
NestedField(field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"),
209192
NestedField(
210193
field_id=101,
211194
name="file_format",
@@ -220,9 +203,7 @@ def __repr__(self) -> str:
220203
required=True,
221204
doc="Partition data tuple, schema based on the partition spec",
222205
),
223-
NestedField(
224-
field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"
225-
),
206+
NestedField(field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"),
226207
NestedField(
227208
field_id=104,
228209
name="file_size_in_bytes",
@@ -305,34 +286,30 @@ def __repr__(self) -> str:
305286

306287

307288
def data_file_with_partition(partition_type: StructType, format_version: TableVersion) -> StructType:
308-
data_file_partition_type = StructType(
309-
*[
310-
NestedField(
311-
field_id=field.field_id,
312-
name=field.name,
313-
field_type=field.field_type,
314-
required=field.required,
315-
)
316-
for field in partition_type.fields
317-
]
318-
)
289+
data_file_partition_type = StructType(*[
290+
NestedField(
291+
field_id=field.field_id,
292+
name=field.name,
293+
field_type=field.field_type,
294+
required=field.required,
295+
)
296+
for field in partition_type.fields
297+
])
319298

320-
return StructType(
321-
*[
322-
(
323-
NestedField(
324-
field_id=102,
325-
name="partition",
326-
field_type=data_file_partition_type,
327-
required=True,
328-
doc="Partition data tuple, schema based on the partition spec",
329-
)
330-
if field.field_id == 102
331-
else field
299+
return StructType(*[
300+
(
301+
NestedField(
302+
field_id=102,
303+
name="partition",
304+
field_type=data_file_partition_type,
305+
required=True,
306+
doc="Partition data tuple, schema based on the partition spec",
332307
)
333-
for field in DATA_FILE_TYPE[format_version].fields
334-
]
335-
)
308+
if field.field_id == 102
309+
else field
310+
)
311+
for field in DATA_FILE_TYPE[format_version].fields
312+
])
336313

337314

338315
class DataFile(Record):
@@ -413,18 +390,14 @@ def __eq__(self, other: Any) -> bool:
413390
),
414391
}
415392

416-
MANIFEST_ENTRY_SCHEMAS_STRUCT = {
417-
format_version: schema.as_struct() for format_version, schema in MANIFEST_ENTRY_SCHEMAS.items()
418-
}
393+
MANIFEST_ENTRY_SCHEMAS_STRUCT = {format_version: schema.as_struct() for format_version, schema in MANIFEST_ENTRY_SCHEMAS.items()}
419394

420395

421396
def manifest_entry_schema_with_data_file(format_version: TableVersion, data_file: StructType) -> Schema:
422-
return Schema(
423-
*[
424-
NestedField(2, "data_file", data_file, required=True) if field.field_id == 2 else field
425-
for field in MANIFEST_ENTRY_SCHEMAS[format_version].fields
426-
]
427-
)
397+
return Schema(*[
398+
NestedField(2, "data_file", data_file, required=True) if field.field_id == 2 else field
399+
for field in MANIFEST_ENTRY_SCHEMAS[format_version].fields
400+
])
428401

429402

430403
class ManifestEntry(Record):
@@ -494,9 +467,7 @@ def update(self, value: Any) -> None:
494467
self._min = min(self._min, value)
495468

496469

497-
def construct_partition_summaries(
498-
spec: PartitionSpec, schema: Schema, partitions: List[Record]
499-
) -> List[PartitionFieldSummary]:
470+
def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partitions: List[Record]) -> List[PartitionFieldSummary]:
500471
types = [field.field_type for field in spec.partition_type(schema).fields]
501472
field_stats = [PartitionFieldStats(field_type) for field_type in types]
502473
for partition_keys in partitions:
@@ -520,9 +491,7 @@ def construct_partition_summaries(
520491
NestedField(512, "added_rows_count", LongType(), required=False),
521492
NestedField(513, "existing_rows_count", LongType(), required=False),
522493
NestedField(514, "deleted_rows_count", LongType(), required=False),
523-
NestedField(
524-
507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False
525-
),
494+
NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False),
526495
NestedField(519, "key_metadata", BinaryType(), required=False),
527496
),
528497
2: Schema(
@@ -539,16 +508,12 @@ def construct_partition_summaries(
539508
NestedField(512, "added_rows_count", LongType(), required=True),
540509
NestedField(513, "existing_rows_count", LongType(), required=True),
541510
NestedField(514, "deleted_rows_count", LongType(), required=True),
542-
NestedField(
543-
507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False
544-
),
511+
NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False),
545512
NestedField(519, "key_metadata", BinaryType(), required=False),
546513
),
547514
}
548515

549-
MANIFEST_LIST_FILE_STRUCTS = {
550-
format_version: schema.as_struct() for format_version, schema in MANIFEST_LIST_FILE_SCHEMAS.items()
551-
}
516+
MANIFEST_LIST_FILE_STRUCTS = {format_version: schema.as_struct() for format_version, schema in MANIFEST_LIST_FILE_SCHEMAS.items()}
552517

553518

554519
POSITIONAL_DELETE_SCHEMA = Schema(
@@ -667,16 +632,12 @@ def _inherit_from_manifest(entry: ManifestEntry, manifest: ManifestFile) -> Mani
667632

668633
# in v1 tables, the data sequence number is not persisted and can be safely defaulted to 0
669634
# in v2 tables, the data sequence number should be inherited iff the entry status is ADDED
670-
if entry.data_sequence_number is None and (
671-
manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED
672-
):
635+
if entry.data_sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED):
673636
entry.data_sequence_number = manifest.sequence_number
674637

675638
# in v1 tables, the file sequence number is not persisted and can be safely defaulted to 0
676639
# in v2 tables, the file sequence number should be inherited iff the entry status is ADDED
677-
if entry.file_sequence_number is None and (
678-
manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED
679-
):
640+
if entry.file_sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED):
680641
# Only available in V2, always 0 in V1
681642
entry.file_sequence_number = manifest.sequence_number
682643

@@ -827,7 +788,7 @@ class RollingManifestWriter:
827788
_current_file_rows: int
828789

829790
def __init__(
830-
self, supplier: Generator[ManifestWriter, None, None], target_file_size_in_bytes, target_number_of_rows
791+
self, supplier: Generator[ManifestWriter, None, None], target_file_size_in_bytes: int, target_number_of_rows: int
831792
) -> None:
832793
self._closed = False
833794
self._manifest_files = []
@@ -838,6 +799,7 @@ def __init__(
838799
self._current_file_rows = 0
839800

840801
def __enter__(self) -> RollingManifestWriter:
802+
"""Open the writer."""
841803
self._get_current_writer().__enter__()
842804
return self
843805

@@ -847,6 +809,7 @@ def __exit__(
847809
exc_value: Optional[BaseException],
848810
traceback: Optional[TracebackType],
849811
) -> None:
812+
"""Close the writer."""
850813
self.closed = True
851814
if self._current_writer:
852815
self._current_writer.__exit__(exc_type, exc_value, traceback)
@@ -869,7 +832,7 @@ def _should_roll_to_new_file(self) -> bool:
869832
or len(self._current_writer._output_file) >= self._target_file_size_in_bytes
870833
)
871834

872-
def _close_current_writer(self):
835+
def _close_current_writer(self) -> None:
873836
if self._current_writer:
874837
self._current_writer.__exit__(None, None, None)
875838
current_file = self._current_writer.to_manifest_file()
@@ -887,6 +850,7 @@ def add_entry(self, entry: ManifestEntry) -> RollingManifestWriter:
887850
raise RuntimeError("Cannot add entry to closed manifest writer")
888851
self._get_current_writer().add_entry(entry)
889852
self._current_file_rows += entry.data_file.record_count
853+
890854
return self
891855

892856

@@ -1025,9 +989,7 @@ class ManifestListWriterV2(ManifestListWriter):
1025989
_commit_snapshot_id: int
1026990
_sequence_number: int
1027991

1028-
def __init__(
1029-
self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: int
1030-
):
992+
def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: int):
1031993
super().__init__(
1032994
format_version=2,
1033995
output_file=output_file,

0 commit comments

Comments
 (0)