Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ResidualVisitor to compute residuals #1388

Open
wants to merge 25 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
731542e
Create test_scan_count.py
tusharchou Nov 28, 2024
c6c971e
moved test_scan_count.py to tests
tusharchou Nov 28, 2024
da18837
implemented count in data scan
tusharchou Nov 28, 2024
3104a2f
tested table scan count in test_sql catalog
tusharchou Nov 28, 2024
c2740ea
refactoring
tusharchou Nov 28, 2024
90bca84
make lint
tusharchou Nov 28, 2024
f7202b9
Merge pull request #1 from tusharchou/gh-1223-count-rows-metadata-onl…
tusharchou Nov 28, 2024
c7205b3
Merge branch 'apache:main' into main
tusharchou Dec 2, 2024
09f9c10
Merge branch 'apache:main' into main
tusharchou Dec 11, 2024
1e9da22
Merge branch 'apache:main' into main
tusharchou Dec 18, 2024
3ab20d4
Merge branch 'apache:main' into main
tusharchou Dec 21, 2024
091c0af
implemented residual_evaluator.py with tests
tusharchou Dec 24, 2024
3cd797d
added license
tusharchou Dec 24, 2024
6b0924e
fixed lint
tusharchou Dec 24, 2024
96cb4e9
fixed lint errors
tusharchou Dec 24, 2024
212c83b
Merge pull request #3 from tusharchou/gh-1223-metadata-only-row-count
tusharchou Dec 24, 2024
8bc65fa
Merge branch 'apache:main' into main
tusharchou Dec 30, 2024
8bb039f
Gh 1223 metadata only row count (#4)
tusharchou Dec 31, 2024
0019f92
Merge branch 'apache:main' into main
tusharchou Jan 4, 2025
a372a93
Merge branch 'apache:main' into main
tusharchou Jan 6, 2025
ab4c000
Gh 1223 metadata only row count (#5)
tusharchou Jan 6, 2025
f5a871b
Merge branch 'apache:main' into main
tusharchou Jan 18, 2025
8262780
Merge branch 'apache:main' into main
tusharchou Jan 27, 2025
899beb1
Merge branch 'apache:main' into main
tusharchou Jan 31, 2025
2575cb8
Resolving review comments (#6)
tusharchou Jan 31, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 229 additions & 2 deletions pyiceberg/expressions/visitors.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@
)
from pyiceberg.expressions.literals import Literal
from pyiceberg.manifest import DataFile, ManifestFile, PartitionFieldSummary
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.typedef import EMPTY_DICT, L, StructProtocol
from pyiceberg.typedef import EMPTY_DICT, L, Record, StructProtocol
from pyiceberg.types import (
DoubleType,
FloatType,
Expand Down Expand Up @@ -1731,3 +1731,230 @@ def _can_contain_nulls(self, field_id: int) -> bool:

def _can_contain_nans(self, field_id: int) -> bool:
return (nan_count := self.nan_counts.get(field_id)) is not None and nan_count > 0


class ResidualVisitor(BoundBooleanExpressionVisitor[BooleanExpression], ABC):
    """Find the residual of an expression for the partitions of a given PartitionSpec.

    A residual expression is made by partially evaluating an expression using partition values.
    For example, if a table is partitioned by day(utc_timestamp) and is read with a filter
    expression utc_timestamp >= a and utc_timestamp <= b, then there are 4 possible residual
    expressions for the partition data, d:

    1. If d > day(a) and d < day(b), the residual is always true
    2. If d == day(a) and d != day(b), the residual is utc_timestamp >= a
    3. If d == day(b) and d != day(a), the residual is utc_timestamp <= b
    4. If d == day(a) == day(b), the residual is utc_timestamp >= a and utc_timestamp <= b

    Partition data is passed as a Record via eval(); the residual is returned as a
    BooleanExpression.

    NOTE(review): eval() stores the partition record on self.struct, so a single
    instance is NOT safe to share across threads — confirm before reusing instances.
    """

    schema: Schema
    spec: PartitionSpec
    case_sensitive: bool
    expr: BooleanExpression
    # struct holds the partition record currently under evaluation; set by eval().

    def __init__(self, schema: Schema, spec: PartitionSpec, case_sensitive: bool, expr: BooleanExpression) -> None:
        self.schema = schema
        self.spec = spec
        self.case_sensitive = case_sensitive
        self.expr = expr

    def eval(self, partition_data: Record) -> BooleanExpression:
        """Partially evaluate self.expr against one partition record and return the residual."""
        self.struct = partition_data
        return visit(self.expr, visitor=self)

    @staticmethod
    def _is_nan(value: Any) -> bool:
        # NaN is the only value that compares unequal to itself; this avoids an
        # `import math` and is False for None and for non-float values.
        return value is not None and value != value

    def visit_true(self) -> BooleanExpression:
        return AlwaysTrue()

    def visit_false(self) -> BooleanExpression:
        return AlwaysFalse()

    def visit_not(self, child_result: BooleanExpression) -> BooleanExpression:
        return Not(child_result)

    def visit_and(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression:
        return And(left_result, right_result)

    def visit_or(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression:
        return Or(left_result, right_result)

    def visit_is_null(self, term: BoundTerm[L]) -> BooleanExpression:
        return AlwaysTrue() if term.eval(self.struct) is None else AlwaysFalse()

    def visit_not_null(self, term: BoundTerm[L]) -> BooleanExpression:
        return AlwaysFalse() if term.eval(self.struct) is None else AlwaysTrue()

    def visit_is_nan(self, term: BoundTerm[L]) -> BooleanExpression:
        # BUG FIX: previously tested `is not None`, which classified every
        # non-null value as NaN; use a real NaN test as the commented-out
        # `isnan` intent indicated.
        return AlwaysTrue() if self._is_nan(term.eval(self.struct)) else AlwaysFalse()

    def visit_not_nan(self, term: BoundTerm[L]) -> BooleanExpression:
        # BUG FIX: previously tested `not ... is not None` (i.e. "is None"),
        # which is unrelated to NaN-ness.
        return AlwaysFalse() if self._is_nan(term.eval(self.struct)) else AlwaysTrue()

    def visit_less_than(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
        return AlwaysTrue() if term.eval(self.struct) < literal.value else AlwaysFalse()

    def visit_less_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
        return AlwaysTrue() if term.eval(self.struct) <= literal.value else AlwaysFalse()

    def visit_greater_than(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
        return AlwaysTrue() if term.eval(self.struct) > literal.value else AlwaysFalse()

    def visit_greater_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
        return AlwaysTrue() if term.eval(self.struct) >= literal.value else AlwaysFalse()

    def visit_equal(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
        return AlwaysTrue() if term.eval(self.struct) == literal.value else AlwaysFalse()

    def visit_not_equal(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
        return AlwaysTrue() if term.eval(self.struct) != literal.value else AlwaysFalse()

    def visit_in(self, term: BoundTerm[L], literals: Set[L]) -> BooleanExpression:
        return AlwaysTrue() if term.eval(self.struct) in literals else AlwaysFalse()

    def visit_not_in(self, term: BoundTerm[L], literals: Set[L]) -> BooleanExpression:
        return AlwaysFalse() if term.eval(self.struct) in literals else AlwaysTrue()

    def visit_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
        eval_res = term.eval(self.struct)
        if eval_res is not None and str(eval_res).startswith(str(literal.value)):
            return AlwaysTrue()
        return AlwaysFalse()

    def visit_not_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
        # BUG FIX: `not self.visit_starts_with(...)` was always False because the
        # returned expression objects are truthy, so this method always produced
        # AlwaysFalse. Invert the returned expression explicitly instead.
        if isinstance(self.visit_starts_with(term, literal), AlwaysTrue):
            return AlwaysFalse()
        return AlwaysTrue()

    def visit_bound_predicate(self, predicate: BoundPredicate[Any]) -> BooleanExpression:
        """Evaluate a bound predicate against the partition projections.

        Get the strict projection and inclusive projection of this predicate in partition data,
        then use them to determine whether to return the original predicate. The strict projection
        returns true iff the original predicate would have returned true, so the predicate can be
        eliminated if the strict projection evaluates to true. Similarly the inclusive projection
        returns false iff the original predicate would have returned false, so the predicate can
        also be eliminated if the inclusive projection evaluates to false.

        If there is no strict projection, or if it does not evaluate to true, the
        original predicate is returned as the residual.
        """
        parts = self.spec.fields_by_source_id(predicate.term.ref().field.field_id)
        if not parts:
            # the predicate does not reference a partition column: keep it as-is
            return predicate

        def struct_to_schema(struct: StructType) -> Schema:
            return Schema(*struct.fields)

        for part in parts:
            # strict projection: evaluates to true iff the original predicate is true
            strict_projection = part.transform.strict_project(part.name, predicate)
            strict_result = None

            if strict_projection is not None:
                bound = strict_projection.bind(struct_to_schema(self.spec.partition_type(self.schema)))
                if isinstance(bound, BoundPredicate):
                    strict_result = super().visit_bound_predicate(bound)
                else:
                    # not a predicate: must be a constant such as AlwaysTrue or AlwaysFalse
                    strict_result = bound

            if strict_result is not None and isinstance(strict_result, AlwaysTrue):
                return AlwaysTrue()

            # inclusive projection: evaluates to false iff the original predicate is false
            inclusive_projection = part.transform.project(part.name, predicate)
            inclusive_result = None
            if inclusive_projection is not None:
                bound_inclusive = inclusive_projection.bind(struct_to_schema(self.spec.partition_type(self.schema)))
                if isinstance(bound_inclusive, BoundPredicate):
                    # using predicate method specific to inclusive
                    inclusive_result = super().visit_bound_predicate(bound_inclusive)
                else:
                    # not a predicate: must be a constant such as AlwaysTrue or AlwaysFalse
                    inclusive_result = bound_inclusive
            if inclusive_result is not None and isinstance(inclusive_result, AlwaysFalse):
                return AlwaysFalse()

        return predicate

    def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> BooleanExpression:
        """Bind the predicate against the schema and compute its residual."""
        # BUG FIX: binding previously hard-coded case_sensitive=True, ignoring
        # the case sensitivity this visitor was constructed with.
        bound = predicate.bind(self.schema, case_sensitive=self.case_sensitive)

        if isinstance(bound, BoundPredicate):
            bound_residual = self.visit_bound_predicate(predicate=bound)
            if bound_residual not in (AlwaysFalse(), AlwaysTrue()):
                # replace inclusive original unbound predicate
                return predicate
            # use the non-predicate residual (e.g. AlwaysTrue)
            return bound_residual

        # binding constant-folded the predicate; return the resulting expression
        return bound


class ResidualEvaluator(ResidualVisitor):
    """Concrete ResidualVisitor exposing residual_for() as the public entry point."""

    def residual_for(self, partition_data: Record) -> BooleanExpression:
        """Return the residual of the bound filter for one partition record."""
        return self.eval(partition_data)


class UnpartitionedResidualEvaluator(ResidualEvaluator):
    """Residual evaluator for unpartitioned tables.

    Without partition values there is nothing to partially evaluate, so the
    residual is always the original filter expression.
    """

    def __init__(self, schema: Schema, expr: BooleanExpression):
        # Removed the redundant `self.expr = expr` that followed this call:
        # super().__init__ already stores expr.
        super().__init__(schema=schema, spec=UNPARTITIONED_PARTITION_SPEC, expr=expr, case_sensitive=False)

    def residual_for(self, partition_data: Record) -> BooleanExpression:
        # partition_data is intentionally ignored: no partition can simplify the filter.
        return self.expr


def residual_evaluator_of(
    spec: PartitionSpec, expr: BooleanExpression, case_sensitive: bool, schema: Schema
) -> ResidualEvaluator:
    """Return a residual evaluator appropriate for the given partition spec.

    Unpartitioned specs get the short-circuit evaluator that always yields the
    original expression; partitioned specs get a full ResidualEvaluator.
    """
    if not spec.fields:
        return UnpartitionedResidualEvaluator(schema=schema, expr=expr)
    return ResidualEvaluator(spec=spec, expr=expr, schema=schema, case_sensitive=case_sensitive)
69 changes: 69 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,15 @@
Reference,
)
from pyiceberg.expressions.visitors import (
ResidualEvaluator,
_InclusiveMetricsEvaluator,
bind,
expression_evaluator,
inclusive_projection,
manifest_evaluator,
)
from pyiceberg.io import FileIO, load_file_io
from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow
from pyiceberg.manifest import (
POSITIONAL_DELETE_SCHEMA,
DataFile,
Expand Down Expand Up @@ -1358,6 +1360,9 @@ def filter(self: S, expr: Union[str, BooleanExpression]) -> S:
def with_case_sensitive(self: S, case_sensitive: bool = True) -> S:
return self.update(case_sensitive=case_sensitive)

@abstractmethod
def count(self) -> int:
    """Return the number of records matching this scan; implemented by subclasses."""
    ...


class ScanTask(ABC):
    # Marker base class for a unit of scan work (e.g. FileScanTask).
    pass
Expand All @@ -1371,24 +1376,28 @@ class FileScanTask(ScanTask):
delete_files: Set[DataFile]
start: int
length: int
residual: BooleanExpression

def __init__(
    self,
    data_file: DataFile,
    delete_files: Optional[Set[DataFile]] = None,
    start: Optional[int] = None,
    length: Optional[int] = None,
    residual: BooleanExpression = ALWAYS_TRUE,
) -> None:
    """Create a scan task for one data file.

    Args:
        data_file: The data file to read.
        delete_files: Positional delete files that apply to the data file; defaults to none.
        start: Byte offset at which to start reading; defaults to 0.
        length: Number of bytes to read; defaults to the whole file.
        residual: The part of the row filter not already satisfied by the
            partition value; defaults to ALWAYS_TRUE (no further filtering).
    """
    self.file = data_file
    self.delete_files = delete_files or set()
    self.start = start or 0
    # NOTE: `or` also replaces an explicit 0 with the default — presumably
    # intentional, since a zero start/length task reads nothing.
    self.length = length or data_file.file_size_in_bytes
    self.residual = residual


def _open_manifest(
io: FileIO,
manifest: ManifestFile,
partition_filter: Callable[[DataFile], bool],
residual_evaluator: Callable[[Record], BooleanExpression],
metrics_evaluator: Callable[[DataFile], bool],
) -> List[ManifestEntry]:
"""Open a manifest file and return matching manifest entries.
Expand Down Expand Up @@ -1464,6 +1473,25 @@ def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]
# shared instance across multiple threads.
return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)

def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]:
    """Return a callable producing a ResidualEvaluator for files written with *spec_id*.

    The returned lambda is run in multiple threads, so it closes over only the
    spec and scan attributes rather than sharing one evaluator instance across
    threads. The data-file argument is accepted (to match the callable
    interface) but not needed to build the evaluator.
    """
    # Local import mirrors the original code; presumably avoids an import
    # cycle with pyiceberg.expressions.visitors — TODO confirm.
    from pyiceberg.expressions.visitors import residual_evaluator_of

    spec = self.table_metadata.specs()[spec_id]
    return lambda data_file: residual_evaluator_of(
        spec=spec,
        expr=self.row_filter,
        case_sensitive=self.case_sensitive,
        schema=self.projection(),
    )

def _check_sequence_number(self, min_sequence_number: int, manifest: ManifestFile) -> bool:
"""Ensure that no manifests are loaded that contain deletes that are older than the data.

Expand Down Expand Up @@ -1495,6 +1523,8 @@ def plan_files(self) -> Iterable[FileScanTask]:

manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)

residual_evaluators: Dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator)

manifests = [
manifest_file
for manifest_file in snapshot.manifests(self.io)
Expand All @@ -1505,6 +1535,7 @@ def plan_files(self) -> Iterable[FileScanTask]:
# this filter depends on the partition spec used to write the manifest file

partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)

metrics_evaluator = _InclusiveMetricsEvaluator(
self.table_metadata.schema(),
self.row_filter,
Expand All @@ -1526,6 +1557,7 @@ def plan_files(self) -> Iterable[FileScanTask]:
self.io,
manifest,
partition_evaluators[manifest.partition_spec_id],
residual_evaluators[manifest.partition_spec_id],
metrics_evaluator,
)
for manifest in manifests
Expand All @@ -1550,6 +1582,9 @@ def plan_files(self) -> Iterable[FileScanTask]:
data_entry,
positional_delete_entries,
),
residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for(
data_entry.data_file.partition
),
)
for data_entry in data_entries
]
Expand Down Expand Up @@ -1624,6 +1659,40 @@ def to_ray(self) -> ray.data.dataset.Dataset:

return ray.data.from_arrow(self.to_arrow())

def count(self) -> int:
    """Return the total number of records matched by this scan.

    Uses the metadata-only file record count for tasks whose residual is
    AlwaysTrue and that carry no positional deletes; otherwise reads the file
    with the row filter and deletes applied and counts the resulting rows.
    """
    # Hoisted out of the per-task loop: the import and the ArrowScan inputs
    # (metadata, io, projection, filter, case sensitivity) are loop-invariant.
    from pyarrow import RecordBatchReader

    arrow_scan = ArrowScan(
        table_metadata=self.table_metadata,
        io=self.io,
        projected_schema=self.projection(),
        row_filter=self.row_filter,
        case_sensitive=self.case_sensitive,
    )

    res = 0
    # every task is a FileScanTask
    for task in self.plan_files():
        # task.residual == AlwaysTrue means the filter is fully satisfied by the
        # partition value; an empty delete_files set means no positional deletes
        # remain to be merged, so the file-level metadata count is exact.
        if task.residual == AlwaysTrue() and len(task.delete_files) == 0:
            res += task.file.record_count
        elif task.file.file_size_in_bytes > 512 * 1024 * 1024:
            # Stream large files batch by batch to bound memory usage.
            target_schema = schema_to_pyarrow(self.projection())
            batches = arrow_scan.to_record_batches([task])
            reader = RecordBatchReader.from_batches(target_schema, batches)
            for batch in reader:
                res += batch.num_rows
        else:
            tbl = arrow_scan.to_table([task])
            res += len(tbl)
    return res
tusharchou marked this conversation as resolved.
Show resolved Hide resolved


@dataclass(frozen=True)
class WriteTask:
Expand Down
Loading