Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
368 changes: 368 additions & 0 deletions table/evaluators.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,7 @@ func (m *inclusiveMetricsEval) VisitEqual(t iceberg.BoundTerm, lit iceberg.Liter
return rowsMightMatch
}

cmp = getCmpLiteral(upperBound)
if cmp(upperBound, lit) == -1 {
return rowsCannotMatch
}
Expand Down Expand Up @@ -1195,3 +1196,370 @@ func (m *inclusiveMetricsEval) VisitNotStartsWith(t iceberg.BoundTerm, lit icebe

return rowsMightMatch
}

// newStrictMetricsEvaluator builds a per-file evaluation function for expr
// against the schema s.
//
// The expression is first passed through iceberg.RewriteNotExpr and the result
// is bound to the schema with iceberg.BindExpr, honoring caseSensitive. An
// error from either step is returned unchanged together with a nil function.
// On success the returned function is the Eval method of a strictMetricsEval
// that holds the bound expression, the schema's struct type and the
// includeEmptyFiles flag.
func newStrictMetricsEvaluator(s *iceberg.Schema, expr iceberg.BooleanExpression,
	caseSensitive bool, includeEmptyFiles bool,
) (func(iceberg.DataFile) (bool, error), error) {
	withoutNot, err := iceberg.RewriteNotExpr(expr)
	if err != nil {
		return nil, err
	}

	boundExpr, err := iceberg.BindExpr(s, withoutNot, caseSensitive)
	if err != nil {
		return nil, err
	}

	evaluator := &strictMetricsEval{
		st:                s.AsStruct(),
		includeEmptyFiles: includeEmptyFiles,
		expr:              boundExpr,
	}

	return evaluator.Eval, nil
}

// strictMetricsEval visits a bound boolean expression using the column
// metrics of a single data file (see Eval).
type strictMetricsEval struct {
// Embedded metrics state. NOTE(review): presumably this is where the
// valueCounts, nullCounts, nanCounts, lowerBounds and upperBounds fields
// populated by Eval are declared; its definition is outside this view, so
// confirm there.
metricsEvaluator

// st is the schema's struct type, as returned by Schema.AsStruct() in the constructor.
st iceberg.StructType
// expr is the bound expression that Eval visits.
expr iceberg.BooleanExpression
// includeEmptyFiles, when false, makes Eval return early for files whose Count() is <= 0.
includeEmptyFiles bool
}

// Eval visits the bound expression using the metrics recorded on file and
// returns the visitor's verdict together with any error from
// iceberg.VisitExpr.
//
// When includeEmptyFiles is false and the file reports a count of zero or
// less, rowsMustMatch is returned immediately without visiting the expression.
func (m *strictMetricsEval) Eval(file iceberg.DataFile) (bool, error) {
	if !m.includeEmptyFiles && file.Count() <= 0 {
		return rowsMustMatch, nil
	}

	// Work on a fresh evaluator instead of mutating the receiver: the
	// per-file metrics are written to the copy, so the shared configuration
	// on m stays untouched (this is what keeps Eval free of a race on m).
	perFile := &strictMetricsEval{
		st:                m.st,
		includeEmptyFiles: m.includeEmptyFiles,
		expr:              m.expr,
	}

	perFile.valueCounts = file.ValueCounts()
	perFile.nullCounts = file.NullValueCounts()
	perFile.nanCounts = file.NaNValueCounts()
	perFile.lowerBounds = file.LowerBoundValues()
	perFile.upperBounds = file.UpperBoundValues()

	return iceberg.VisitExpr(m.expr, perFile)
}

// VisitUnbound always panics: this evaluator only handles predicates that
// have already been bound.
func (m *strictMetricsEval) VisitUnbound(_ iceberg.UnboundPredicate) bool {
	panic("need bound predicate")
}

// VisitBound hands the bound predicate to iceberg.VisitBoundPredicate with
// this evaluator as the visitor and returns its result.
func (m *strictMetricsEval) VisitBound(pred iceberg.BoundPredicate) bool {
	verdict := iceberg.VisitBoundPredicate(pred, m)

	return verdict
}

