Skip to content

Commit

Permalink
plan tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
ConeyLiu committed Dec 12, 2024
1 parent ede363b commit 5209206
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 2 deletions.
3 changes: 3 additions & 0 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ def _missing_(cls, value: object) -> Union[None, str]:
return member
return None

def is_splittable(self) -> bool:
    """Return True if files of this format can be split into sub-file read tasks."""
    return self.name in ("AVRO", "PARQUET", "ORC")

def __repr__(self) -> str:
    """Return the string representation of the FileFormat class."""
    return "FileFormat." + self.name
Expand Down
92 changes: 90 additions & 2 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,12 @@
from pyiceberg.types import (
strtobool,
)
from pyiceberg.utils.bin_packing import ListPacker as ListPacker
from pyiceberg.utils.bin_packing import PackingIterator
from pyiceberg.utils.concurrent import ExecutorFactory
from pyiceberg.utils.config import Config
from pyiceberg.utils.deprecated import deprecated, deprecation_message
from pyiceberg.utils.properties import property_as_bool
from pyiceberg.utils.properties import property_as_bool, property_as_int

if TYPE_CHECKING:
import daft
Expand Down Expand Up @@ -191,6 +193,15 @@ class TableProperties:
DELETE_MODE_MERGE_ON_READ = "merge-on-read"
DELETE_MODE_DEFAULT = DELETE_MODE_COPY_ON_WRITE

# Target size in bytes of a combined read split produced by plan_task.
READ_SPLIT_SIZE = "read.split.target-size"
READ_SPLIT_SIZE_DEFAULT = 128 * 1024 * 1024 # 128 MB

# How many bins the bin-packing planner may look back over when combining
# file tasks (passed as the lookback of PackingIterator).
READ_SPLIT_LOOKBACK = "read.split.planning-lookback"
READ_SPLIT_LOOKBACK_DEFAULT = 10

# Minimum weight in bytes charged per opened file during split planning, so
# many tiny files are not packed into one oversized task.
READ_SPLIT_OPEN_FILE_COST = "read.split.open-file-cost"
READ_SPLIT_OPEN_FILE_COST_DEFAULT = 4 * 1024 * 1024 # 4 MB

DEFAULT_NAME_MAPPING = "schema.name-mapping.default"
FORMAT_VERSION = "format-version"
DEFAULT_FORMAT_VERSION = 2
Expand Down Expand Up @@ -1229,7 +1240,8 @@ def with_case_sensitive(self: S, case_sensitive: bool = True) -> S:


class ScanTask(ABC):
    """Base class for the tasks produced by scan planning."""

    def size_in_bytes(self) -> int:
        """Return the number of bytes this task will read; subclasses must override."""
        message = "size in bytes is not implemented"
        raise NotImplementedError(message)


