diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md
index 1c88c7cb3b..93c515aa6c 100644
--- a/mkdocs/docs/configuration.md
+++ b/mkdocs/docs/configuration.md
@@ -52,6 +52,14 @@ For example, `PYICEBERG_CATALOG__DEFAULT__S3__ACCESS_KEY_ID`, sets `s3.access-ke
 
 Iceberg tables support table properties to configure table behavior.
 
+### Read options
+
+| Key                            | Options       | Default            | Description                                                                                         |
+|--------------------------------|---------------|--------------------|-----------------------------------------------------------------------------------------------------|
+| `read.split.target-size`       | Size in bytes | 134217728 (128 MB) | Target size when combining data input splits with `plan_task`                                       |
+| `read.split.planning-lookback` | Integer       | 10                 | Number of bins to consider when combining input splits with `plan_task`                             |
+| `read.split.open-file-cost`    | Integer       | 4194304 (4 MB)     | The estimated cost to open a file, used as a minimum weight when combining splits with `plan_task`  |
+
 ### Write options
 
 | Key | Options | Default | Description |
diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py
index e9ada0eb11..dd3e3e83d5 100644
--- a/pyiceberg/manifest.py
+++ b/pyiceberg/manifest.py
@@ -106,7 +106,7 @@ def _missing_(cls, value: object) -> Union[None, str]:
         return None
 
     def is_splittable(self) -> bool:
-        return self.name == "AVRO" or self.name == "PARQUET" or self.name == "ORC"
+        return self == FileFormat.AVRO or self == FileFormat.PARQUET or self == FileFormat.ORC
 
     def __repr__(self) -> str:
         """Return the string representation of the FileFormat class."""
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 37476cf72a..5e84a4a206 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -1240,8 +1240,8 @@ def with_case_sensitive(self: S, case_sensitive: bool = True) -> S:
 
 
 class ScanTask(ABC):
-    def size_in_bytes(self) -> int:
-        raise NotImplementedError("size in bytes is not implemented")
+    @abstractmethod
+    def size_in_bytes(self) -> int: ...
 
 
 @dataclass(init=False)
@@ -1271,7 +1271,11 @@ def size_in_bytes(self) -> int:
 
 @dataclass(init=False)
 class CombinedFileScanTask(ScanTask):
-    """Task representing combined multiple file scan tasks."""
+    """Task that combines multiple file scan tasks.
+
+    Used by plan_task. A file can be split into multiple FileScanTasks based on
+    its split_offsets, and the splits are then combined up to read.split.target-size.
+    """
 
     tasks: List[FileScanTask]
 
@@ -1483,12 +1487,12 @@ def plan_task(self) -> Iterable[CombinedFileScanTask]:
 
         def split(task: FileScanTask) -> List[FileScanTask]:
             data_file = task.file
-            if not data_file.split_offsets:
+            if not data_file.file_format.is_splittable() or not data_file.split_offsets:
                 return [task]
 
             split_offsets = data_file.split_offsets
             if not all(split_offsets[i] <= split_offsets[i + 1] for i in range(len(split_offsets) - 1)):
-                # split offsets are strictly ascending
+                # split offsets must be in ascending order
                 return [task]
 
             all_tasks = []
diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py
index 4e8d01b5d2..3166c37b51 100644
--- a/tests/integration/test_reads.py
+++ b/tests/integration/test_reads.py
@@ -55,6 +55,7 @@
     StringType,
     TimestampType,
 )
+from pyiceberg.utils.bin_packing import PackingIterator
 from pyiceberg.utils.concurrent import ExecutorFactory
 
 DEFAULT_PROPERTIES = {"write.parquet.compression-codec": "zstd"}
@@ -906,8 +907,12 @@ def test_plan_tasks(session_catalog: Catalog) -> None:
         )
     )
 
+    assert len(tbl.inspect.files()) == 1
+
     plan_files = list(tbl.scan().plan_files())
     assert len(plan_files) == 1
 
+    data_file = plan_files[0].file
+    assert data_file.split_offsets is not None and len(data_file.split_offsets) == 10
     plan_tasks = list(tbl.scan(options={TableProperties.READ_SPLIT_SIZE: 1}).plan_task())
     assert len(plan_tasks) == 10
@@ -918,3 +923,27 @@ def test_plan_tasks(session_catalog: Catalog) -> None:
         split_offsets.append(task.tasks[0].start)
 
     assert split_offsets == plan_files[0].file.split_offsets
+
+    split_sizes = []
+    for i in range(1, len(data_file.split_offsets)):
+        split_sizes.append(data_file.split_offsets[i] - data_file.split_offsets[i - 1])
+
+    split_sizes.append(data_file.file_size_in_bytes - data_file.split_offsets[-1])
+
+    read_split_size = int(data_file.file_size_in_bytes / 4)
+    read_split_open_file_cost = 1
+    read_split_lookback = 5
+
+    plan_tasks = list(
+        tbl.scan(
+            options={
+                TableProperties.READ_SPLIT_SIZE: read_split_size,
+                TableProperties.READ_SPLIT_OPEN_FILE_COST: read_split_open_file_cost,
+                TableProperties.READ_SPLIT_LOOKBACK: read_split_lookback,
+            }
+        ).plan_task()
+    )
+
+    assert len(plan_tasks) == len(
+        list(PackingIterator(split_sizes, read_split_size, read_split_lookback, lambda size: size, False))
+    )