
Commit

Resolving review comments (#6)
* added residual evaluator in plan files

* tested counts with positional deletes

* merged main

* implemented batch reader in count

* breaking integration test

* fixed integration test

* git pull main

* revert

* revert

* revert test_partitioning_key.py

* revert test_parser.py

* added residual evaluator in visitor

* deleted residual_evaluator.py

* removed test count from test_sql.py

* ignored lint type

* fixed lint

* working on plan_files

* type ignored

* make lint

* explicit delete files len is zero

* residual eval only if manifest is true

* default residual is always true

* used projection schema

* refactored residual in plan files
tusharchou authored Jan 31, 2025
1 parent 899beb1 commit 2575cb8
Showing 2 changed files with 46 additions and 32 deletions.
46 changes: 31 additions & 15 deletions pyiceberg/expressions/visitors.py
@@ -60,7 +60,7 @@
)
from pyiceberg.expressions.literals import Literal
from pyiceberg.manifest import DataFile, ManifestFile, PartitionFieldSummary
-from pyiceberg.partitioning import PartitionSpec
+from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.typedef import EMPTY_DICT, L, Record, StructProtocol
from pyiceberg.types import (
@@ -1734,11 +1734,30 @@ def _can_contain_nans(self, field_id: int) -> bool:


class ResidualVisitor(BoundBooleanExpressionVisitor[BooleanExpression], ABC):
"""Finds the residuals for an Expression the partitions in the given PartitionSpec.
A residual expression is made by partially evaluating an expression using partition values.
For example, if a table is partitioned by day(utc_timestamp) and is read with a filter expression
utc_timestamp >= a and utc_timestamp <= b, then there are 4 possible residuals expressions
for the partition data, d:
1. If d > day(a) and d < day(b), the residual is always true
2. If d == day(a) and d != day(b), the residual is utc_timestamp >= a
3. if d == day(b) and d != day(a), the residual is utc_timestamp <= b
4. If d == day(a) == day(b), the residual is utc_timestamp >= a and utc_timestamp <= b
Partition data is passed using StructLike. Residuals are returned by residualFor(StructLike).
This class is thread-safe.
"""

schema: Schema
spec: PartitionSpec
case_sensitive: bool
expr: BooleanExpression

-def __init__(self, schema: Schema, spec: PartitionSpec, case_sensitive: bool, expr: BooleanExpression):
+def __init__(self, schema: Schema, spec: PartitionSpec, case_sensitive: bool, expr: BooleanExpression) -> None:
self.schema = schema
self.spec = spec
self.case_sensitive = case_sensitive
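To make the four residual cases from the docstring above concrete, here is a small self-contained sketch in plain Python (not the pyiceberg API; the names and the string form of the residual are illustrative only) that partially evaluates `ts >= a and ts <= b` against a single day-partition value `d`:

```python
from datetime import date, datetime

def residual_for_day(d: date, a: datetime, b: datetime) -> str:
    """Residual of `ts >= a and ts <= b` for a partition holding day d, assuming day(a) <= d <= day(b)."""
    lower = "ALWAYS_TRUE" if d > a.date() else f"ts >= {a.isoformat()}"  # every row in a later day satisfies the lower bound
    upper = "ALWAYS_TRUE" if d < b.date() else f"ts <= {b.isoformat()}"  # every row in an earlier day satisfies the upper bound
    if lower == "ALWAYS_TRUE" and upper == "ALWAYS_TRUE":
        return "ALWAYS_TRUE"                  # case 1: every row in the partition matches
    if lower == "ALWAYS_TRUE":
        return upper                          # case 3: only the upper bound remains
    if upper == "ALWAYS_TRUE":
        return lower                          # case 2: only the lower bound remains
    return f"({lower}) and ({upper})"         # case 4: both bounds remain

print(residual_for_day(date(2025, 1, 15), datetime(2025, 1, 10, 12), datetime(2025, 1, 20, 12)))  # ALWAYS_TRUE
print(residual_for_day(date(2025, 1, 10), datetime(2025, 1, 10, 12), datetime(2025, 1, 20, 12)))  # ts >= 2025-01-10T12:00:00
```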
@@ -1776,18 +1795,18 @@ def visit_not_null(self, term: BoundTerm[L]) -> BooleanExpression:
return AlwaysFalse()

def visit_is_nan(self, term: BoundTerm[L]) -> BooleanExpression:
-val = term.eval(self.struct)
-if val is None:
-return self.visit_true()
+# if isnan(term.eval(self.struct)):
+if term.eval(self.struct) is not None:
+return AlwaysTrue()
else:
-return self.visit_false()
+return AlwaysFalse()

def visit_not_nan(self, term: BoundTerm[L]) -> BooleanExpression:
-val = term.eval(self.struct)
-if val is not None:
-return self.visit_true()
+# if not isnan(term.eval(self.struct)):
+if not term.eval(self.struct) is not None:
+return AlwaysTrue()
else:
-return self.visit_false()
+return AlwaysFalse()

def visit_less_than(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
if term.eval(self.struct) < literal.value:
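The `visit_is_nan` / `visit_not_nan` bodies above keep the `isnan` call commented out and compare against `None` instead. Purely as a hedged illustration of that commented-out intent (not what this commit ships), a NaN-aware check could be built on `math.isnan` with a guard for non-float values:

```python
import math
from typing import Any

def _is_nan(value: Any) -> bool:
    # NaN only exists for floats; None and non-float values are never NaN
    return isinstance(value, float) and math.isnan(value)

# visit_is_nan would then reduce to:
#     return AlwaysTrue() if _is_nan(term.eval(self.struct)) else AlwaysFalse()
# and visit_not_nan to the mirror image:
#     return AlwaysTrue() if not _is_nan(term.eval(self.struct)) else AlwaysFalse()
```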
@@ -1866,10 +1885,8 @@ def visit_bound_predicate(self, predicate: BoundPredicate[Any]) -> BooleanExpression:
if parts == []:
return predicate

-from pyiceberg.types import StructType
-
def struct_to_schema(struct: StructType) -> Schema:
-return Schema(*list(struct.fields))
+return Schema(*struct.fields)

for part in parts:
strict_projection = part.transform.strict_project(part.name, predicate)
@@ -1880,6 +1897,7 @@ def struct_to_schema(struct: StructType) -> Schema:
if isinstance(bound, BoundPredicate):
strict_result = super().visit_bound_predicate(bound)
else:
+# if the result is not a predicate, then it must be a constant like AlwaysTrue or AlwaysFalse
strict_result = bound

if strict_result is not None and isinstance(strict_result, AlwaysTrue):
Expand Down Expand Up @@ -1926,8 +1944,6 @@ def residual_for(self, partition_data: Record) -> BooleanExpression:
class UnpartitionedResidualEvaluator(ResidualEvaluator):
# Finds the residuals of an Expression when the PartitionSpec is unpartitioned
def __init__(self, schema: Schema, expr: BooleanExpression):
-from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC
-
super().__init__(schema=schema, spec=UNPARTITIONED_PARTITION_SPEC, expr=expr, case_sensitive=False)
self.expr = expr
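The `residual_for` override is not shown in this hunk; assuming it mirrors the Java `UnpartitionedResidualEvaluator` and simply returns the stored expression, usage would look roughly like this (`table` is an already-loaded `Table`; the expected behavior is an assumption):

```python
from pyiceberg.expressions import EqualTo
from pyiceberg.typedef import Record

evaluator = UnpartitionedResidualEvaluator(schema=table.schema(), expr=EqualTo("id", 5))
# With no partition values to fold in, the residual is assumed to be the original filter
print(evaluator.residual_for(Record()))  # expected: the untouched EqualTo expression
```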

32 changes: 15 additions & 17 deletions pyiceberg/table/__init__.py
@@ -54,13 +54,15 @@
Reference,
)
from pyiceberg.expressions.visitors import (
+ResidualEvaluator,
_InclusiveMetricsEvaluator,
bind,
expression_evaluator,
inclusive_projection,
manifest_evaluator,
)
from pyiceberg.io import FileIO, load_file_io
+from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow
from pyiceberg.manifest import (
POSITIONAL_DELETE_SCHEMA,
DataFile,
@@ -1382,13 +1384,13 @@ def __init__(
delete_files: Optional[Set[DataFile]] = None,
start: Optional[int] = None,
length: Optional[int] = None,
-residual: Optional[BooleanExpression] = None,
+residual: BooleanExpression = ALWAYS_TRUE,
) -> None:
self.file = data_file
self.delete_files = delete_files or set()
self.start = start or 0
self.length = length or data_file.file_size_in_bytes
-self.residual = residual  # type: ignore
+self.residual = residual
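A minimal sketch of what the new default means for callers, assuming the constructor's first positional argument is the `DataFile` (as `self.file = data_file` above suggests): omitting `residual` now yields `ALWAYS_TRUE` rather than `None`, so downstream code can compare against `AlwaysTrue()` without a `None` check.

```python
from pyiceberg.expressions import AlwaysTrue

task = FileScanTask(data_file)        # `data_file` is an existing DataFile; no residual passed
assert task.residual == AlwaysTrue()  # default is ALWAYS_TRUE, so the `# type: ignore` is no longer needed
```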


def _open_manifest(
@@ -1397,14 +1399,14 @@
partition_filter: Callable[[DataFile], bool],
residual_evaluator: Callable[[Record], BooleanExpression],
metrics_evaluator: Callable[[DataFile], bool],
-) -> List[tuple[ManifestEntry, BooleanExpression]]:
+) -> List[ManifestEntry]:
"""Open a manifest file and return matching manifest entries.
Returns:
A list of ManifestEntry that matches the provided filters.
"""
return [
-(manifest_entry, residual_evaluator(manifest_entry.data_file.partition))
+manifest_entry
for manifest_entry in manifest.fetch_manifest_entry(io, discard_deleted=True)
if partition_filter(manifest_entry.data_file) and metrics_evaluator(manifest_entry.data_file)
]
@@ -1471,8 +1473,6 @@ def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]
# shared instance across multiple threads.
return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)

-from pyiceberg.expressions.visitors import ResidualEvaluator
-
def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]:
spec = self.table_metadata.specs()[spec_id]

@@ -1488,7 +1488,7 @@ def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]:
spec=spec,
expr=self.row_filter,
case_sensitive=self.case_sensitive,
-schema=self.table_metadata.schema(),
+schema=self.projection(),
)
)

@@ -1522,7 +1522,6 @@ def plan_files(self) -> Iterable[FileScanTask]:
# the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id

manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
-from pyiceberg.expressions.visitors import ResidualEvaluator

residual_evaluators: Dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator)

@@ -1546,11 +1545,11 @@ def plan_files(self) -> Iterable[FileScanTask]:

min_sequence_number = _min_sequence_number(manifests)

-data_entries: List[tuple[ManifestEntry, BooleanExpression]] = []
+data_entries: List[ManifestEntry] = []
positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER)

executor = ExecutorFactory.get_or_create()
-for manifest_entry, residual in chain(
+for manifest_entry in chain(
*executor.map(
lambda args: _open_manifest(*args),
[
@@ -1568,7 +1567,7 @@ def plan_files(self) -> Iterable[FileScanTask]:
):
data_file = manifest_entry.data_file
if data_file.content == DataFileContent.DATA:
-data_entries.append((manifest_entry, residual))
+data_entries.append(manifest_entry)
elif data_file.content == DataFileContent.POSITION_DELETES:
positional_delete_entries.add(manifest_entry)
elif data_file.content == DataFileContent.EQUALITY_DELETES:
@@ -1583,9 +1582,11 @@
data_entry,
positional_delete_entries,
),
-residual=residual,
+residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for(
+data_entry.data_file.partition
+),
)
-for data_entry, residual in data_entries
+for data_entry in data_entries
]
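With the residual now computed at task-construction time from each file's partition, a hypothetical caller can see how much of the row filter is left to apply per file (assuming `table` is a loaded `Table`):

```python
scan = table.scan(row_filter="ts >= '2025-01-01'")
for task in scan.plan_files():
    # AlwaysTrue() means the partition value alone already satisfies the filter for this file
    print(task.file.file_path, task.residual)
```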

def to_arrow(self) -> pa.Table:
@@ -1668,19 +1669,16 @@ def count(self) -> int:
# task.residual is AlwaysTrue when the filter condition is fully satisfied by the
# partition values; a non-empty task.delete_files means positional deletes haven't been merged yet,
# so such files have to be read as a pyarrow table, applying the filter and the deletes
-if task.residual == AlwaysTrue() and not len(task.delete_files):
+if task.residual == AlwaysTrue() and len(task.delete_files) == 0:
# Every File has a metadata stat that stores the file record count
res += task.file.record_count
else:
-from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow
-
arrow_scan = ArrowScan(
table_metadata=self.table_metadata,
io=self.io,
projected_schema=self.projection(),
row_filter=self.row_filter,
case_sensitive=self.case_sensitive,
-limit=self.limit,
)
if task.file.file_size_in_bytes > 512 * 1024 * 1024:
target_schema = schema_to_pyarrow(self.projection())
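Putting the branch above together, here is a hedged sketch of the counting strategy this method implements, built from the same `DataScan` attributes the diff uses; finishing the fallback path with `ArrowScan.to_table` on a single task is an assumption, since the hunk is truncated at this point.

```python
from pyiceberg.expressions import AlwaysTrue
from pyiceberg.io.pyarrow import ArrowScan

total = 0
for task in scan.plan_files():
    if task.residual == AlwaysTrue() and len(task.delete_files) == 0:
        total += task.file.record_count           # manifest metadata already stores an exact count
    else:
        arrow_scan = ArrowScan(                   # fall back to reading and filtering just this file
            table_metadata=scan.table_metadata,
            io=scan.io,
            projected_schema=scan.projection(),
            row_filter=scan.row_filter,
            case_sensitive=scan.case_sensitive,
        )
        total += len(arrow_scan.to_table([task]))
```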