@dataclass(init=False)
Expand All @@ -1253,6 +1265,22 @@ def __init__(
self.start = start or 0
self.length = length or data_file.file_size_in_bytes

def size_in_bytes(self) -> int:
    """Total bytes read by this task: the data split plus every attached delete file."""
    total = self.length
    for delete_file in self.delete_files:
        total += delete_file.file_size_in_bytes
    return total


@dataclass(init=False)
class CombinedFileScanTask(ScanTask):
    """A scan task that groups several file scan tasks into one unit of work."""

    tasks: List[FileScanTask]

    def __init__(self, tasks: List[FileScanTask]) -> None:
        self.tasks = tasks

    def size_in_bytes(self) -> int:
        """Sum of the sizes of all contained file scan tasks."""
        total = 0
        for child in self.tasks:
            total += child.size_in_bytes()
        return total


def _open_manifest(
io: FileIO,
Expand Down Expand Up @@ -1423,6 +1451,66 @@ def plan_files(self) -> Iterable[FileScanTask]:
for data_entry in data_entries
]

def _target_split_size(self) -> int:
    """Resolve ``read.split.target-size``: a scan option overrides the table property."""
    return property_as_int(  # type: ignore
        self.options,
        TableProperties.READ_SPLIT_SIZE,
        property_as_int(
            self.table_metadata.properties, TableProperties.READ_SPLIT_SIZE, TableProperties.READ_SPLIT_SIZE_DEFAULT
        ),
    )

def _loop_back(self) -> int:
    """Resolve ``read.split.planning-lookback``: a scan option overrides the table property."""
    return property_as_int(  # type: ignore
        self.options,
        TableProperties.READ_SPLIT_LOOKBACK,
        property_as_int(
            self.table_metadata.properties, TableProperties.READ_SPLIT_LOOKBACK, TableProperties.READ_SPLIT_LOOKBACK_DEFAULT
        ),
    )

def _split_open_file_cost(self) -> int:
    """Resolve ``read.split.open-file-cost``: a scan option overrides the table property."""
    return property_as_int(  # type: ignore
        self.options,
        TableProperties.READ_SPLIT_OPEN_FILE_COST,
        property_as_int(
            self.table_metadata.properties,
            TableProperties.READ_SPLIT_OPEN_FILE_COST,
            TableProperties.READ_SPLIT_OPEN_FILE_COST_DEFAULT,
        ),
    )

def plan_task(self) -> Iterable[CombinedFileScanTask]:
    """Plan balanced combined tasks for this scan by splitting large and combining small tasks.

    Data files are first split along their recorded ``split_offsets`` (when present,
    well-formed, and the file format supports splitting), then the resulting tasks
    are bin-packed into combined tasks targeting ``read.split.target-size`` bytes.

    Returns:
        List of CombinedFileScanTasks
    """
    split_size = self._target_split_size()
    loop_back = self._loop_back()
    open_file_cost = self._split_open_file_cost()

    def split(task: FileScanTask) -> List[FileScanTask]:
        data_file = task.file
        # Only formats that support seeking to internal boundaries can be split;
        # without offsets (or for unsplittable formats) read the file as one task.
        if not data_file.split_offsets or not data_file.file_format.is_splittable():
            return [task]

        split_offsets = data_file.split_offsets
        if not all(split_offsets[i] <= split_offsets[i + 1] for i in range(len(split_offsets) - 1)):
            # Offsets are expected to be ascending; unsorted offsets indicate corrupt
            # metadata, so fall back to reading the whole file as a single task.
            return [task]

        all_tasks = [
            FileScanTask(data_file, task.delete_files, split_offsets[i], split_offsets[i + 1] - split_offsets[i])
            for i in range(len(split_offsets) - 1)
        ]
        # The final split spans from the last offset to the end of the file.
        all_tasks.append(
            FileScanTask(data_file, task.delete_files, split_offsets[-1], data_file.file_size_in_bytes - split_offsets[-1])
        )
        return all_tasks

    def weight_func(task: FileScanTask) -> int:
        # A task never weighs less than its open-file cost: one open for the data
        # file plus one for each attached delete file.
        return max(task.size_in_bytes(), (1 + len(task.delete_files)) * open_file_cost)

    file_tasks = self.plan_files()
    split_file_tasks = list(itertools.chain.from_iterable(map(split, file_tasks)))
    packing_iterator = PackingIterator(split_file_tasks, split_size, loop_back, weight_func, False)
    return list(map(CombinedFileScanTask, packing_iterator))

def to_arrow(self) -> pa.Table:
"""Read an Arrow table eagerly from this DataScan.
Expand Down
45 changes: 45 additions & 0 deletions tests/integration/test_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -873,3 +873,48 @@ def test_table_scan_empty_table(catalog: Catalog) -> None:
result_table = tbl.scan().to_arrow()

assert len(result_table) == 0


@pytest.mark.integration
def test_plan_tasks(session_catalog: Catalog) -> None:
    """Verify plan_task splits a single multi-row-group file into balanced combined tasks."""
    from pyiceberg.table import TableProperties

    table_name = "default.test_plan_tasks"
    try:
        session_catalog.drop_table(table_name)
    except NoSuchTableError:
        pass  # Just to make sure that the table doesn't exist

    tbl = session_catalog.create_table(
        table_name,
        Schema(NestedField(1, "number", LongType())),
        properties={TableProperties.PARQUET_ROW_GROUP_LIMIT: "1"},
    )

    # With a row-group limit of 1, ten rows produce ten row groups in one file.
    row_count = 10
    rows = [{"number": value} for value in range(row_count)]
    tbl.append(pa.Table.from_pylist(rows))

    plan_files = list(tbl.scan().plan_files())
    assert len(plan_files) == 1

    # A 1-byte target split size forces one combined task per row group.
    plan_tasks = list(tbl.scan(options={TableProperties.READ_SPLIT_SIZE: 1}).plan_task())
    assert len(plan_tasks) == row_count

    split_offsets = []
    for combined in plan_tasks:
        assert len(combined.tasks) == 1
        split_offsets.append(combined.tasks[0].start)

    assert split_offsets == plan_files[0].file.split_offsets

0 comments on commit 5209206

Please sign in to comment.