Skip to content

Commit

Permalink
Merge branch 'apache:main' into as-replace-table-as-select
Browse files Browse the repository at this point in the history
  • Loading branch information
anupam-saini authored Apr 2, 2024
2 parents b54b357 + 5ef9f3d commit 044896d
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 13 deletions.
21 changes: 11 additions & 10 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
from typing_extensions import Annotated

import pyiceberg.expressions.parser as parser
import pyiceberg.expressions.visitors as visitors
from pyiceberg.exceptions import CommitFailedException, ResolveError, ValidationError
from pyiceberg.expressions import (
AlwaysTrue,
Expand All @@ -56,6 +55,12 @@
EqualTo,
Reference,
)
from pyiceberg.expressions.visitors import (
_InclusiveMetricsEvaluator,
expression_evaluator,
inclusive_projection,
manifest_evaluator,
)
from pyiceberg.io import FileIO, load_file_io
from pyiceberg.manifest import (
POSITIONAL_DELETE_SCHEMA,
Expand Down Expand Up @@ -1445,9 +1450,7 @@ def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_ent
relevant_entries = positional_delete_entries[positional_delete_entries.bisect_right(data_entry) :]

if len(relevant_entries) > 0:
evaluator = visitors._InclusiveMetricsEvaluator(
POSITIONAL_DELETE_SCHEMA, EqualTo("file_path", data_entry.data_file.file_path)
)
evaluator = _InclusiveMetricsEvaluator(POSITIONAL_DELETE_SCHEMA, EqualTo("file_path", data_entry.data_file.file_path))
return {
positional_delete_entry.data_file
for positional_delete_entry in relevant_entries
Expand All @@ -1471,7 +1474,7 @@ def __init__(
super().__init__(table, row_filter, selected_fields, case_sensitive, snapshot_id, options, limit)

def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
project = visitors.inclusive_projection(self.table.schema(), self.table.specs()[spec_id])
project = inclusive_projection(self.table.schema(), self.table.specs()[spec_id])
return project(self.row_filter)

@cached_property
Expand All @@ -1480,7 +1483,7 @@ def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:

def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
spec = self.table.specs()[spec_id]
return visitors.manifest_evaluator(spec, self.table.schema(), self.partition_filters[spec_id], self.case_sensitive)
return manifest_evaluator(spec, self.table.schema(), self.partition_filters[spec_id], self.case_sensitive)

def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
spec = self.table.specs()[spec_id]
Expand All @@ -1491,9 +1494,7 @@ def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]
# The lambda created here is run in multiple threads.
# So we avoid creating _EvaluatorExpression methods bound to a single
# shared instance across multiple threads.
return lambda data_file: visitors.expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(
data_file.partition
)
return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)

def _check_sequence_number(self, min_data_sequence_number: int, manifest: ManifestFile) -> bool:
"""Ensure that no manifests are loaded that contain deletes that are older than the data.
Expand Down Expand Up @@ -1538,7 +1539,7 @@ def plan_files(self) -> Iterable[FileScanTask]:
# this filter depends on the partition spec used to write the manifest file

partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)
metrics_evaluator = visitors._InclusiveMetricsEvaluator(
metrics_evaluator = _InclusiveMetricsEvaluator(
self.table.schema(), self.row_filter, self.case_sensitive, self.options.get("include_empty_files") == "true"
).eval

Expand Down
7 changes: 4 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import boto3
import pytest
from moto import mock_aws
from pyspark.sql import SparkSession

from pyiceberg import schema
from pyiceberg.catalog import Catalog, load_catalog
Expand Down Expand Up @@ -86,6 +85,7 @@
if TYPE_CHECKING:
import pyarrow as pa
from moto.server import ThreadedMotoServer # type: ignore
from pyspark.sql import SparkSession

from pyiceberg.io.pyarrow import PyArrowFileIO

Expand Down Expand Up @@ -1954,9 +1954,10 @@ def session_catalog() -> Catalog:


@pytest.fixture(scope="session")
def spark() -> SparkSession:
def spark() -> "SparkSession":
import importlib.metadata
import os

from pyspark.sql import SparkSession

spark_version = ".".join(importlib.metadata.version("pyspark").split(".")[:2])
scala_version = "2.12"
Expand Down

0 comments on commit 044896d

Please sign in to comment.