diff --git a/table/evaluators.go b/table/evaluators.go index 22b918db9..fd751632d 100644 --- a/table/evaluators.go +++ b/table/evaluators.go @@ -997,6 +997,7 @@ func (m *inclusiveMetricsEval) VisitEqual(t iceberg.BoundTerm, lit iceberg.Liter return rowsMightMatch } + cmp = getCmpLiteral(upperBound) if cmp(upperBound, lit) == -1 { return rowsCannotMatch } @@ -1195,3 +1196,370 @@ func (m *inclusiveMetricsEval) VisitNotStartsWith(t iceberg.BoundTerm, lit icebe return rowsMightMatch } + +func newStrictMetricsEvaluator(s *iceberg.Schema, expr iceberg.BooleanExpression, + caseSensitive bool, includeEmptyFiles bool, +) (func(iceberg.DataFile) (bool, error), error) { + rewritten, err := iceberg.RewriteNotExpr(expr) + if err != nil { + return nil, err + } + + bound, err := iceberg.BindExpr(s, rewritten, caseSensitive) + if err != nil { + return nil, err + } + + return (&strictMetricsEval{ + st: s.AsStruct(), + includeEmptyFiles: includeEmptyFiles, + expr: bound, + }).Eval, nil +} + +type strictMetricsEval struct { + metricsEvaluator + + st iceberg.StructType + expr iceberg.BooleanExpression + includeEmptyFiles bool +} + +func (m *strictMetricsEval) Eval(file iceberg.DataFile) (bool, error) { + if !m.includeEmptyFiles && file.Count() <= 0 { + return rowsMustMatch, nil + } + + // avoid race condition while maintaining existing state + ev := strictMetricsEval{ + st: m.st, + includeEmptyFiles: m.includeEmptyFiles, + expr: m.expr, + } + + ev.valueCounts, ev.nullCounts = file.ValueCounts(), file.NullValueCounts() + ev.nanCounts = file.NaNValueCounts() + ev.lowerBounds, ev.upperBounds = file.LowerBoundValues(), file.UpperBoundValues() + + return iceberg.VisitExpr(m.expr, &ev) +} + +func (m *strictMetricsEval) VisitUnbound(iceberg.UnboundPredicate) bool { + panic("need bound predicate") +} + +func (m *strictMetricsEval) VisitBound(pred iceberg.BoundPredicate) bool { + return iceberg.VisitBoundPredicate(pred, m) +} + +func (m *strictMetricsEval) VisitIsNull(t iceberg.BoundTerm) bool { + fieldID := t.Ref().Field().ID + if m.containsNullsOnly(fieldID) { + return rowsMustMatch + } + + return rowsMightNotMatch +} + +func (m *strictMetricsEval) VisitNotNull(t iceberg.BoundTerm) bool { + fieldID := t.Ref().Field().ID + if cnt, exists := m.nullCounts[fieldID]; exists && cnt == 0 { + return rowsMustMatch + } + + return rowsMightNotMatch +} + +func (m *strictMetricsEval) VisitIsNan(t iceberg.BoundTerm) bool { + fieldID := t.Ref().Field().ID + + if m.containsNansOnly(fieldID) { + return rowsMustMatch + } + + return rowsMightNotMatch +} + +func (m *strictMetricsEval) VisitNotNan(t iceberg.BoundTerm) bool { + fieldID := t.Ref().Field().ID + + if cnt, exists := m.nanCounts[fieldID]; exists && cnt == 0 { + return rowsMustMatch + } + + if m.containsNullsOnly(fieldID) { + return rowsMustMatch + } + + return rowsMightNotMatch +} + +func (m *strictMetricsEval) VisitLess(t iceberg.BoundTerm, lit iceberg.Literal) bool { + field := t.Ref().Field() + fieldID := field.ID + + if m.canContainNulls(fieldID) || m.canContainNans(fieldID) { + return rowsMightNotMatch + } + + if upperBoundBytes := m.upperBounds[fieldID]; upperBoundBytes != nil { + upperBound, err := iceberg.LiteralFromBytes(field.Type, upperBoundBytes) + if err != nil { + panic(err) + } + + if getCmpLiteral(upperBound)(upperBound, lit) < 0 { + return rowsMustMatch + } + } + + return rowsMightNotMatch +} + +func (m *strictMetricsEval) VisitLessEqual(t iceberg.BoundTerm, lit iceberg.Literal) bool { + field := t.Ref().Field() + fieldID := field.ID + + if m.canContainNulls(fieldID) || m.canContainNans(fieldID) { + return rowsMightNotMatch + } + + if upperBoundBytes := m.upperBounds[fieldID]; upperBoundBytes != nil { + upperBound, err := iceberg.LiteralFromBytes(field.Type, upperBoundBytes) + if err != nil { + panic(err) + } + + if getCmpLiteral(upperBound)(upperBound, lit) <= 0 { + return rowsMustMatch + } + } + + return rowsMightNotMatch +} + +func (m *strictMetricsEval) VisitGreater(t iceberg.BoundTerm, lit iceberg.Literal) bool { + field := t.Ref().Field() + fieldID := field.ID + + if m.canContainNulls(fieldID) || m.canContainNans(fieldID) { + return rowsMightNotMatch + } + + if lowerBoundBytes := m.lowerBounds[fieldID]; lowerBoundBytes != nil { + lowerBound, err := iceberg.LiteralFromBytes(field.Type, lowerBoundBytes) + if err != nil { + panic(err) + } + + if m.isNan(lowerBound) { + // NaN indicates unreliable bounds. + return rowsMightNotMatch + } + + if getCmpLiteral(lowerBound)(lowerBound, lit) > 0 { + return rowsMustMatch + } + } + + return rowsMightNotMatch +} + +func (m *strictMetricsEval) VisitGreaterEqual(t iceberg.BoundTerm, lit iceberg.Literal) bool { + field := t.Ref().Field() + fieldID := field.ID + + if m.canContainNulls(fieldID) || m.canContainNans(fieldID) { + return rowsMightNotMatch + } + + if lowerBoundBytes := m.lowerBounds[fieldID]; lowerBoundBytes != nil { + lowerBound, err := iceberg.LiteralFromBytes(field.Type, lowerBoundBytes) + if err != nil { + panic(err) + } + + if m.isNan(lowerBound) { + // NaN indicates unreliable bounds. + return rowsMightNotMatch + } + + if getCmpLiteral(lowerBound)(lowerBound, lit) >= 0 { + return rowsMustMatch + } + } + + return rowsMightNotMatch +} + +func (m *strictMetricsEval) VisitEqual(t iceberg.BoundTerm, lit iceberg.Literal) bool { + field := t.Ref().Field() + fieldID := field.ID + + if m.canContainNulls(fieldID) || m.canContainNans(fieldID) { + return rowsMightNotMatch + } + + lowerBytes := m.lowerBounds[fieldID] + upperBytes := m.upperBounds[fieldID] + + if lowerBytes != nil && upperBytes != nil { + lowerBound, err := iceberg.LiteralFromBytes(field.Type, lowerBytes) + if err != nil { + panic(err) + } + upperBound, err := iceberg.LiteralFromBytes(field.Type, upperBytes) + if err != nil { + panic(err) + } + if getCmpLiteral(lowerBound)(lowerBound, lit) != 0 || getCmpLiteral(upperBound)(upperBound, lit) != 0 { + return rowsMightNotMatch + } else { + return rowsMustMatch + } + } + + return rowsMightNotMatch +} + +func (m *strictMetricsEval) VisitNotEqual(t iceberg.BoundTerm, lit iceberg.Literal) bool { + field := t.Ref().Field() + fieldID := field.ID + + if m.canContainNulls(fieldID) || m.canContainNans(fieldID) { + return rowsMustMatch + } + + var cmp func(iceberg.Literal, iceberg.Literal) int + if lowerBoundBytes := m.lowerBounds[fieldID]; lowerBoundBytes != nil { + lowerBound, err := iceberg.LiteralFromBytes(field.Type, lowerBoundBytes) + if err != nil { + panic(err) + } + + if m.isNan(lowerBound) { + return rowsMightNotMatch + } + + cmp = getCmpLiteral(lowerBound) + if cmp(lowerBound, lit) == 1 { + return rowsMustMatch + } + } + + if upperBoundBytes := m.upperBounds[fieldID]; upperBoundBytes != nil { + upperBound, err := iceberg.LiteralFromBytes(field.Type, upperBoundBytes) + if err != nil { + panic(err) + } + + if m.isNan(upperBound) { + return rowsMightNotMatch + } + + cmp = getCmpLiteral(upperBound) + if cmp(upperBound, lit) == -1 { + return rowsMustMatch + } + } + + return rowsMightNotMatch +} + +func (m *strictMetricsEval) VisitIn(t iceberg.BoundTerm, s iceberg.Set[iceberg.Literal]) bool { + field := t.Ref().Field() + fieldID := field.ID + + if m.canContainNulls(fieldID) || m.canContainNans(fieldID) { + return rowsMightNotMatch + } + + lowerBytes := m.lowerBounds[fieldID] + upperBytes := m.upperBounds[fieldID] + + if lowerBytes != nil && upperBytes != nil { + lowerBound, err := iceberg.LiteralFromBytes(field.Type, lowerBytes) + if err != nil { + panic(err) + } + if !s.Contains(lowerBound) { + return rowsMightNotMatch + } + + upperBound, err := iceberg.LiteralFromBytes(field.Type, upperBytes) + if err != nil { + panic(err) + } + if !s.Contains(upperBound) { + return rowsMightNotMatch + } + + if getCmpLiteral(lowerBound)(lowerBound, upperBound) != 0 { + return rowsMightNotMatch + } + + return rowsMustMatch + } + + return rowsMightNotMatch +} + +func (m *strictMetricsEval) VisitNotIn(t iceberg.BoundTerm, s iceberg.Set[iceberg.Literal]) bool { + field := t.Ref().Field() + fieldID := field.ID + + if m.canContainNulls(fieldID) || m.canContainNans(fieldID) { + return rowsMustMatch + } + + values := s.Members() + if lowerBoundBytes := m.lowerBounds[fieldID]; lowerBoundBytes != nil { + lowerBound, err := iceberg.LiteralFromBytes(field.Type, lowerBoundBytes) + if err != nil { + panic(err) + } + + if m.isNan(lowerBound) { + return rowsMightNotMatch + } + + values = removeBoundCheck(lowerBound, values, 1) + if len(values) == 0 { + return rowsMustMatch + } + } + + if upperBoundBytes := m.upperBounds[fieldID]; upperBoundBytes != nil { + upperBound, err := iceberg.LiteralFromBytes(field.Type, upperBoundBytes) + if err != nil { + panic(err) + } + + values = removeBoundCheck(upperBound, values, -1) + if len(values) == 0 { + return rowsMustMatch + } + } + + return rowsMightNotMatch +} + +func (m *strictMetricsEval) VisitStartsWith(iceberg.BoundTerm, iceberg.Literal) bool { + return rowsMightNotMatch +} + +func (m *strictMetricsEval) VisitNotStartsWith(iceberg.BoundTerm, iceberg.Literal) bool { + return rowsMightNotMatch +} + +func (m *strictMetricsEval) canContainNulls(fieldID int) bool { + cnt, exists := m.nullCounts[fieldID] + + return exists && cnt > 0 +} + +func (m *strictMetricsEval) canContainNans(fieldID int) bool { + cnt, exists := m.nanCounts[fieldID] + + return exists && cnt > 0 +} diff --git a/table/evaluators_test.go b/table/evaluators_test.go index c363f3862..d6cfe06b0 100644 --- a/table/evaluators_test.go +++ b/table/evaluators_test.go @@ -2023,7 +2023,703 @@ func (suite *InclusiveMetricsTestSuite) TestNotStartsWith() { } } +type StrictMetricsTestSuite struct { + suite.Suite + + schemaDataFile *iceberg.Schema + dataFiles [3]iceberg.DataFile + + schemaDataFileNan *iceberg.Schema + dataFileNan iceberg.DataFile +} + +func (suite *StrictMetricsTestSuite) SetupSuite() { + suite.schemaDataFile = iceberg.NewSchema(0, + iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32, Required: true}, + iceberg.NestedField{ID: 2, Name: "no_stats", Type: iceberg.PrimitiveTypes.Int32, Required: false}, + iceberg.NestedField{ID: 3, Name: "required", Type: iceberg.PrimitiveTypes.String, Required: true}, + iceberg.NestedField{ID: 4, Name: "all_nulls", Type: iceberg.PrimitiveTypes.String}, + iceberg.NestedField{ID: 5, Name: "some_nulls", Type: iceberg.PrimitiveTypes.String}, + iceberg.NestedField{ID: 6, Name: "no_nulls", Type: iceberg.PrimitiveTypes.String}, + iceberg.NestedField{ID: 7, Name: "always_5", Type: iceberg.PrimitiveTypes.Int32}, + iceberg.NestedField{ID: 8, Name: "all_nans", Type: iceberg.PrimitiveTypes.Float64}, + iceberg.NestedField{ID: 9, Name: "some_nans", Type: iceberg.PrimitiveTypes.Float32}, + iceberg.NestedField{ID: 10, Name: "no_nans", Type: iceberg.PrimitiveTypes.Float32}, + iceberg.NestedField{ID: 11, Name: "all_nulls_double", Type: iceberg.PrimitiveTypes.Float64}, + iceberg.NestedField{ID: 12, Name: "all_nans_v1_stats", Type: iceberg.PrimitiveTypes.Float32}, + iceberg.NestedField{ID: 13, Name: "nan_and_null_only", Type: iceberg.PrimitiveTypes.Float64}, + iceberg.NestedField{ID: 14, Name: "no_nan_stats", Type: iceberg.PrimitiveTypes.Float64}, + ) + + var ( + IntMin, _ = iceberg.Int32Literal(IntMinValue).MarshalBinary() + IntMax, _ = iceberg.Int32Literal(IntMaxValue).MarshalBinary() + IntFive, _ = iceberg.Int32Literal(5).MarshalBinary() + FltNan, _ = iceberg.Float32Literal(float32(math.NaN())).MarshalBinary() + DblNan, _ = iceberg.Float64Literal(math.NaN()).MarshalBinary() + FltSeven, _ = iceberg.Float32Literal(7).MarshalBinary() + DblSeven, _ = iceberg.Float64Literal(7).MarshalBinary() + FltMax, _ = iceberg.Float32Literal(22).MarshalBinary() + ) + + suite.dataFiles = [3]iceberg.DataFile{ + &mockDataFile{ + path: "file_1.parquet", + format: iceberg.ParquetFile, + count: 50, + filesize: 3, + valueCounts: map[int]int64{ + 4: 50, 5: 50, 6: 50, 8: 50, 9: 50, + 10: 50, 11: 50, 12: 50, 13: 50, 14: 50, + }, + nullCounts: map[int]int64{4: 50, 5: 10, 6: 0, 11: 50, 12: 0, 13: 1}, + nanCounts: map[int]int64{8: 50, 9: 10, 10: 0}, + lowerBounds: map[int][]byte{ + 1: IntMin, + 7: IntFive, + 12: FltNan, + 13: DblNan, + }, + upperBounds: map[int][]byte{ + 1: IntMax, + 7: IntFive, + 12: FltNan, + 14: DblNan, + }, + }, + &mockDataFile{ + path: "file_2.parquet", + format: iceberg.ParquetFile, + count: 50, + filesize: 3, + valueCounts: map[int]int64{4: 50, 5: 50, 6: 50, 8: 50}, + nullCounts: map[int]int64{4: 50, 5: 10, 6: 0}, + nanCounts: nil, + lowerBounds: map[int][]byte{5: {'b', 'b', 'b'}}, + upperBounds: map[int][]byte{5: {'e', 'e', 'e'}}, + }, + &mockDataFile{ + path: "file_3.parquet", + format: iceberg.ParquetFile, + count: 50, + filesize: 3, + valueCounts: map[int]int64{4: 50, 5: 50, 6: 50}, + nullCounts: map[int]int64{4: 50, 5: 10, 6: 0}, + nanCounts: nil, + lowerBounds: map[int][]byte{5: {'b', 'b', 'b'}}, + upperBounds: map[int][]byte{5: {'e', 'e', 'e'}}, + }, + } + + suite.schemaDataFileNan = iceberg.NewSchema(0, + iceberg.NestedField{ID: 1, Name: "all_nan", Type: iceberg.PrimitiveTypes.Float64, Required: true}, + iceberg.NestedField{ID: 2, Name: "max_nan", Type: iceberg.PrimitiveTypes.Float64, Required: true}, + iceberg.NestedField{ID: 3, Name: "min_max_nan", Type: iceberg.PrimitiveTypes.Float32}, + iceberg.NestedField{ID: 4, Name: "all_nan_null_bounds", Type: iceberg.PrimitiveTypes.Float64, Required: true}, + iceberg.NestedField{ID: 5, Name: "some_nan_correct_bounds", Type: iceberg.PrimitiveTypes.Float32}, + ) + + suite.dataFileNan = &mockDataFile{ + path: "file.avro", + format: iceberg.AvroFile, + count: 50, + filesize: 3, + columnSizes: map[int]int64{1: 10, 2: 10, 3: 10, 4: 10, 5: 10}, + valueCounts: map[int]int64{1: 10, 2: 10, 3: 10, 4: 10, 5: 10}, + nullCounts: map[int]int64{1: 0, 2: 0, 3: 0, 4: 0, 5: 0}, + nanCounts: map[int]int64{1: 10, 4: 10, 5: 5}, + lowerBounds: map[int][]byte{ + 1: DblNan, + 2: DblSeven, + 3: FltNan, + 5: FltSeven, + }, + upperBounds: map[int][]byte{ + 1: DblNan, + 2: DblNan, + 3: FltNan, + 5: FltMax, + }, + } +} + +func (suite *StrictMetricsTestSuite) TestAllNull() { + allNull, someNull, noNull := iceberg.Reference("all_nulls"), iceberg.Reference("some_nulls"), iceberg.Reference("no_nulls") + + tests := []struct { + expr iceberg.BooleanExpression + expected bool + msg string + }{ + {iceberg.NotNull(allNull), false, "should skip: no non-null value in all null column"}, + {iceberg.NotNull(someNull), false, "should skip: column with some nulls contains a non-null value"}, + {iceberg.NotNull(noNull), true, "should read: non-null column contains no null values"}, + {iceberg.NotEqualTo(allNull, "a"), true, "should read: notEqual on all nulls column"}, + } + + for _, tt := range tests { + suite.Run(tt.expr.String(), func() { + eval, err := newStrictMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true) + suite.Require().NoError(err) + shouldRead, err := eval(suite.dataFiles[0]) + suite.Require().NoError(err) + suite.Equal(tt.expected, shouldRead, tt.msg) + }) + } +} + +func (suite *StrictMetricsTestSuite) TestNoNulls() { + allNull, someNull, noNull := iceberg.Reference("all_nulls"), iceberg.Reference("some_nulls"), iceberg.Reference("no_nulls") + + tests := []struct { + expr iceberg.BooleanExpression + expected bool + msg string + }{ + {iceberg.IsNull(allNull), true, "should read: all values are null"}, + {iceberg.IsNull(someNull), false, "should skip: not all values are null"}, + {iceberg.IsNull(noNull), false, "should skip: no values are null"}, + } + + for _, tt := range tests { + suite.Run(tt.expr.String(), func() { + eval, err := newStrictMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true) + suite.Require().NoError(err) + shouldRead, err := eval(suite.dataFiles[0]) + suite.Require().NoError(err) + suite.Equal(tt.expected, shouldRead, tt.msg) + }) + } +} + +func (suite *StrictMetricsTestSuite) TestSomeNulls() { + someNull := iceberg.Reference("some_nulls") + + tests := []struct { + expr iceberg.BooleanExpression + dataFile int + expected bool + msg string + }{ + {iceberg.LessThan(someNull, "ggg"), 1, false, "should skip: some values are null"}, + {iceberg.LessThanEqual(someNull, "ggg"), 1, false, "should skip: some values are null"}, + {iceberg.GreaterThan(someNull, "aaa"), 1, false, "should skip: some values are null"}, + {iceberg.GreaterThanEqual(someNull, "bbb"), 1, false, "should skip: some values are null"}, + {iceberg.EqualTo(someNull, "bbb"), 2, false, "should skip: some values are null"}, + } + + for _, tt := range tests { + suite.Run(tt.expr.String(), func() { + eval, err := newStrictMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true) + suite.Require().NoError(err) + shouldRead, err := eval(suite.dataFiles[tt.dataFile]) + suite.Require().NoError(err) + suite.Equal(tt.expected, shouldRead, tt.msg) + }) + } +} + +func (suite *StrictMetricsTestSuite) TestIsNan() { + allNan, someNan, noNan := iceberg.Reference("all_nans"), iceberg.Reference("some_nans"), iceberg.Reference("no_nans") + allNullsDbl, noNanStats := iceberg.Reference("all_nulls_double"), iceberg.Reference("no_nan_stats") + allNansV1, nanNullOnly := iceberg.Reference("all_nans_v1_stats"), iceberg.Reference("nan_and_null_only") + + tests := []struct { + expr iceberg.BooleanExpression + expected bool + msg string + }{ + {iceberg.IsNaN(allNan), true, "should read: all values are nan"}, + {iceberg.IsNaN(someNan), false, "should skip: at least one non-nan value in some nan column"}, + {iceberg.IsNaN(noNan), false, "should skip: at least one non-nan value in no nan column"}, + {iceberg.IsNaN(allNullsDbl), false, "should skip: at least one non-nan value in all null column"}, + {iceberg.IsNaN(noNanStats), false, "should skip: cannot determine without nan stats"}, + {iceberg.IsNaN(allNansV1), false, "should skip: cannot determine without nan stats"}, + {iceberg.IsNaN(nanNullOnly), false, "should skip: null values are not nan"}, + } + + for _, tt := range tests { + suite.Run(tt.expr.String(), func() { + eval, err := newStrictMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true) + suite.Require().NoError(err) + shouldRead, err := eval(suite.dataFiles[0]) + suite.Require().NoError(err) + suite.Equal(tt.expected, shouldRead, tt.msg) + }) + } +} + +func (suite *StrictMetricsTestSuite) TestNotNaN() { + allNan, someNan, noNan := iceberg.Reference("all_nans"), iceberg.Reference("some_nans"), iceberg.Reference("no_nans") + allNullsDbl, noNanStats := iceberg.Reference("all_nulls_double"), iceberg.Reference("no_nan_stats") + allNansV1, nanNullOnly := iceberg.Reference("all_nans_v1_stats"), iceberg.Reference("nan_and_null_only") + + tests := []struct { + expr iceberg.BooleanExpression + expected bool + msg string + }{ + {iceberg.NotNaN(allNan), false, "should skip: all values are nan"}, + {iceberg.NotNaN(someNan), false, "should skip: at least one nan value in some nan column"}, + {iceberg.NotNaN(noNan), true, "should read: no value is nan"}, + {iceberg.NotNaN(allNullsDbl), true, "should read: no nan value in all null column"}, + {iceberg.NotNaN(noNanStats), false, "should skip: cannot determine without nan stats"}, + {iceberg.NotNaN(allNansV1), false, "should skip: all values are nan"}, + {iceberg.NotNaN(nanNullOnly), false, "should skip: null values are not nan"}, + } + + for _, tt := range tests { + suite.Run(tt.expr.String(), func() { + eval, err := newStrictMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true) + suite.Require().NoError(err) + shouldRead, err := eval(suite.dataFiles[0]) + suite.Require().NoError(err) + suite.Equal(tt.expected, shouldRead, tt.msg) + }) + } +} + +func (suite *StrictMetricsTestSuite) TestRequiredColumn() { + tests := []struct { + expr iceberg.BooleanExpression + expected bool + msg string + }{ + {iceberg.NotNull(iceberg.Reference("required")), true, "should read: required columns are always non-null"}, + {iceberg.IsNull(iceberg.Reference("required")), false, "should skip: required columns are always non-null"}, + } + + for _, tt := range tests { + suite.Run(tt.expr.String(), func() { + eval, err := newStrictMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true) + suite.Require().NoError(err) + shouldRead, err := eval(suite.dataFiles[0]) + suite.Require().NoError(err) + suite.Equal(tt.expected, shouldRead, tt.msg) + }) + } +} + +func (suite *StrictMetricsTestSuite) TestMissingColumn() { + _, err := newStrictMetricsEvaluator(suite.schemaDataFile, iceberg.LessThan(iceberg.Reference("missing"), int32(22)), true, true) + suite.ErrorIs(err, iceberg.ErrInvalidSchema) +} + +func (suite *StrictMetricsTestSuite) TestMissingStats() { + noStatsSchema := iceberg.NewSchema(0, + iceberg.NestedField{ID: 2, Name: "no_stats", Type: iceberg.PrimitiveTypes.Float64}) + + noStatsFile := &mockDataFile{ + path: "file_1.parquet", + format: iceberg.ParquetFile, + count: 50, + } + + ref := iceberg.Reference("no_stats") + tests := []iceberg.BooleanExpression{ + iceberg.LessThan(ref, int32(5)), + iceberg.LessThanEqual(ref, int32(30)), + iceberg.EqualTo(ref, int32(70)), + iceberg.GreaterThan(ref, int32(78)), + iceberg.GreaterThanEqual(ref, int32(90)), + iceberg.NotEqualTo(ref, int32(101)), + iceberg.IsNull(ref), + iceberg.NotNull(ref), + iceberg.IsNaN(ref), + iceberg.NotNaN(ref), + } + + for _, tt := range tests { + suite.Run(tt.String(), func() { + eval, err := newStrictMetricsEvaluator(noStatsSchema, tt, true, true) + suite.Require().NoError(err) + shouldRead, err := eval(noStatsFile) + suite.Require().NoError(err) + suite.False(shouldRead, "should not read when stats are missing") + }) + } +} + +func (suite *StrictMetricsTestSuite) TestZeroRecordFileStats() { + zeroRecordFile := &mockDataFile{ + path: "file_1.parquet", + format: iceberg.ParquetFile, + count: 0, + } + + ref := iceberg.Reference("no_stats") + tests := []iceberg.BooleanExpression{ + iceberg.LessThan(ref, int32(5)), + iceberg.LessThanEqual(ref, int32(30)), + iceberg.EqualTo(ref, int32(70)), + iceberg.GreaterThan(ref, int32(78)), + iceberg.GreaterThanEqual(ref, int32(90)), + iceberg.NotEqualTo(ref, int32(101)), + iceberg.IsNull(ref), + iceberg.NotNull(ref), + iceberg.IsNaN(ref), + iceberg.NotNaN(ref), + } + + for _, tt := range tests { + suite.Run(tt.String(), func() { + eval, err := newStrictMetricsEvaluator(suite.schemaDataFile, tt, true, false) + suite.Require().NoError(err) + shouldRead, err := eval(zeroRecordFile) + suite.Require().NoError(err) + suite.True(shouldRead, "should match datafile without records") + }) + } +} + +func (suite *StrictMetricsTestSuite) TestNot() { + tests := []struct { + expr iceberg.BooleanExpression + expected bool + msg string + }{ + {iceberg.NewNot(iceberg.LessThan(iceberg.Reference("id"), IntMinValue-25)), true, "should read: not(false)"}, + {iceberg.NewNot(iceberg.GreaterThan(iceberg.Reference("id"), IntMinValue-25)), false, "should skip: not(true)"}, + } + + for _, tt := range tests { + suite.Run(tt.expr.String(), func() { + eval, err := newStrictMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true) + suite.Require().NoError(err) + shouldRead, err := eval(suite.dataFiles[0]) + suite.Require().NoError(err) + suite.Equal(tt.expected, shouldRead, tt.msg) + }) + } +} + +func (suite *StrictMetricsTestSuite) TestAnd() { + ref := iceberg.Reference("id") + tests := []struct { + expr iceberg.BooleanExpression + expected bool + msg string + }{ + {iceberg.NewAnd( + iceberg.GreaterThan(ref, IntMinValue-25), + iceberg.LessThanEqual(ref, IntMinValue)), false, "should skip: range may not overlap data"}, + {iceberg.NewAnd( + iceberg.LessThan(ref, IntMinValue-25), + iceberg.GreaterThanEqual(ref, IntMinValue-30)), false, "should skip: range does not overlap data"}, + {iceberg.NewAnd( + iceberg.LessThan(ref, IntMaxValue+6), + iceberg.GreaterThanEqual(ref, IntMinValue-30)), true, "should match: range includes all data"}, + } + + for _, tt := range tests { + suite.Run(tt.expr.String(), func() { + eval, err := newStrictMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true) + suite.Require().NoError(err) + shouldRead, err := eval(suite.dataFiles[0]) + suite.Require().NoError(err) + suite.Equal(tt.expected, shouldRead, tt.msg) + }) + } +} + +func (suite *StrictMetricsTestSuite) TestOr() { + ref := iceberg.Reference("id") + tests := []struct { + expr iceberg.BooleanExpression + expected bool + msg string + }{ + {iceberg.NewOr( + iceberg.LessThan(ref, IntMinValue-25), + iceberg.GreaterThanEqual(ref, IntMaxValue+1)), false, "should skip: no matching values"}, + {iceberg.NewOr( + iceberg.LessThan(ref, IntMinValue-25), + iceberg.GreaterThanEqual(ref, IntMaxValue-19)), false, "should skip: some values do not match"}, + {iceberg.NewOr( + iceberg.LessThan(ref, IntMinValue-25), + iceberg.GreaterThanEqual(ref, IntMinValue)), true, "should match: all values match"}, + } + + for _, tt := range tests { + suite.Run(tt.expr.String(), func() { + eval, err := newStrictMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true) + suite.Require().NoError(err) + shouldRead, err := eval(suite.dataFiles[0]) + suite.Require().NoError(err) + suite.Equal(tt.expected, shouldRead, tt.msg) + }) + } +} + +func (suite *StrictMetricsTestSuite) TestIntLt() { + ref := iceberg.Reference("id") + tests := []struct { + expr iceberg.BooleanExpression + expected bool + msg string + }{ + {iceberg.LessThan(ref, IntMinValue), false, "should skip: always false"}, + {iceberg.LessThan(ref, IntMinValue+1), false, "should skip: 32 and greater not in range"}, + {iceberg.LessThan(ref, IntMaxValue), false, "should skip: 79 not in range"}, + {iceberg.LessThan(ref, IntMaxValue+1), true, "should read: all values in range"}, + } + + for _, tt := range tests { + suite.Run(tt.expr.String(), func() { + eval, err := newStrictMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true) + suite.Require().NoError(err) + shouldRead, err := eval(suite.dataFiles[0]) + suite.Require().NoError(err) + suite.Equal(tt.expected, shouldRead, tt.msg) + }) + } +} + +func (suite *StrictMetricsTestSuite) TestIntLtEq() { + ref := iceberg.Reference("id") + tests := []struct { + expr iceberg.BooleanExpression + expected bool + msg string + }{ + {iceberg.LessThanEqual(ref, IntMinValue-1), false, "should skip: always false"}, + {iceberg.LessThanEqual(ref, IntMinValue), false, "should skip: 31 and greater not in range"}, + {iceberg.LessThanEqual(ref, IntMaxValue), true, "should read: all values in range"}, + {iceberg.LessThanEqual(ref, IntMaxValue+1), true, "should read: all values in range"}, + } + + for _, tt := range tests { + suite.Run(tt.expr.String(), func() { + eval, err := newStrictMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true) + suite.Require().NoError(err) + shouldRead, err := eval(suite.dataFiles[0]) + suite.Require().NoError(err) + suite.Equal(tt.expected, shouldRead, tt.msg) + }) + } +} + +func (suite *StrictMetricsTestSuite) TestIntGt() { + ref := iceberg.Reference("id") + tests := []struct { + expr iceberg.BooleanExpression + expected bool + msg string + }{ + {iceberg.GreaterThan(ref, IntMaxValue), false, "should skip: always false"}, + {iceberg.GreaterThan(ref, IntMaxValue-1), false, "should skip: 77 and less not in range"}, + {iceberg.GreaterThan(ref, IntMinValue), false, "should skip: 30 not in range"}, + {iceberg.GreaterThan(ref, IntMinValue-1), true, "should read: all values in range"}, + } + + for _, tt := range tests { + suite.Run(tt.expr.String(), func() { + eval, err := newStrictMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true) + suite.Require().NoError(err) + shouldRead, err := eval(suite.dataFiles[0]) + suite.Require().NoError(err) + suite.Equal(tt.expected, shouldRead, tt.msg) + }) + } +} + +func (suite *StrictMetricsTestSuite) TestIntGtEq() { + ref := iceberg.Reference("id") + tests := []struct { + expr iceberg.BooleanExpression + expected bool + msg string + }{ + {iceberg.GreaterThanEqual(ref, IntMaxValue+1), false, "should skip: no values in range"}, + {iceberg.GreaterThanEqual(ref, IntMaxValue), false, "should skip: 78 and lower are not in range"}, + {iceberg.GreaterThanEqual(ref, IntMinValue+1), false, "should skip: 30 not in range"}, + {iceberg.GreaterThanEqual(ref, IntMinValue), true, "should read: all values in range"}, + } + + for _, tt := range tests { + suite.Run(tt.expr.String(), func() { + eval, err := newStrictMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true) + suite.Require().NoError(err) + shouldRead, err := eval(suite.dataFiles[0]) + suite.Require().NoError(err) + suite.Equal(tt.expected, shouldRead, tt.msg) + }) + } +} + +func (suite *StrictMetricsTestSuite) TestIntEq() { + id, alwaysFive := iceberg.Reference("id"), iceberg.Reference("always_5") + tests := []struct { + expr iceberg.BooleanExpression + expected bool + msg string + }{ + {iceberg.EqualTo(id, IntMinValue-25), false, "should skip: all values != 5"}, + {iceberg.EqualTo(id, IntMinValue), false, "should skip: some values != 30"}, + {iceberg.EqualTo(id, IntMinValue-4), false, "should skip: some values != 75"}, + {iceberg.EqualTo(id, IntMaxValue), false, "should skip: some values != 79"}, + {iceberg.EqualTo(id, IntMaxValue+1), false, "should skip: some values != 80"}, + {iceberg.EqualTo(alwaysFive, IntMinValue-25), true, "should read: all values == 5"}, + } + + for _, tt := range tests { + suite.Run(tt.expr.String(), func() { + eval, err := newStrictMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true) + suite.Require().NoError(err) + shouldRead, err := eval(suite.dataFiles[0]) + suite.Require().NoError(err) + suite.Equal(tt.expected, shouldRead, tt.msg) + }) + } +} + +func (suite *StrictMetricsTestSuite) TestIntNeq() { + ref := iceberg.Reference("id") + tests := []struct { + expr iceberg.BooleanExpression + expected bool + msg string + }{ + {iceberg.NotEqualTo(ref, IntMinValue-25), true, "should read: no values == 5"}, + {iceberg.NotEqualTo(ref, IntMinValue-1), true, "should read: no values == 39"}, + {iceberg.NotEqualTo(ref, IntMinValue), false, "should skip: some value may be == 30"}, + {iceberg.NotEqualTo(ref, IntMaxValue-4), false, "should skip: some value may be == 75"}, + {iceberg.NotEqualTo(ref, IntMaxValue), false, "should skip: some value may be == 79"}, + {iceberg.NotEqualTo(ref, IntMaxValue+1), true, "should read: no values == 80"}, + {iceberg.NotEqualTo(ref, IntMaxValue+6), true, "should read: no values == 85"}, + } + + for _, tt := range tests { + suite.Run(tt.expr.String(), func() { + eval, err := newStrictMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true) + suite.Require().NoError(err) + shouldRead, err := eval(suite.dataFiles[0]) + suite.Require().NoError(err) + suite.Equal(tt.expected, shouldRead, tt.msg) + }) + } +} + +func (suite *StrictMetricsTestSuite) TestIntNeqRewritten() { + ref := iceberg.Reference("id") + tests := []struct { + expr iceberg.BooleanExpression + expected bool + msg string + }{ + {iceberg.EqualTo(ref, IntMinValue-25), true, "should read: no values == 5"}, + {iceberg.EqualTo(ref, IntMinValue-1), true, "should read: no values == 39"}, + {iceberg.EqualTo(ref, IntMinValue), false, "should skip: some value may be == 30"}, + {iceberg.EqualTo(ref, IntMaxValue-4), false, "should skip: some value may be == 75"}, + {iceberg.EqualTo(ref, IntMaxValue), false, "should skip: some value may be == 79"}, + {iceberg.EqualTo(ref, IntMaxValue+1), true, "should read: no values == 80"}, + {iceberg.EqualTo(ref, IntMaxValue+6), true, "should read: no values == 85"}, + } + + for _, tt := range tests { + suite.Run(tt.expr.String(), func() { + eval, err := newStrictMetricsEvaluator(suite.schemaDataFile, iceberg.NewNot(tt.expr), true, true) + suite.Require().NoError(err) + shouldRead, err := eval(suite.dataFiles[0]) + suite.Require().NoError(err) + suite.Equal(tt.expected, shouldRead, tt.msg) + }) + } +} + +func (suite *StrictMetricsTestSuite) TestIntNeqRewrittenCaseInsensitive() { + ref := iceberg.Reference("ID") + tests := []struct { + expr iceberg.BooleanExpression + expected bool + msg string + }{ + {iceberg.EqualTo(ref, IntMinValue-25), true, "should read: no values == 5"}, + {iceberg.EqualTo(ref, IntMinValue-1), true, "should read: no values == 39"}, + {iceberg.EqualTo(ref, IntMinValue), false, "should skip: some value may be == 30"}, + {iceberg.EqualTo(ref, IntMaxValue-4), false, "should skip: some value may be == 75"}, + {iceberg.EqualTo(ref, IntMaxValue), false, "should skip: some value may be == 79"}, + {iceberg.EqualTo(ref, IntMaxValue+1), true, "should read: no values == 80"}, + {iceberg.EqualTo(ref, IntMaxValue+6), true, "should read: no values == 85"}, + } + + for _, tt := range tests { + suite.Run(tt.expr.String(), func() { + eval, err := newStrictMetricsEvaluator(suite.schemaDataFile, iceberg.NewNot(tt.expr), false, true) + suite.Require().NoError(err) + shouldRead, err := eval(suite.dataFiles[0]) + suite.Require().NoError(err) + suite.Equal(tt.expected, shouldRead, tt.msg) + }) + } +} + +func (suite *StrictMetricsTestSuite) TestInMetrics() { + ref := iceberg.Reference("id") + + ids := make([]int32, 400) + for i := range ids { + ids[i] = int32(i) + } + + tests := []struct { + expr iceberg.BooleanExpression + expected bool + msg string + }{ + {iceberg.IsIn(ref, IntMinValue-25, IntMinValue-24), false, "should skip: all values != 5 and != 6"}, + {iceberg.IsIn(ref, IntMinValue-1, IntMinValue), false, "should skip: some values != 30 and != 31"}, + {iceberg.IsIn(ref, IntMaxValue-4, IntMaxValue-3), false, "should skip: some values != 75 and != 76"}, + {iceberg.IsIn(ref, IntMaxValue, IntMaxValue+1), false, "should skip: some values != 78 and != 79"}, + {iceberg.IsIn(ref, IntMaxValue+1, IntMaxValue+2), false, "should skip: some values != 80 and != 81"}, + {iceberg.IsIn(iceberg.Reference("always_5"), int32(5), int32(6)), true, "should read: all values == 5"}, + {iceberg.IsIn(iceberg.Reference("all_nulls"), "abc", "def"), false, "should skip: in on all nulls column"}, + {iceberg.IsIn(iceberg.Reference("some_nulls"), "abc", "def"), false, "should skip: in on some nulls column"}, + {iceberg.IsIn(iceberg.Reference("no_nulls"), "abc", "def"), false, "should skip: in on no nulls column"}, + } + + for _, tt := range tests { + suite.Run(tt.expr.String(), func() { + eval, err := newStrictMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true) + suite.Require().NoError(err) + shouldRead, err := eval(suite.dataFiles[0]) + suite.Require().NoError(err) + suite.Equal(tt.expected, shouldRead, tt.msg) + }) + } +} + +func (suite *StrictMetricsTestSuite) TestNotInMetrics() { + ref := iceberg.Reference("id") + + tests := []struct { + expr iceberg.BooleanExpression + expected bool + msg string + }{ + {iceberg.NotIn(ref, IntMinValue-1, IntMinValue), false, "should skip: some values may be == 30"}, + {iceberg.NotIn(ref, IntMaxValue-4, IntMaxValue-3), false, "should skip: some value may be == 75 or == 76"}, + {iceberg.NotIn(ref, IntMaxValue, IntMaxValue+1), false, "should skip: some value may be == 79"}, + {iceberg.NotIn(ref, IntMaxValue+1, IntMaxValue+2), true, "should read: no values == 80 or == 81"}, + {iceberg.NotIn(iceberg.Reference("always_5"), int32(5), int32(6)), false, "should skip: all values == 5"}, + {iceberg.NotIn(iceberg.Reference("all_nulls"), "abc", "def"), true, "should read: notIn on all nulls column"}, + {iceberg.NotIn(iceberg.Reference("some_nulls"), "abc", "def"), true, "should read: notIn on some nulls column"}, + {iceberg.NotIn(iceberg.Reference("no_nulls"), "abc", "def"), false, "should read: notIn on no nulls column"}, + } + + for _, tt := range tests { + suite.Run(tt.expr.String(), func() { + eval, err := newStrictMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true) + suite.Require().NoError(err) + shouldRead, err := eval(suite.dataFiles[0]) + suite.Require().NoError(err) + suite.Equal(tt.expected, shouldRead, tt.msg) + }) + } +} + func TestEvaluators(t *testing.T) { suite.Run(t, &ProjectionTestSuite{}) suite.Run(t, &InclusiveMetricsTestSuite{}) + suite.Run(t, &StrictMetricsTestSuite{}) }