// VisitIsNull returns rowsMustMatch when containsNullsOnly reports true for
// the referenced field, and rowsMightNotMatch otherwise.
func (m *strictMetricsEval) VisitIsNull(t iceberg.BoundTerm) bool {
	if !m.containsNullsOnly(t.Ref().Field().ID) {
		return rowsMightNotMatch
	}

	return rowsMustMatch
}

// VisitNotNull returns rowsMustMatch only when a null count is recorded for
// the referenced field and that count is zero; a missing or non-zero count
// yields rowsMightNotMatch.
func (m *strictMetricsEval) VisitNotNull(t iceberg.BoundTerm) bool {
	nulls, known := m.nullCounts[t.Ref().Field().ID]
	if !known || nulls != 0 {
		return rowsMightNotMatch
	}

	return rowsMustMatch
}

// VisitIsNan returns rowsMustMatch when containsNansOnly reports true for the
// referenced field, and rowsMightNotMatch otherwise.
func (m *strictMetricsEval) VisitIsNan(t iceberg.BoundTerm) bool {
	if !m.containsNansOnly(t.Ref().Field().ID) {
		return rowsMightNotMatch
	}

	return rowsMustMatch
}

// VisitNotNan returns rowsMustMatch when either a NaN count of zero is
// recorded for the referenced field or containsNullsOnly reports true for it;
// in every other case it returns rowsMightNotMatch.
func (m *strictMetricsEval) VisitNotNan(t iceberg.BoundTerm) bool {
	id := t.Ref().Field().ID
	nans, known := m.nanCounts[id]

	switch {
	case known && nans == 0:
		return rowsMustMatch
	case m.containsNullsOnly(id):
		return rowsMustMatch
	default:
		return rowsMightNotMatch
	}
}

// VisitLess returns rowsMustMatch only when the referenced field has no
// positive null or NaN count recorded and its stored upper bound compares
// strictly below lit. A missing upper bound yields rowsMightNotMatch.
//
// Panics if the stored upper bound cannot be decoded by
// iceberg.LiteralFromBytes.
func (m *strictMetricsEval) VisitLess(t iceberg.BoundTerm, lit iceberg.Literal) bool {
	fld := t.Ref().Field()
	id := fld.ID

	if m.canContainNulls(id) || m.canContainNans(id) {
		return rowsMightNotMatch
	}

	raw := m.upperBounds[id]
	if raw == nil {
		return rowsMightNotMatch
	}

	upper, err := iceberg.LiteralFromBytes(fld.Type, raw)
	if err != nil {
		panic(err)
	}

	if getCmpLiteral(upper)(upper, lit) < 0 {
		return rowsMustMatch
	}

	return rowsMightNotMatch
}

// VisitLessEqual returns rowsMustMatch only when the referenced field has no
// positive null or NaN count recorded and its stored upper bound compares
// less than or equal to lit. A missing upper bound yields rowsMightNotMatch.
//
// Panics if the stored upper bound cannot be decoded by
// iceberg.LiteralFromBytes.
func (m *strictMetricsEval) VisitLessEqual(t iceberg.BoundTerm, lit iceberg.Literal) bool {
	fld := t.Ref().Field()
	id := fld.ID

	if m.canContainNulls(id) || m.canContainNans(id) {
		return rowsMightNotMatch
	}

	raw := m.upperBounds[id]
	if raw == nil {
		return rowsMightNotMatch
	}

	upper, err := iceberg.LiteralFromBytes(fld.Type, raw)
	if err != nil {
		panic(err)
	}

	if getCmpLiteral(upper)(upper, lit) <= 0 {
		return rowsMustMatch
	}

	return rowsMightNotMatch
}

// VisitGreater returns rowsMustMatch only when the referenced field has no
// positive null or NaN count recorded and its stored lower bound compares
// strictly above lit. A missing lower bound, or one for which isNan reports
// true, yields rowsMightNotMatch.
//
// Panics if the stored lower bound cannot be decoded by
// iceberg.LiteralFromBytes.
func (m *strictMetricsEval) VisitGreater(t iceberg.BoundTerm, lit iceberg.Literal) bool {
	fld := t.Ref().Field()
	id := fld.ID

	if m.canContainNulls(id) || m.canContainNans(id) {
		return rowsMightNotMatch
	}

	raw := m.lowerBounds[id]
	if raw == nil {
		return rowsMightNotMatch
	}

	lower, err := iceberg.LiteralFromBytes(fld.Type, raw)
	if err != nil {
		panic(err)
	}

	// A NaN bound means the bounds cannot be relied upon.
	if m.isNan(lower) {
		return rowsMightNotMatch
	}

	if getCmpLiteral(lower)(lower, lit) > 0 {
		return rowsMustMatch
	}

	return rowsMightNotMatch
}

