Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ConeyLiu committed Dec 16, 2024
1 parent 5209206 commit e75ad92
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 6 deletions.
8 changes: 8 additions & 0 deletions mkdocs/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ For example, `PYICEBERG_CATALOG__DEFAULT__S3__ACCESS_KEY_ID`, sets `s3.access-ke

Iceberg tables support table properties to configure table behavior.

### Read options

| Key | Options | Default | Description |
|--------------------------------|---------------|--------------------|-----------------------------------------------------------------------------------------------------|
| `read.split.target-size` | Size in bytes | 134217728 (128 MB) | Target size when combining data input splits with `plan_tasks` |
| `read.split.planning-lookback` | Integer | 10 | Number of bins to consider when combining input splits with `plan_tasks` |
| `read.split.open-file-cost` | Size in bytes | 4194304 (4 MB) | The estimated cost to open a file, used as a minimum weight when combining splits with `plan_tasks` |

### Write options

| Key | Options | Default | Description |
Expand Down
2 changes: 1 addition & 1 deletion pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def _missing_(cls, value: object) -> Union[None, str]:
return None

def is_splittable(self) -> bool:
return self.name == "AVRO" or self.name == "PARQUET" or self.name == "ORC"
return self == FileFormat.AVRO or self == FileFormat.PARQUET or self == FileFormat.ORC

def __repr__(self) -> str:
"""Return the string representation of the FileFormat class."""
Expand Down
14 changes: 9 additions & 5 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1240,8 +1240,8 @@ def with_case_sensitive(self: S, case_sensitive: bool = True) -> S:


class ScanTask(ABC):
def size_in_bytes(self) -> int:
raise NotImplementedError("size in bytes is not implemented")
@abstractmethod
def size_in_bytes(self) -> int: ...


@dataclass(init=False)
Expand Down Expand Up @@ -1271,7 +1271,11 @@ def size_in_bytes(self) -> int:

@dataclass(init=False)
class CombinedFileScanTask(ScanTask):
"""Task representing combined multiple file scan tasks."""
"""Task representing combined multiple file scan tasks.
Used in plan_tasks. File can be split into multiple FileScanTask based on
split_offsets and then combined into read.split.target-size.
"""

tasks: List[FileScanTask]

Expand Down Expand Up @@ -1483,12 +1487,12 @@ def plan_task(self) -> Iterable[CombinedFileScanTask]:

def split(task: FileScanTask) -> List[FileScanTask]:
data_file = task.file
if not data_file.split_offsets:
if not data_file.file_format.is_splittable() or not data_file.split_offsets:
return [task]

split_offsets = data_file.split_offsets
if not all(split_offsets[i] <= split_offsets[i + 1] for i in range(len(split_offsets) - 1)):
# split offsets are strictly ascending
# split offsets must be strictly ascending
return [task]

all_tasks = []
Expand Down
29 changes: 29 additions & 0 deletions tests/integration/test_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
StringType,
TimestampType,
)
from pyiceberg.utils.bin_packing import PackingIterator
from pyiceberg.utils.concurrent import ExecutorFactory

DEFAULT_PROPERTIES = {"write.parquet.compression-codec": "zstd"}
Expand Down Expand Up @@ -906,8 +907,12 @@ def test_plan_tasks(session_catalog: Catalog) -> None:
)
)

assert len(tbl.inspect.files()) == 1

plan_files = list(tbl.scan().plan_files())
assert len(plan_files) == 1
data_file = plan_files[0].file
assert data_file.split_offsets is not None and len(data_file.split_offsets) == 10

plan_tasks = list(tbl.scan(options={TableProperties.READ_SPLIT_SIZE: 1}).plan_task())
assert len(plan_tasks) == 10
Expand All @@ -918,3 +923,27 @@ def test_plan_tasks(session_catalog: Catalog) -> None:
split_offsets.append(task.tasks[0].start)

assert split_offsets == plan_files[0].file.split_offsets

split_sizes = []
for i in range(1, len(data_file.split_offsets)):
split_sizes.append(data_file.split_offsets[i] - data_file.split_offsets[i - 1])

split_sizes.append(data_file.file_size_in_bytes - data_file.split_offsets[-1])

read_split_size = int(data_file.file_size_in_bytes / 4)
read_split_open_file_cost = 1
read_split_lookback = 5

plan_tasks = list(
tbl.scan(
options={
TableProperties.READ_SPLIT_SIZE: read_split_size,
TableProperties.READ_SPLIT_OPEN_FILE_COST: read_split_open_file_cost,
TableProperties.READ_SPLIT_LOOKBACK: read_split_lookback,
}
).plan_task()
)

assert len(plan_tasks) == len(
list(PackingIterator(split_sizes, read_split_size, read_split_lookback, lambda size: size, False))
)

0 comments on commit e75ad92

Please sign in to comment.