// VisitGreaterEqual returns rowsMustMatch only when the referenced field has
// no positive null or NaN count recorded and its stored lower bound compares
// greater than or equal to lit. A missing lower bound, or one for which isNan
// reports true, yields rowsMightNotMatch.
//
// Panics if the stored lower bound cannot be decoded by
// iceberg.LiteralFromBytes.
func (m *strictMetricsEval) VisitGreaterEqual(t iceberg.BoundTerm, lit iceberg.Literal) bool {
	fld := t.Ref().Field()
	id := fld.ID

	if m.canContainNulls(id) || m.canContainNans(id) {
		return rowsMightNotMatch
	}

	raw := m.lowerBounds[id]
	if raw == nil {
		return rowsMightNotMatch
	}

	lower, err := iceberg.LiteralFromBytes(fld.Type, raw)
	if err != nil {
		panic(err)
	}

	// A NaN bound means the bounds cannot be relied upon.
	if m.isNan(lower) {
		return rowsMightNotMatch
	}

	if getCmpLiteral(lower)(lower, lit) >= 0 {
		return rowsMustMatch
	}

	return rowsMightNotMatch
}

// VisitEqual returns rowsMustMatch only when the referenced field has no
// positive null or NaN count recorded, both bounds are present, and both the
// lower and the upper bound compare equal to lit. Every other situation
// yields rowsMightNotMatch.
//
// Panics if either stored bound cannot be decoded by
// iceberg.LiteralFromBytes.
func (m *strictMetricsEval) VisitEqual(t iceberg.BoundTerm, lit iceberg.Literal) bool {
	fld := t.Ref().Field()
	id := fld.ID

	if m.canContainNulls(id) || m.canContainNans(id) {
		return rowsMightNotMatch
	}

	lowerRaw, upperRaw := m.lowerBounds[id], m.upperBounds[id]
	if lowerRaw == nil || upperRaw == nil {
		return rowsMightNotMatch
	}

	lower, err := iceberg.LiteralFromBytes(fld.Type, lowerRaw)
	if err != nil {
		panic(err)
	}

	upper, err := iceberg.LiteralFromBytes(fld.Type, upperRaw)
	if err != nil {
		panic(err)
	}

	// Both ends of the range must coincide with lit.
	if getCmpLiteral(lower)(lower, lit) == 0 && getCmpLiteral(upper)(upper, lit) == 0 {
		return rowsMustMatch
	}

	return rowsMightNotMatch
}

func (m *strictMetricsEval) VisitNotEqual(t iceberg.BoundTerm, lit iceberg.Literal) bool {
field := t.Ref().Field()
fieldID := field.ID

if m.canContainNulls(fieldID) || m.canContainNans(fieldID) {
return rowsMustMatch
}

var cmp func(iceberg.Literal, iceberg.Literal) int
if lowerBoundBytes := m.lowerBounds[fieldID]; lowerBoundBytes != nil {
lowerBound, err := iceberg.LiteralFromBytes(field.Type, lowerBoundBytes)
if err != nil {
panic(err)
}

if m.isNan(lowerBound) {
return rowsMightNotMatch
}

cmp = getCmpLiteral(lowerBound)
if cmp(lowerBound, lit) == 1 {
return rowsMustMatch
}
}

if upperBoundBytes := m.upperBounds[fieldID]; upperBoundBytes != nil {
upperBound, err := iceberg.LiteralFromBytes(field.Type, upperBoundBytes)
if err != nil {
panic(err)
}

if m.isNan(upperBound) {
return rowsMightNotMatch
}

cmp = getCmpLiteral(upperBound)
if cmp(upperBound, lit) == -1 {
return rowsMustMatch
}
Comment on lines +1461 to +1463
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is no lower bound in the stats, this will panic because cmp will be nil: it never gets set when there's no lower bound. We should avoid that 😄

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching this! I think inclusiveMetrics might have the same issue as well: https://github.com/apache/iceberg-go/blob/main/table/evaluators.go#L1000

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like you're right, can you add the fix there too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I actually already fixed it in my new commit.

}

return rowsMightNotMatch
}

// VisitIn returns rowsMustMatch only when the referenced field has no
// positive null or NaN count recorded, both bounds are present, both bounds
// are members of s, and the lower bound compares equal to the upper bound.
// Every other situation yields rowsMightNotMatch.
//
// Panics if a stored bound cannot be decoded by iceberg.LiteralFromBytes. The
// upper bound is decoded only after the lower bound has been found in s.
func (m *strictMetricsEval) VisitIn(t iceberg.BoundTerm, s iceberg.Set[iceberg.Literal]) bool {
	fld := t.Ref().Field()
	id := fld.ID

	if m.canContainNulls(id) || m.canContainNans(id) {
		return rowsMightNotMatch
	}

	lowerRaw, upperRaw := m.lowerBounds[id], m.upperBounds[id]
	if lowerRaw == nil || upperRaw == nil {
		return rowsMightNotMatch
	}

	lower, err := iceberg.LiteralFromBytes(fld.Type, lowerRaw)
	if err != nil {
		panic(err)
	}

	if !s.Contains(lower) {
		return rowsMightNotMatch
	}

	upper, err := iceberg.LiteralFromBytes(fld.Type, upperRaw)
	if err != nil {
		panic(err)
	}

	// The upper bound must be in the set too, and the bounds must coincide.
	if s.Contains(upper) && getCmpLiteral(lower)(lower, upper) == 0 {
		return rowsMustMatch
	}

	return rowsMightNotMatch
}

// VisitNotIn returns rowsMustMatch when the referenced field has a positive
// null or NaN count recorded, or when filtering the members of s through
// removeBoundCheck against the lower bound (and then the upper bound) leaves
// no members. A lower bound for which isNan reports true, or members
// remaining after both filters, yields rowsMightNotMatch.
//
// Panics if a stored bound cannot be decoded by iceberg.LiteralFromBytes.
func (m *strictMetricsEval) VisitNotIn(t iceberg.BoundTerm, s iceberg.Set[iceberg.Literal]) bool {
	fld := t.Ref().Field()
	id := fld.ID

	if m.canContainNulls(id) || m.canContainNans(id) {
		return rowsMustMatch
	}

	remaining := s.Members()

	if lowerRaw := m.lowerBounds[id]; lowerRaw != nil {
		lower, err := iceberg.LiteralFromBytes(fld.Type, lowerRaw)
		if err != nil {
			panic(err)
		}

		if m.isNan(lower) {
			return rowsMightNotMatch
		}

		remaining = removeBoundCheck(lower, remaining, 1)
		if len(remaining) == 0 {
			return rowsMustMatch
		}
	}

	if upperRaw := m.upperBounds[id]; upperRaw != nil {
		upper, err := iceberg.LiteralFromBytes(fld.Type, upperRaw)
		if err != nil {
			panic(err)
		}

		remaining = removeBoundCheck(upper, remaining, -1)
		if len(remaining) == 0 {
			return rowsMustMatch
		}
	}

	return rowsMightNotMatch
}

// VisitStartsWith ignores its arguments and always returns rowsMightNotMatch.
func (m *strictMetricsEval) VisitStartsWith(_ iceberg.BoundTerm, _ iceberg.Literal) bool {
	return rowsMightNotMatch
}

// VisitNotStartsWith ignores its arguments and always returns
// rowsMightNotMatch.
func (m *strictMetricsEval) VisitNotStartsWith(_ iceberg.BoundTerm, _ iceberg.Literal) bool {
	return rowsMightNotMatch
}

// canContainNulls reports whether a null count is recorded for fieldID and
// that count is greater than zero. A field with no recorded count reports
// false.
func (m *strictMetricsEval) canContainNulls(fieldID int) bool {
	if nulls, known := m.nullCounts[fieldID]; known {
		return nulls > 0
	}

	return false
}

// canContainNans reports whether a NaN count is recorded for fieldID and that
// count is greater than zero. A field with no recorded count reports false.
func (m *strictMetricsEval) canContainNans(fieldID int) bool {
	if nans, known := m.nanCounts[fieldID]; known {
		return nans > 0
	}

	return false
}
Loading
Loading