diff --git a/pkg/engine/engine_test.go b/pkg/engine/engine_test.go index d2abb7273c6cb..fd268ca1f05c8 100644 --- a/pkg/engine/engine_test.go +++ b/pkg/engine/engine_test.go @@ -55,8 +55,8 @@ func createRecord(t *testing.T, schema *arrow.Schema, data [][]interface{}) arro } func TestConvertArrowRecordsToLokiResult(t *testing.T) { - mdTypeLabel := datatype.ColumnMetadata(types.ColumnTypeLabel, datatype.String) - mdTypeMetadata := datatype.ColumnMetadata(types.ColumnTypeMetadata, datatype.String) + mdTypeLabel := datatype.ColumnMetadata(types.ColumnTypeLabel, datatype.Loki.String) + mdTypeMetadata := datatype.ColumnMetadata(types.ColumnTypeMetadata, datatype.Loki.String) t.Run("rows without log line, timestamp, or labels are ignored", func(t *testing.T) { schema := arrow.NewSchema( diff --git a/pkg/engine/executor/dataobjscan.go b/pkg/engine/executor/dataobjscan.go index 0d0f92075746b..279bc4ed21b0c 100644 --- a/pkg/engine/executor/dataobjscan.go +++ b/pkg/engine/executor/dataobjscan.go @@ -469,15 +469,15 @@ func schemaFromColumns(columns []physical.ColumnExpression) (*arrow.Schema, erro // so we don't always explode out to the full set of columns. addField(arrow.Field{ Name: columnExpr.Ref.Column, - Type: arrow.BinaryTypes.String, + Type: datatype.Arrow.String, Nullable: true, - Metadata: datatype.ColumnMetadata(types.ColumnTypeLabel, datatype.String), + Metadata: datatype.ColumnMetadata(types.ColumnTypeLabel, datatype.Loki.String), }) addField(arrow.Field{ Name: columnExpr.Ref.Column, - Type: arrow.BinaryTypes.String, + Type: datatype.Arrow.String, Nullable: true, - Metadata: datatype.ColumnMetadata(types.ColumnTypeMetadata, datatype.String), + Metadata: datatype.ColumnMetadata(types.ColumnTypeMetadata, datatype.Loki.String), }) case types.ColumnTypeParsed, types.ColumnTypeGenerated: @@ -500,7 +500,7 @@ func arrowTypeFromColumnRef(ref types.ColumnRef) (arrow.DataType, arrow.Metadata } } - return arrow.BinaryTypes.String, datatype.ColumnMetadata(ref.Type, datatype.String) + return datatype.Arrow.String, datatype.ColumnMetadata(ref.Type, datatype.Loki.String) } // appendToBuilder appends a the provided field from record into the given diff --git a/pkg/engine/executor/dataobjscan_predicate.go b/pkg/engine/executor/dataobjscan_predicate.go index 31ca549c254c4..851bdf62a7ab3 100644 --- a/pkg/engine/executor/dataobjscan_predicate.go +++ b/pkg/engine/executor/dataobjscan_predicate.go @@ -175,7 +175,7 @@ func (m *timestampPredicateMapper) verify(expr physical.Expression) error { if !okRHS { return fmt.Errorf("invalid RHS for comparison: expected literal timestamp, got %T", binop.Right) } - if rhsLit.ValueType() != datatype.Timestamp { + if rhsLit.ValueType() != datatype.Loki.Timestamp { return fmt.Errorf("unsupported literal type for RHS: %s, expected timestamp", rhsLit.ValueType()) } return nil @@ -231,11 +231,12 @@ func (m *timestampPredicateMapper) rebound(op types.BinaryOp, rightExpr physical return fmt.Errorf("internal error: rebound expected LiteralExpr, got %T for: %s", rightExpr, rightExpr.String()) } - if literalExpr.ValueType() != datatype.Timestamp { + if literalExpr.ValueType() != datatype.Loki.Timestamp { // Also should be caught by verify. return fmt.Errorf("internal error: unsupported literal type in rebound: %s, expected timestamp", literalExpr.ValueType()) } - val := literalExpr.Literal.(*datatype.TimestampLiteral).Value() + v := literalExpr.Literal.(datatype.TimestampLiteral).Value() + val := time.Unix(0, int64(v)).UTC() switch op { case types.BinaryOpEq: // ts == val @@ -334,10 +335,10 @@ func mapMetadataPredicate(expr physical.Expression) (logs.RowPredicate, error) { if !ok { // Should not happen return nil, fmt.Errorf("RHS of EQ metadata predicate failed to cast to LiteralExpr") } - if rightLiteral.ValueType() != datatype.String { + if rightLiteral.ValueType() != datatype.Loki.String { return nil, fmt.Errorf("unsupported RHS literal type (%v) for EQ metadata predicate, expected ValueTypeStr", rightLiteral.ValueType()) } - val := rightLiteral.Literal.(*datatype.StringLiteral).Value() + val := rightLiteral.Literal.(datatype.StringLiteral).Value() return logs.MetadataMatcherRowPredicate{ Key: leftColumn.Ref.Column, @@ -510,9 +511,9 @@ func rhsValue(e *physical.BinaryExpr) (string, error) { if !ok { // Should not happen return "", fmt.Errorf("RHS of %s message predicate failed to cast to LiteralExpr", op) } - if rightLiteral.ValueType() != datatype.String { + if rightLiteral.ValueType() != datatype.Loki.String { return "", fmt.Errorf("unsupported RHS literal type (%v) for %s message predicate, expected ValueTypeStr", rightLiteral.ValueType(), op) } - return rightLiteral.Literal.(*datatype.StringLiteral).Value(), nil + return rightLiteral.Literal.(datatype.StringLiteral).Value(), nil } diff --git a/pkg/engine/executor/dataobjscan_predicate_test.go b/pkg/engine/executor/dataobjscan_predicate_test.go index bfccb2a7a267a..7b37729eb62ac 100644 --- a/pkg/engine/executor/dataobjscan_predicate_test.go +++ b/pkg/engine/executor/dataobjscan_predicate_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/dataobj/sections/logs" + "github.com/grafana/loki/v3/pkg/engine/internal/datatype" "github.com/grafana/loki/v3/pkg/engine/internal/types" "github.com/grafana/loki/v3/pkg/engine/planner/physical" ) @@ -31,6 +32,10 @@ func tsColExpr() *physical.ColumnExpr { return newColumnExpr(types.ColumnNameBuiltinTimestamp, types.ColumnTypeBuiltin) } +func ts(t time.Time) datatype.Timestamp { + return datatype.Timestamp(t.UnixNano()) +} + func TestMapTimestampPredicate(t *testing.T) { time100 := time.Unix(0, 100).UTC() time200 := time.Unix(0, 200).UTC() @@ -47,7 +52,7 @@ func TestMapTimestampPredicate(t *testing.T) { expr: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpEq, - Right: physical.NewLiteral(time100), + Right: physical.NewLiteral(ts(time100)), }, want: logs.TimeRangeRowPredicate{ StartTime: time100, @@ -61,7 +66,7 @@ func TestMapTimestampPredicate(t *testing.T) { expr: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpGt, - Right: physical.NewLiteral(time100), + Right: physical.NewLiteral(ts(time100)), }, want: logs.TimeRangeRowPredicate{ StartTime: time100, @@ -75,7 +80,7 @@ func TestMapTimestampPredicate(t *testing.T) { expr: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpGte, - Right: physical.NewLiteral(time100), + Right: physical.NewLiteral(ts(time100)), }, want: logs.TimeRangeRowPredicate{ StartTime: time100, @@ -89,7 +94,7 @@ func TestMapTimestampPredicate(t *testing.T) { expr: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpLt, - Right: physical.NewLiteral(time100), + Right: physical.NewLiteral(ts(time100)), }, want: logs.TimeRangeRowPredicate{ StartTime: testOpenStart, @@ -103,7 +108,7 @@ func TestMapTimestampPredicate(t *testing.T) { expr: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpLte, - Right: physical.NewLiteral(time100), + Right: physical.NewLiteral(ts(time100)), }, want: logs.TimeRangeRowPredicate{ StartTime: testOpenStart, @@ -120,7 +125,7 @@ func TestMapTimestampPredicate(t *testing.T) { Right: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpLt, - Right: physical.NewLiteral(time100), + Right: physical.NewLiteral(ts(time100)), }, }, errMatch: "invalid RHS for comparison: expected literal timestamp, got *physical.BinaryExpr", @@ -136,7 +141,7 @@ func TestMapTimestampPredicate(t *testing.T) { Right: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpLte, - Right: physical.NewLiteral(time200), + Right: physical.NewLiteral(ts(time200)), }, }, }, @@ -144,15 +149,15 @@ func TestMapTimestampPredicate(t *testing.T) { }, { desc: "input is not BinaryExpr", - expr: physical.NewLiteral(time100), + expr: physical.NewLiteral(ts(time100)), errMatch: "unsupported expression type for timestamp predicate: *physical.LiteralExpr, expected *physical.BinaryExpr", }, { desc: "LHS of BinaryExpr is not ColumnExpr", expr: &physical.BinaryExpr{ - Left: physical.NewLiteral(time100), + Left: physical.NewLiteral(ts(time100)), Op: types.BinaryOpEq, - Right: physical.NewLiteral(time200), + Right: physical.NewLiteral(ts(time200)), }, errMatch: "invalid LHS for comparison: expected timestamp column, got 1970-01-01T00:00:00.0000001Z", }, @@ -161,7 +166,7 @@ func TestMapTimestampPredicate(t *testing.T) { expr: &physical.BinaryExpr{ Left: newColumnExpr("other_col", types.ColumnTypeBuiltin), Op: types.BinaryOpEq, - Right: physical.NewLiteral(time100), + Right: physical.NewLiteral(ts(time100)), }, errMatch: "invalid LHS for comparison: expected timestamp column, got builtin.other_col", }, @@ -188,7 +193,7 @@ func TestMapTimestampPredicate(t *testing.T) { expr: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpAnd, - Right: physical.NewLiteral(time100), + Right: physical.NewLiteral(ts(time100)), }, errMatch: "invalid left operand for AND: unsupported expression type for timestamp predicate: *physical.ColumnExpr, expected *physical.BinaryExpr", }, @@ -200,7 +205,7 @@ func TestMapTimestampPredicate(t *testing.T) { Right: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpGt, - Right: physical.NewLiteral(time100), + Right: physical.NewLiteral(ts(time100)), }, }, errMatch: "unsupported operator for timestamp predicate: OR", @@ -211,13 +216,13 @@ func TestMapTimestampPredicate(t *testing.T) { Left: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpGt, - Right: physical.NewLiteral(time100), + Right: physical.NewLiteral(ts(time100)), }, Op: types.BinaryOpAnd, Right: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpLt, - Right: physical.NewLiteral(time200), + Right: physical.NewLiteral(ts(time200)), }, }, want: logs.TimeRangeRowPredicate{ @@ -233,13 +238,13 @@ func TestMapTimestampPredicate(t *testing.T) { Left: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpGte, - Right: physical.NewLiteral(time100), + Right: physical.NewLiteral(ts(time100)), }, Op: types.BinaryOpAnd, Right: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpLte, - Right: physical.NewLiteral(time200), + Right: physical.NewLiteral(ts(time200)), }, }, want: logs.TimeRangeRowPredicate{ @@ -255,13 +260,13 @@ func TestMapTimestampPredicate(t *testing.T) { Left: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpGte, - Right: physical.NewLiteral(time100), + Right: physical.NewLiteral(ts(time100)), }, Op: types.BinaryOpAnd, Right: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpLt, - Right: physical.NewLiteral(time100), + Right: physical.NewLiteral(ts(time100)), }, }, errMatch: "impossible time range: start_time (1970-01-01 00:00:00.0000001 +0000 UTC) equals end_time (1970-01-01 00:00:00.0000001 +0000 UTC) but the range is exclusive", @@ -272,13 +277,13 @@ func TestMapTimestampPredicate(t *testing.T) { Left: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpGt, - Right: physical.NewLiteral(time100), + Right: physical.NewLiteral(ts(time100)), }, Op: types.BinaryOpAnd, Right: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpLte, - Right: physical.NewLiteral(time100), + Right: physical.NewLiteral(ts(time100)), }, }, errMatch: "impossible time range: start_time (1970-01-01 00:00:00.0000001 +0000 UTC) equals end_time (1970-01-01 00:00:00.0000001 +0000 UTC) but the range is exclusive", @@ -289,13 +294,13 @@ func TestMapTimestampPredicate(t *testing.T) { Left: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpGte, - Right: physical.NewLiteral(time100), + Right: physical.NewLiteral(ts(time100)), }, Op: types.BinaryOpAnd, Right: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpLte, - Right: physical.NewLiteral(time100), + Right: physical.NewLiteral(ts(time100)), }, }, want: logs.TimeRangeRowPredicate{ @@ -311,13 +316,13 @@ func TestMapTimestampPredicate(t *testing.T) { Left: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpEq, - Right: physical.NewLiteral(time100), + Right: physical.NewLiteral(ts(time100)), }, Op: types.BinaryOpAnd, Right: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpEq, - Right: physical.NewLiteral(time200), + Right: physical.NewLiteral(ts(time200)), }, }, errMatch: "impossible time range: start_time (1970-01-01 00:00:00.0000002 +0000 UTC) is after end_time (1970-01-01 00:00:00.0000001 +0000 UTC)", @@ -328,13 +333,13 @@ func TestMapTimestampPredicate(t *testing.T) { Left: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpLt, - Right: physical.NewLiteral(time100), + Right: physical.NewLiteral(ts(time100)), }, Op: types.BinaryOpAnd, Right: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpGt, - Right: physical.NewLiteral(time200), + Right: physical.NewLiteral(ts(time200)), }, }, errMatch: "impossible time range: start_time (1970-01-01 00:00:00.0000002 +0000 UTC) is after end_time (1970-01-01 00:00:00.0000001 +0000 UTC)", @@ -346,20 +351,20 @@ func TestMapTimestampPredicate(t *testing.T) { Left: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpGt, - Right: physical.NewLiteral(time100), + Right: physical.NewLiteral(ts(time100)), }, Op: types.BinaryOpAnd, Right: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpLt, - Right: physical.NewLiteral(time300), + Right: physical.NewLiteral(ts(time300)), }, }, Op: types.BinaryOpAnd, Right: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpEq, - Right: physical.NewLiteral(time200), + Right: physical.NewLiteral(ts(time200)), }, }, want: logs.TimeRangeRowPredicate{ @@ -375,13 +380,13 @@ func TestMapTimestampPredicate(t *testing.T) { Left: &physical.BinaryExpr{ // Invalid: LHS not timestamp column Left: newColumnExpr("not_ts", types.ColumnTypeBuiltin), Op: types.BinaryOpGt, - Right: physical.NewLiteral(time100), + Right: physical.NewLiteral(ts(time100)), }, Op: types.BinaryOpAnd, Right: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpLt, - Right: physical.NewLiteral(time200), + Right: physical.NewLiteral(ts(time200)), }, }, errMatch: "invalid left operand for AND: invalid LHS for comparison: expected timestamp column, got builtin.not_ts", @@ -392,7 +397,7 @@ func TestMapTimestampPredicate(t *testing.T) { Left: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpGt, - Right: physical.NewLiteral(time100), + Right: physical.NewLiteral(ts(time100)), }, Op: types.BinaryOpAnd, Right: &physical.BinaryExpr{ // Invalid: RHS literal not timestamp @@ -409,13 +414,13 @@ func TestMapTimestampPredicate(t *testing.T) { Left: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpLt, - Right: physical.NewLiteral(time100), + Right: physical.NewLiteral(ts(time100)), }, Op: types.BinaryOpAnd, Right: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpLt, - Right: physical.NewLiteral(time200), + Right: physical.NewLiteral(ts(time200)), }, }, want: logs.TimeRangeRowPredicate{ @@ -431,13 +436,13 @@ func TestMapTimestampPredicate(t *testing.T) { Left: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpLt, - Right: physical.NewLiteral(time200), + Right: physical.NewLiteral(ts(time200)), }, Op: types.BinaryOpAnd, Right: &physical.BinaryExpr{ Left: tsColExpr(), Op: types.BinaryOpLt, - Right: physical.NewLiteral(time100), + Right: physical.NewLiteral(ts(time100)), }, }, want: logs.TimeRangeRowPredicate{ @@ -620,7 +625,7 @@ func TestMapMetadataPredicate(t *testing.T) { name: "error: RHS literal not string for EQ", expr: &physical.BinaryExpr{ Left: &physical.ColumnExpr{Ref: types.ColumnRef{Column: "foo", Type: types.ColumnTypeMetadata}}, - Right: physical.NewLiteral(123), // Not string + Right: physical.NewLiteral(int64(123)), // Not string Op: types.BinaryOpEq, }, expectedPred: nil, diff --git a/pkg/engine/executor/dataobjscan_test.go b/pkg/engine/executor/dataobjscan_test.go index b7ac328ae113d..fa2cabd919dec 100644 --- a/pkg/engine/executor/dataobjscan_test.go +++ b/pkg/engine/executor/dataobjscan_test.go @@ -20,8 +20,8 @@ import ( ) var ( - labelMD = datatype.ColumnMetadata(types.ColumnTypeLabel, datatype.String) - metadataMD = datatype.ColumnMetadata(types.ColumnTypeMetadata, datatype.String) + labelMD = datatype.ColumnMetadata(types.ColumnTypeLabel, datatype.Loki.String) + metadataMD = datatype.ColumnMetadata(types.ColumnTypeMetadata, datatype.Loki.String) ) func Test_dataobjScan(t *testing.T) { diff --git a/pkg/engine/executor/expressions.go b/pkg/engine/executor/expressions.go index 8441063480d7c..9cb3dbc60b8b9 100644 --- a/pkg/engine/executor/expressions.go +++ b/pkg/engine/executor/expressions.go @@ -48,7 +48,7 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record) // A non-existent column is represented as a string scalar with zero-value. // This reflects current behaviour, where a label filter `| foo=""` would match all if `foo` is not defined. return &Scalar{ - value: datatype.NewStringLiteral(""), + value: datatype.NewLiteral(""), rows: input.NumRows(), ct: types.ColumnTypeGenerated, }, nil diff --git a/pkg/engine/executor/expressions_test.go b/pkg/engine/executor/expressions_test.go index 165ce2db212bc..8163ef66105c3 100644 --- a/pkg/engine/executor/expressions_test.go +++ b/pkg/engine/executor/expressions_test.go @@ -16,10 +16,10 @@ import ( var ( fields = []arrow.Field{ - {Name: "name", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)}, - {Name: "timestamp", Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Timestamp)}, - {Name: "value", Type: arrow.PrimitiveTypes.Float64, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Float)}, - {Name: "valid", Type: arrow.FixedWidthTypes.Boolean, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Bool)}, + {Name: "name", Type: datatype.Arrow.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Loki.String)}, + {Name: "timestamp", Type: datatype.Arrow.Timestamp, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Loki.Timestamp)}, + {Name: "value", Type: datatype.Arrow.Float, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Loki.Float)}, + {Name: "valid", Type: datatype.Arrow.Bool, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Loki.Bool)}, } sampledata = `Alice,1745487598764058205,0.2586284611568047,false Bob,1745487598764058305,0.7823145698741236,true @@ -37,6 +37,7 @@ func TestEvaluateLiteralExpression(t *testing.T) { for _, tt := range []struct { name string value any + want any arrowType arrow.Type }{ { @@ -66,17 +67,17 @@ func TestEvaluateLiteralExpression(t *testing.T) { }, { name: "timestamp", - value: time.Unix(3600, 0).UTC(), + value: datatype.Timestamp(3600000000), arrowType: arrow.INT64, }, { name: "duration", - value: time.Hour, + value: datatype.Duration(3600000000), arrowType: arrow.INT64, }, { name: "bytes", - value: int64(1024), + value: datatype.Bytes(1024), arrowType: arrow.INT64, }, } { @@ -92,7 +93,11 @@ func TestEvaluateLiteralExpression(t *testing.T) { for i := range n { val := colVec.Value(i) - require.Equal(t, tt.value, val) + if tt.want != nil { + require.Equal(t, tt.want, val) + } else { + require.Equal(t, tt.value, val) + } } }) } @@ -226,8 +231,8 @@ func batch(n int, now time.Time) arrow.Record { // 2. Define the schema schema := arrow.NewSchema( []arrow.Field{ - {Name: "message", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadataBuiltinMessage}, - {Name: "timestamp", Type: arrow.PrimitiveTypes.Uint64, Metadata: datatype.ColumnMetadataBuiltinTimestamp}, + {Name: "message", Type: datatype.Arrow.String, Metadata: datatype.ColumnMetadataBuiltinMessage}, + {Name: "timestamp", Type: datatype.Arrow.Timestamp, Metadata: datatype.ColumnMetadataBuiltinTimestamp}, }, nil, // No metadata ) @@ -236,16 +241,16 @@ func batch(n int, now time.Time) arrow.Record { logBuilder := array.NewStringBuilder(mem) defer logBuilder.Release() - tsBuilder := array.NewUint64Builder(mem) + tsBuilder := array.NewTimestampBuilder(mem, &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "UTC"}) defer tsBuilder.Release() // 4. Append data to the builders logs := make([]string, n) - ts := make([]uint64, n) + ts := make([]arrow.Timestamp, n) for i := range n { logs[i] = words[i%len(words)] - ts[i] = uint64(now.Add(time.Duration(i) * time.Second).UnixNano()) + ts[i] = arrow.Timestamp(now.Add(time.Duration(i) * time.Second).UnixNano()) } tsBuilder.AppendValues(ts, nil) diff --git a/pkg/engine/executor/filter_test.go b/pkg/engine/executor/filter_test.go index 5f64aa26993da..d0cfa9cd4b973 100644 --- a/pkg/engine/executor/filter_test.go +++ b/pkg/engine/executor/filter_test.go @@ -15,8 +15,8 @@ import ( func TestNewFilterPipeline(t *testing.T) { fields := []arrow.Field{ - {Name: "name", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)}, - {Name: "valid", Type: arrow.FixedWidthTypes.Boolean, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Bool)}, + {Name: "name", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Loki.String)}, + {Name: "valid", Type: arrow.FixedWidthTypes.Boolean, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Loki.Bool)}, } t.Run("filter with true literal predicate", func(t *testing.T) { diff --git a/pkg/engine/executor/limit_test.go b/pkg/engine/executor/limit_test.go index aa3574e9d6fc9..917060e2886c5 100644 --- a/pkg/engine/executor/limit_test.go +++ b/pkg/engine/executor/limit_test.go @@ -7,6 +7,7 @@ import ( "github.com/apache/arrow-go/v18/arrow" "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/engine/internal/datatype" "github.com/grafana/loki/v3/pkg/engine/planner/physical" ) @@ -31,7 +32,7 @@ func TestExecuteLimit(t *testing.T) { t.Run("with valid input", func(t *testing.T) { // Create test data fields := []arrow.Field{ - {Name: "name", Type: arrow.BinaryTypes.String}, + {Name: "name", Type: datatype.Arrow.String}, } csvData := "Alice\nBob\nCharlie" record, err := CSVToArrow(fields, csvData) @@ -69,7 +70,7 @@ func TestExecuteLimit(t *testing.T) { t.Run("with rows split across batches", func(t *testing.T) { // Create schema with a single string column fields := []arrow.Field{ - {Name: "letter", Type: arrow.BinaryTypes.String}, + {Name: "letter", Type: datatype.Arrow.String}, } // Create first batch with letters A-C @@ -97,7 +98,7 @@ func TestExecuteLimit(t *testing.T) { // First read should give us C from batch1 expectedFields := []arrow.Field{ - {Name: "letter", Type: arrow.BinaryTypes.String}, + {Name: "letter", Type: datatype.Arrow.String}, } expectedData := "C\nD\nE" @@ -115,7 +116,7 @@ func TestExecuteLimit(t *testing.T) { func TestLimitPipeline_Skip_Fetch(t *testing.T) { // Create test pipeline with known data fields := []arrow.Field{ - {Name: "id", Type: arrow.PrimitiveTypes.Int32}, + {Name: "id", Type: datatype.Arrow.Integer}, } // Create a pipeline with numbers 1-10 @@ -159,7 +160,7 @@ func TestLimitPipeline_Skip_Fetch(t *testing.T) { func TestLimitPipeline_MultipleBatches(t *testing.T) { // Create test pipeline with multiple batches fields := []arrow.Field{ - {Name: "id", Type: arrow.PrimitiveTypes.Int32}, + {Name: "id", Type: datatype.Arrow.Integer}, } // First batch: 1-5 @@ -183,7 +184,7 @@ func TestLimitPipeline_MultipleBatches(t *testing.T) { defer limit.Close() expectedFields := []arrow.Field{ - {Name: "id", Type: arrow.PrimitiveTypes.Int32}, + {Name: "id", Type: datatype.Arrow.Integer}, } expectedData := "4\n5\n6\n7\n8\n" diff --git a/pkg/engine/executor/pipeline_test.go b/pkg/engine/executor/pipeline_test.go index 4da2ed80671bb..131e4f3a77d55 100644 --- a/pkg/engine/executor/pipeline_test.go +++ b/pkg/engine/executor/pipeline_test.go @@ -9,6 +9,8 @@ import ( "github.com/apache/arrow-go/v18/arrow/csv" "github.com/apache/arrow-go/v18/arrow/memory" "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/engine/internal/datatype" ) // CSVToArrow converts a CSV string to an Arrow record based on the provided schema. @@ -47,8 +49,8 @@ func CSVToArrowWithAllocator(allocator memory.Allocator, fields []arrow.Field, c func TestCSVPipeline(t *testing.T) { // Define test schema fields := []arrow.Field{ - {Name: "name", Type: arrow.BinaryTypes.String}, - {Name: "age", Type: arrow.PrimitiveTypes.Int32}, + {Name: "name", Type: datatype.Arrow.String}, + {Name: "age", Type: datatype.Arrow.Integer}, } schema := arrow.NewSchema(fields, nil) diff --git a/pkg/engine/executor/pipeline_utils_test.go b/pkg/engine/executor/pipeline_utils_test.go index 02b8233d9f22a..b39a12e293945 100644 --- a/pkg/engine/executor/pipeline_utils_test.go +++ b/pkg/engine/executor/pipeline_utils_test.go @@ -6,6 +6,8 @@ import ( "github.com/apache/arrow-go/v18/arrow" "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/engine/internal/datatype" ) // AssertPipelinesEqual iterates through two pipelines and ensures they contain @@ -132,8 +134,8 @@ func AssertPipelinesEqual(t testing.TB, left, right Pipeline) { func TestAssertPipelinesEqual(t *testing.T) { // Define test schema fields := []arrow.Field{ - {Name: "name", Type: arrow.BinaryTypes.String}, - {Name: "age", Type: arrow.PrimitiveTypes.Int32}, + {Name: "name", Type: datatype.Arrow.String}, + {Name: "age", Type: datatype.Arrow.Integer}, } // Create test data diff --git a/pkg/engine/executor/project_test.go b/pkg/engine/executor/project_test.go index c3e48454ecb02..47ac5dbf51284 100644 --- a/pkg/engine/executor/project_test.go +++ b/pkg/engine/executor/project_test.go @@ -13,9 +13,9 @@ import ( func TestNewProjectPipeline(t *testing.T) { fields := []arrow.Field{ - {Name: "name", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)}, - {Name: "age", Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Integer)}, - {Name: "city", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)}, + {Name: "name", Type: datatype.Arrow.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Loki.String)}, + {Name: "age", Type: datatype.Arrow.Integer, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Loki.Integer)}, + {Name: "city", Type: datatype.Arrow.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Loki.String)}, } t.Run("project single column", func(t *testing.T) { @@ -42,7 +42,7 @@ func TestNewProjectPipeline(t *testing.T) { // Create expected output expectedCSV := "Alice\nBob\nCharlie" expectedFields := []arrow.Field{ - {Name: "name", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)}, + {Name: "name", Type: datatype.Arrow.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Loki.String)}, } expectedRecord, err := CSVToArrow(expectedFields, expectedCSV) require.NoError(t, err) @@ -81,8 +81,8 @@ func TestNewProjectPipeline(t *testing.T) { // Create expected output expectedCSV := "Alice,New York\nBob,Boston\nCharlie,Seattle" expectedFields := []arrow.Field{ - {Name: "name", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)}, - {Name: "city", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)}, + {Name: "name", Type: datatype.Arrow.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Loki.String)}, + {Name: "city", Type: datatype.Arrow.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Loki.String)}, } expectedRecord, err := CSVToArrow(expectedFields, expectedCSV) require.NoError(t, err) @@ -124,9 +124,9 @@ func TestNewProjectPipeline(t *testing.T) { // Create expected output expectedCSV := "New York,30,Alice\nBoston,25,Bob\nSeattle,35,Charlie" expectedFields := []arrow.Field{ - {Name: "city", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)}, - {Name: "age", Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Integer)}, - {Name: "name", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)}, + {Name: "city", Type: datatype.Arrow.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Loki.String)}, + {Name: "age", Type: datatype.Arrow.Integer, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Loki.Integer)}, + {Name: "name", Type: datatype.Arrow.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Loki.String)}, } expectedRecord, err := CSVToArrow(expectedFields, expectedCSV) require.NoError(t, err) @@ -170,8 +170,8 @@ func TestNewProjectPipeline(t *testing.T) { // Create expected output also split across multiple records expectedFields := []arrow.Field{ - {Name: "name", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)}, - {Name: "age", Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Integer)}, + {Name: "name", Type: datatype.Arrow.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Loki.String)}, + {Name: "age", Type: datatype.Arrow.Integer, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Loki.Integer)}, } expected := ` diff --git a/pkg/engine/executor/range_aggregation.go b/pkg/engine/executor/range_aggregation.go index 878abde0ec0c9..8ab5a1c836f1c 100644 --- a/pkg/engine/executor/range_aggregation.go +++ b/pkg/engine/executor/range_aggregation.go @@ -125,7 +125,7 @@ func (r *RangeAggregationPipeline) read() (arrow.Record, error) { return nil, err } - if vec.Type() != datatype.String { + if vec.Type() != datatype.Loki.String { return nil, fmt.Errorf("unsupported datatype for partitioning %s", vec.Type()) } @@ -163,15 +163,15 @@ func (r *RangeAggregationPipeline) read() (arrow.Record, error) { fields = append(fields, arrow.Field{ Name: types.ColumnNameBuiltinTimestamp, - Type: arrow.FixedWidthTypes.Timestamp_ns, + Type: datatype.Arrow.Timestamp, Nullable: false, Metadata: datatype.ColumnMetadataBuiltinTimestamp, }, arrow.Field{ Name: types.ColumnNameGeneratedValue, - Type: arrow.PrimitiveTypes.Int64, + Type: datatype.Arrow.Integer, Nullable: false, - Metadata: datatype.ColumnMetadata(types.ColumnTypeGenerated, datatype.Integer), // needs a new ColumnType, ColumnTypeComputed or Generated? + Metadata: datatype.ColumnMetadata(types.ColumnTypeGenerated, datatype.Loki.Integer), }, ) @@ -183,9 +183,9 @@ func (r *RangeAggregationPipeline) read() (arrow.Record, error) { fields = append(fields, arrow.Field{ Name: columnExpr.Ref.Column, - Type: arrow.BinaryTypes.String, + Type: datatype.Arrow.String, Nullable: true, - Metadata: datatype.ColumnMetadata(columnExpr.Ref.Type, datatype.String), + Metadata: datatype.ColumnMetadata(columnExpr.Ref.Type, datatype.Loki.String), }) } diff --git a/pkg/engine/executor/range_aggregation_test.go b/pkg/engine/executor/range_aggregation_test.go index 0ef487378b6d8..d5fe60e3fdf4c 100644 --- a/pkg/engine/executor/range_aggregation_test.go +++ b/pkg/engine/executor/range_aggregation_test.go @@ -20,10 +20,10 @@ const arrowTimestampFormat = "2006-01-02T15:04:05.000000000Z" func TestRangeAggregationPipeline(t *testing.T) { // input schema with timestamp, partition-by columns and non-partition columns fields := []arrow.Field{ - {Name: types.ColumnNameBuiltinTimestamp, Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: datatype.ColumnMetadataBuiltinTimestamp}, - {Name: "env", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeLabel, datatype.String)}, - {Name: "service", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeLabel, datatype.String)}, - {Name: "severity", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeMetadata, datatype.String)}, // extra column not included in partition_by + {Name: types.ColumnNameBuiltinTimestamp, Type: datatype.Arrow.Timestamp, Metadata: datatype.ColumnMetadataBuiltinTimestamp}, + {Name: "env", Type: datatype.Arrow.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeLabel, datatype.Loki.String)}, + {Name: "service", Type: datatype.Arrow.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeLabel, datatype.Loki.String)}, + {Name: "severity", Type: datatype.Arrow.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeMetadata, datatype.Loki.String)}, // extra column not included in partition_by } // test data for first input diff --git a/pkg/engine/executor/util_test.go b/pkg/engine/executor/util_test.go index 98ecf3795638b..4220905133a07 100644 --- a/pkg/engine/executor/util_test.go +++ b/pkg/engine/executor/util_test.go @@ -16,7 +16,7 @@ import ( var ( incrementingIntPipeline = newRecordGenerator( arrow.NewSchema([]arrow.Field{ - {Name: "id", Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Integer)}, + {Name: "id", Type: datatype.Arrow.Integer, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Loki.Integer)}, }, nil), func(offset, sz int64, schema *arrow.Schema) arrow.Record { @@ -52,8 +52,8 @@ const ( func timestampPipeline(start time.Time, order time.Duration) *recordGenerator { return newRecordGenerator( arrow.NewSchema([]arrow.Field{ - {Name: "id", Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Integer)}, - {Name: "timestamp", Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Timestamp)}, + {Name: "id", Type: datatype.Arrow.Integer, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Loki.Integer)}, + {Name: "timestamp", Type: datatype.Arrow.Timestamp, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Loki.Timestamp)}, }, nil), func(offset, sz int64, schema *arrow.Schema) arrow.Record { diff --git a/pkg/engine/executor/vector_aggregate.go b/pkg/engine/executor/vector_aggregate.go index c676fce1d3601..3ce1c15973382 100644 --- a/pkg/engine/executor/vector_aggregate.go +++ b/pkg/engine/executor/vector_aggregate.go @@ -123,7 +123,7 @@ func (v *VectorAggregationPipeline) read() (arrow.Record, error) { return nil, err } - if vec.Type() != datatype.String { + if vec.Type() != datatype.Loki.String { return nil, fmt.Errorf("unsupported datatype for grouping %s", vec.Type()) } @@ -237,15 +237,15 @@ func (a *vectorAggregator) buildRecord() (arrow.Record, error) { fields = append(fields, arrow.Field{ Name: types.ColumnNameBuiltinTimestamp, - Type: arrow.FixedWidthTypes.Timestamp_ns, + Type: datatype.Arrow.Timestamp, Nullable: false, Metadata: datatype.ColumnMetadataBuiltinTimestamp, }, arrow.Field{ Name: types.ColumnNameGeneratedValue, - Type: arrow.PrimitiveTypes.Int64, + Type: datatype.Arrow.Integer, Nullable: false, - Metadata: datatype.ColumnMetadata(types.ColumnTypeGenerated, datatype.Integer), + Metadata: datatype.ColumnMetadata(types.ColumnTypeGenerated, datatype.Loki.Integer), }, ) @@ -257,9 +257,9 @@ func (a *vectorAggregator) buildRecord() (arrow.Record, error) { fields = append(fields, arrow.Field{ Name: colExpr.Ref.Column, - Type: arrow.BinaryTypes.String, + Type: datatype.Arrow.String, Nullable: true, - Metadata: datatype.ColumnMetadata(colExpr.Ref.Type, datatype.String), + Metadata: datatype.ColumnMetadata(colExpr.Ref.Type, datatype.Loki.String), }) } diff --git a/pkg/engine/executor/vector_aggregate_test.go b/pkg/engine/executor/vector_aggregate_test.go index 64cc292c2e20c..7475c1b2d307f 100644 --- a/pkg/engine/executor/vector_aggregate_test.go +++ b/pkg/engine/executor/vector_aggregate_test.go @@ -18,10 +18,10 @@ import ( func TestVectorAggregationPipeline(t *testing.T) { // input schema with timestamp, value and group by columns fields := []arrow.Field{ - {Name: types.ColumnNameBuiltinTimestamp, Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: datatype.ColumnMetadataBuiltinTimestamp}, - {Name: types.ColumnNameGeneratedValue, Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeGenerated, datatype.Integer)}, - {Name: "env", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeLabel, datatype.String)}, - {Name: "service", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeLabel, datatype.String)}, + {Name: types.ColumnNameBuiltinTimestamp, Type: datatype.Arrow.Timestamp, Metadata: datatype.ColumnMetadataBuiltinTimestamp}, + {Name: types.ColumnNameGeneratedValue, Type: datatype.Arrow.Integer, Metadata: datatype.ColumnMetadata(types.ColumnTypeGenerated, datatype.Loki.Integer)}, + {Name: "env", Type: datatype.Arrow.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeLabel, datatype.Loki.String)}, + {Name: "service", Type: datatype.Arrow.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeLabel, datatype.Loki.String)}, } now := time.Now().UTC() diff --git a/pkg/engine/internal/datatype/arrow.go b/pkg/engine/internal/datatype/arrow.go index dd561745fa0c7..52bac06f63cbc 100644 --- a/pkg/engine/internal/datatype/arrow.go +++ b/pkg/engine/internal/datatype/arrow.go @@ -3,7 +3,7 @@ package datatype import "github.com/apache/arrow-go/v18/arrow" var ( - LokiType = struct { + Loki = struct { Null DataType Bool DataType String DataType @@ -13,17 +13,17 @@ var ( Duration DataType Bytes DataType }{ - Null: Null, - Bool: Bool, - String: String, - Integer: Integer, - Float: Float, - Timestamp: Timestamp, - Duration: Duration, - Bytes: Bytes, + Null: tNull{}, + Bool: tBool{}, + String: tString{}, + Integer: tInteger{}, + Float: tFloat{}, + Timestamp: tTimestamp{}, + Duration: tDuration{}, + Bytes: tBytes{}, } - ArrowType = struct { + Arrow = struct { Null arrow.DataType Bool arrow.DataType String arrow.DataType @@ -38,30 +38,8 @@ var ( String: arrow.BinaryTypes.String, Integer: arrow.PrimitiveTypes.Int64, Float: arrow.PrimitiveTypes.Float64, - Timestamp: arrow.PrimitiveTypes.Int64, + Timestamp: arrow.FixedWidthTypes.Timestamp_ns, Duration: arrow.PrimitiveTypes.Int64, Bytes: arrow.PrimitiveTypes.Int64, } - - ToArrow = map[DataType]arrow.DataType{ - Null: ArrowType.Null, - Bool: ArrowType.Bool, - String: ArrowType.String, - Integer: ArrowType.Integer, - Float: ArrowType.Float, - Timestamp: ArrowType.Timestamp, - Duration: ArrowType.Duration, - Bytes: ArrowType.Bytes, - } - - ToLoki = map[arrow.DataType]DataType{ - ArrowType.Null: Null, - ArrowType.Bool: Bool, - ArrowType.String: String, - ArrowType.Integer: Integer, - ArrowType.Float: Float, - ArrowType.Timestamp: Timestamp, - ArrowType.Duration: Duration, - ArrowType.Bytes: Bytes, - } ) diff --git a/pkg/engine/internal/datatype/literal.go b/pkg/engine/internal/datatype/literal.go index 6dc3b668b626a..c0edb01804b35 100644 --- a/pkg/engine/internal/datatype/literal.go +++ b/pkg/engine/internal/datatype/literal.go @@ -4,189 +4,177 @@ import ( "fmt" "strconv" "time" + + "github.com/dustin/go-humanize" ) type NullLiteral struct { } // String implements Literal. -func (n *NullLiteral) String() string { +func (n NullLiteral) String() string { return "null" } // Type implements Literal. -func (n *NullLiteral) Type() DataType { - return Null +func (n NullLiteral) Type() DataType { + return Loki.Null } // Any implements Literal. -func (n *NullLiteral) Any() any { +func (n NullLiteral) Any() any { return nil } -func (n *NullLiteral) Value() any { +func (n NullLiteral) Value() any { return nil } -type BoolLiteral struct { - v bool -} +type BoolLiteral bool // String implements Literal. -func (b *BoolLiteral) String() string { - return strconv.FormatBool(b.v) +func (b BoolLiteral) String() string { + return strconv.FormatBool(bool(b)) } // Type implements Literal. -func (b *BoolLiteral) Type() DataType { - return Bool +func (b BoolLiteral) Type() DataType { + return Loki.Bool } // Any implements Literal. -func (b *BoolLiteral) Any() any { - return b.v +func (b BoolLiteral) Any() any { + return b.Value() } -func (b *BoolLiteral) Value() bool { - return b.v +func (b BoolLiteral) Value() bool { + return bool(b) } -type StringLiteral struct { - v string -} +type StringLiteral string // String implements Literal. -func (s *StringLiteral) String() string { - return fmt.Sprintf(`"%s"`, s.v) +func (s StringLiteral) String() string { + return fmt.Sprintf(`"%v"`, string(s)) } // Type implements Literal. -func (s *StringLiteral) Type() DataType { - return String +func (s StringLiteral) Type() DataType { + return Loki.String } // Any implements Literal. -func (s *StringLiteral) Any() any { - return s.v +func (s StringLiteral) Any() any { + return s.Value() } -func (s *StringLiteral) Value() string { - return s.v +func (s StringLiteral) Value() string { + return string(s) } -type IntegerLiteral struct { - v int64 -} +type IntegerLiteral int64 // String implements Literal. -func (i *IntegerLiteral) String() string { - return strconv.FormatInt(i.v, 10) +func (i IntegerLiteral) String() string { + return strconv.FormatInt(int64(i), 10) } // Type implements Literal. -func (i *IntegerLiteral) Type() DataType { - return Integer +func (i IntegerLiteral) Type() DataType { + return Loki.Integer } // Any implements Literal. -func (i *IntegerLiteral) Any() any { - return i.v +func (i IntegerLiteral) Any() any { + return i.Value() } -func (i *IntegerLiteral) Value() int64 { - return i.v +func (i IntegerLiteral) Value() int64 { + return int64(i) } -type FloatLiteral struct { - v float64 -} +type FloatLiteral float64 // String implements Literal. -func (f *FloatLiteral) String() string { - return strconv.FormatFloat(f.v, 'f', -1, 64) +func (f FloatLiteral) String() string { + return strconv.FormatFloat(float64(f), 'f', -1, 64) } // Type implements Literal. -func (f *FloatLiteral) Type() DataType { - return Float +func (f FloatLiteral) Type() DataType { + return Loki.Float } // Any implements Literal. -func (f *FloatLiteral) Any() any { - return f.v +func (f FloatLiteral) Any() any { + return f.Value() } -func (f *FloatLiteral) Value() float64 { - return f.v +func (f FloatLiteral) Value() float64 { + return float64(f) } -type TimestampLiteral struct { - v int64 // unixnano, UTC -} +type TimestampLiteral Timestamp // String implements Literal. -func (t *TimestampLiteral) String() string { - return time.Unix(0, t.v).UTC().Format(time.RFC3339Nano) +func (t TimestampLiteral) String() string { + return time.Unix(0, int64(t.Value())).UTC().Format(time.RFC3339Nano) } // Type implements Literal. -func (t *TimestampLiteral) Type() DataType { - return Timestamp +func (t TimestampLiteral) Type() DataType { + return Loki.Timestamp } // Any implements Literal. -func (t *TimestampLiteral) Any() any { +func (t TimestampLiteral) Any() any { return t.Value() } -func (t *TimestampLiteral) Value() time.Time { - return time.Unix(0, t.v).UTC() +func (t TimestampLiteral) Value() Timestamp { + return Timestamp(t) } -type DurationLiteral struct { - v time.Duration -} +type DurationLiteral int64 // String implements Literal. -func (d *DurationLiteral) String() string { - return d.v.String() +func (d DurationLiteral) String() string { + return time.Duration(d).String() } // Type implements Literal. -func (d *DurationLiteral) Type() DataType { - return Duration +func (d DurationLiteral) Type() DataType { + return Loki.Duration } // Any implements Literal. -func (d *DurationLiteral) Any() any { - return d.v +func (d DurationLiteral) Any() any { + return d.Value() } -func (d *DurationLiteral) Value() time.Duration { - return d.v +func (d DurationLiteral) Value() Duration { + return Duration(d) } -type BytesLiteral struct { - v int64 -} +type BytesLiteral Bytes // String implements Literal. -func (b *BytesLiteral) String() string { - return fmt.Sprintf("%dB", b.v) +func (b BytesLiteral) String() string { + return humanize.IBytes(uint64(b)) } // Type implements Literal. -func (b *BytesLiteral) Type() DataType { - return Bytes +func (b BytesLiteral) Type() DataType { + return Loki.Bytes } // Any implements Literal. -func (b *BytesLiteral) Any() any { - return b.v +func (b BytesLiteral) Any() any { + return b.Value() } -func (b *BytesLiteral) Value() int64 { - return b.v +func (b BytesLiteral) Value() Bytes { + return Bytes(b) } // Literal is holds a value of [any] typed as [DataType]. @@ -196,45 +184,54 @@ type Literal interface { Type() DataType } -var ( - _ Literal = (*NullLiteral)(nil) - _ Literal = (*BoolLiteral)(nil) - _ Literal = (*StringLiteral)(nil) - _ Literal = (*IntegerLiteral)(nil) - _ Literal = (*FloatLiteral)(nil) - _ Literal = (*TimestampLiteral)(nil) - _ Literal = (*DurationLiteral)(nil) - _ Literal = (*BytesLiteral)(nil) -) - -func NewNullLiteral() *NullLiteral { - return &NullLiteral{} -} - -func NewBoolLiteral(v bool) *BoolLiteral { - return &BoolLiteral{v: v} -} - -func NewStringLiteral(v string) *StringLiteral { - return &StringLiteral{v: v} -} - -func NewIntegerLiteral(v int64) *IntegerLiteral { - return &IntegerLiteral{v: v} +type LiteralType interface { + any | bool | string | int64 | float64 | Timestamp | Duration | Bytes } -func NewFloatLiteral(v float64) *FloatLiteral { - return &FloatLiteral{v: v} +type TypedLiteral[T LiteralType] interface { + Literal + Value() T } -func NewTimestampLiteral(v time.Time) *TimestampLiteral { - return &TimestampLiteral{v: v.UTC().UnixNano()} -} - -func NewDurationLiteral(v time.Duration) *DurationLiteral { - return &DurationLiteral{v: v} -} +var ( + _ Literal = (*NullLiteral)(nil) + _ TypedLiteral[any] = (*NullLiteral)(nil) + _ Literal = (*BoolLiteral)(nil) + _ TypedLiteral[bool] = (*BoolLiteral)(nil) + _ Literal = (*StringLiteral)(nil) + _ TypedLiteral[string] = (*StringLiteral)(nil) + _ Literal = (*IntegerLiteral)(nil) + _ TypedLiteral[int64] = (*IntegerLiteral)(nil) + _ Literal = (*FloatLiteral)(nil) + _ TypedLiteral[float64] = (*FloatLiteral)(nil) + _ Literal = (*TimestampLiteral)(nil) + _ TypedLiteral[Timestamp] = (*TimestampLiteral)(nil) + _ Literal = (*DurationLiteral)(nil) + _ TypedLiteral[Duration] = (*DurationLiteral)(nil) + _ Literal = (*BytesLiteral)(nil) + _ TypedLiteral[Bytes] = (*BytesLiteral)(nil) +) -func NewBytesLiteral(v int64) *BytesLiteral { - return &BytesLiteral{v: v} +func NewLiteral[T LiteralType](value T) Literal { + switch val := any(value).(type) { + case bool: + return BoolLiteral(val) + case string: + return StringLiteral(val) + case int64: + return IntegerLiteral(val) + case float64: + return FloatLiteral(val) + case Timestamp: + return TimestampLiteral(val) + case Duration: + return DurationLiteral(val) + case Bytes: + return BytesLiteral(val) + } + panic(fmt.Sprintf("invalid literal value type %T", value)) +} + +func NewNullLiteral() NullLiteral { + return NullLiteral{} } diff --git a/pkg/engine/internal/datatype/types.go b/pkg/engine/internal/datatype/types.go index c9c6170dd4367..18bb54db9b940 100644 --- a/pkg/engine/internal/datatype/types.go +++ b/pkg/engine/internal/datatype/types.go @@ -6,6 +6,10 @@ import ( "github.com/apache/arrow-go/v18/arrow" ) +type Timestamp int64 +type Duration int64 +type Bytes int64 + type Type uint8 const ( @@ -39,75 +43,64 @@ type DataType interface { ArrowType() arrow.DataType } -var ( - Null DataType = tNull{} - Bool DataType = tBool{} - String DataType = tString{} - Integer DataType = tInteger{} - Float DataType = tFloat{} - Timestamp DataType = tTimestamp{} - Duration DataType = tDuration{} - Bytes DataType = tBytes{} -) - type tNull struct{} func (tNull) ID() Type { return NULL } func (tNull) String() string { return "null" } -func (tNull) ArrowType() arrow.DataType { return ArrowType.Null } +func (tNull) ArrowType() arrow.DataType { return Arrow.Null } type tBool struct{} func (tBool) ID() Type { return BOOL } func (tBool) String() string { return "bool" } -func (tBool) ArrowType() arrow.DataType { return ArrowType.Bool } +func (tBool) ArrowType() arrow.DataType { return Arrow.Bool } type tString struct{} func (tString) ID() Type { return STRING } func (tString) String() string { return "string" } -func (tString) ArrowType() arrow.DataType { return ArrowType.String } +func (tString) ArrowType() arrow.DataType { return Arrow.String } type tInteger struct{} func (tInteger) ID() Type { return INT64 } func (tInteger) String() string { return "integer" } -func (tInteger) ArrowType() arrow.DataType { return ArrowType.Integer } +func (tInteger) ArrowType() arrow.DataType { return Arrow.Integer } type tFloat struct{} func (tFloat) ID() Type { return FLOAT64 } func (tFloat) String() string { return "float" } -func (tFloat) ArrowType() arrow.DataType { return ArrowType.Float } +func (tFloat) ArrowType() arrow.DataType { return Arrow.Float } type tTimestamp struct{} func (tTimestamp) ID() Type { return INT64 } func (tTimestamp) String() string { return "timestamp" } -func (tTimestamp) ArrowType() arrow.DataType { return ArrowType.Integer } +func (tTimestamp) ArrowType() arrow.DataType { return Arrow.Integer } type tDuration struct{} func (tDuration) ID() Type { return INT64 } func (tDuration) String() string { return "duration" } -func (tDuration) ArrowType() arrow.DataType { return ArrowType.Integer } +func (tDuration) ArrowType() arrow.DataType { return Arrow.Integer } type tBytes struct{} func (tBytes) ID() Type { return INT64 } func (tBytes) String() string { return "bytes" } -func (tBytes) ArrowType() arrow.DataType { return ArrowType.Integer } +func (tBytes) ArrowType() arrow.DataType { return Arrow.Integer } var ( names = map[string]DataType{ - Null.String(): Null, - Bool.String(): Bool, - String.String(): String, - Integer.String(): Integer, - Float.String(): Float, - Timestamp.String(): Timestamp, - Duration.String(): Duration, - Bytes.String(): Bytes, + Loki.Null.String(): Loki.Null, + Loki.Bool.String(): Loki.Bool, + Loki.String.String(): Loki.String, + Loki.Integer.String(): Loki.Integer, + Loki.Float.String(): Loki.Float, + Loki.Timestamp.String(): Loki.Timestamp, + Loki.Duration.String(): Loki.Duration, + Loki.Bytes.String(): Loki.Bytes, } ) diff --git a/pkg/engine/internal/datatype/util.go b/pkg/engine/internal/datatype/util.go index 7366e10b1ed83..7698c2351d3a7 100644 --- a/pkg/engine/internal/datatype/util.go +++ b/pkg/engine/internal/datatype/util.go @@ -7,8 +7,8 @@ import ( ) var ( - ColumnMetadataBuiltinMessage = ColumnMetadata(types.ColumnTypeBuiltin, String) - ColumnMetadataBuiltinTimestamp = ColumnMetadata(types.ColumnTypeBuiltin, Timestamp) + ColumnMetadataBuiltinMessage = ColumnMetadata(types.ColumnTypeBuiltin, Loki.String) + ColumnMetadataBuiltinTimestamp = ColumnMetadata(types.ColumnTypeBuiltin, Loki.Timestamp) ) func ColumnMetadata(ct types.ColumnType, dt DataType) arrow.Metadata { diff --git a/pkg/engine/planner/logical/format_tree_test.go b/pkg/engine/planner/logical/format_tree_test.go index c8ce60bf50773..a7ae31de4d7c1 100644 --- a/pkg/engine/planner/logical/format_tree_test.go +++ b/pkg/engine/planner/logical/format_tree_test.go @@ -33,7 +33,7 @@ func TestFormatSimpleQuery(t *testing.T) { ).Select( &BinOp{ Left: NewColumnRef("age", types.ColumnTypeMetadata), - Right: NewLiteral(21), + Right: NewLiteral(int64(21)), Op: types.BinaryOpGt, }, ) @@ -76,7 +76,7 @@ func TestFormatSortQuery(t *testing.T) { ).Select( &BinOp{ Left: NewColumnRef("age", types.ColumnTypeMetadata), - Right: NewLiteral(21), + Right: NewLiteral(int64(21)), Op: types.BinaryOpGt, }, ).Sort(*NewColumnRef("age", types.ColumnTypeMetadata), true, false) @@ -120,7 +120,7 @@ func TestFormatRangeAggregationQuery(t *testing.T) { ).Select( &BinOp{ Left: NewColumnRef("age", types.ColumnTypeMetadata), - Right: NewLiteral(21), + Right: NewLiteral(int64(21)), Op: types.BinaryOpGt, }, ).RangeAggregation( diff --git a/pkg/engine/planner/logical/logical_test.go b/pkg/engine/planner/logical/logical_test.go index 71a486568c9bc..6659430256cfa 100644 --- a/pkg/engine/planner/logical/logical_test.go +++ b/pkg/engine/planner/logical/logical_test.go @@ -26,7 +26,7 @@ func TestPlan_String(t *testing.T) { ).Select( &BinOp{ Left: NewColumnRef("age", types.ColumnTypeMetadata), - Right: NewLiteral(21), + Right: NewLiteral(int64(21)), Op: types.BinaryOpGt, }, ).Sort(*NewColumnRef("age", types.ColumnTypeMetadata), true, false) diff --git a/pkg/engine/planner/logical/node_literal.go b/pkg/engine/planner/logical/node_literal.go index cadb283f86405..ef1483c82c47d 100644 --- a/pkg/engine/planner/logical/node_literal.go +++ b/pkg/engine/planner/logical/node_literal.go @@ -1,8 +1,6 @@ package logical import ( - "time" - "github.com/grafana/loki/v3/pkg/engine/internal/datatype" "github.com/grafana/loki/v3/pkg/engine/planner/schema" ) @@ -17,30 +15,11 @@ type Literal struct { var _ Value = (*Literal)(nil) -func NewLiteral(v any) *Literal { - if v == nil { - return &Literal{Literal: datatype.NewNullLiteral()} - } - - switch casted := v.(type) { - case bool: - return &Literal{Literal: datatype.NewBoolLiteral(casted)} - case string: - // TODO(chaudum): Try parsing bytes/timestamp/duration - return &Literal{Literal: datatype.NewStringLiteral(casted)} - case int: - return &Literal{Literal: datatype.NewIntegerLiteral(int64(casted))} - case int64: - return &Literal{Literal: datatype.NewIntegerLiteral(casted)} - case float64: - return &Literal{Literal: datatype.NewFloatLiteral(casted)} - case time.Time: - return &Literal{Literal: datatype.NewTimestampLiteral(casted)} - case time.Duration: - return &Literal{Literal: datatype.NewDurationLiteral(casted)} - default: +func NewLiteral(value datatype.LiteralType) *Literal { + if value == nil { return &Literal{Literal: datatype.NewNullLiteral()} } + return &Literal{Literal: datatype.NewLiteral(value)} } // Kind returns the kind of value represented by the literal. diff --git a/pkg/engine/planner/logical/planner.go b/pkg/engine/planner/logical/planner.go index 4c1fd2a09b502..4a6ac3c217d46 100644 --- a/pkg/engine/planner/logical/planner.go +++ b/pkg/engine/planner/logical/planner.go @@ -7,6 +7,7 @@ import ( "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/v3/pkg/engine/internal/datatype" "github.com/grafana/loki/v3/pkg/engine/internal/types" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql" @@ -246,12 +247,12 @@ func convertQueryRangeToPredicates(start, end time.Time) []*BinOp { return []*BinOp{ { Left: timestampColumnRef(), - Right: NewLiteral(start), + Right: NewLiteral(datatype.Timestamp(start.UTC().UnixNano())), Op: types.BinaryOpGte, }, { Left: timestampColumnRef(), - Right: NewLiteral(end), + Right: NewLiteral(datatype.Timestamp(end.UTC().UnixNano())), Op: types.BinaryOpLt, }, } diff --git a/pkg/engine/planner/physical/context.go b/pkg/engine/planner/physical/context.go index dcfbf0d90fe42..45afeb092a5db 100644 --- a/pkg/engine/planner/physical/context.go +++ b/pkg/engine/planner/physical/context.go @@ -162,7 +162,7 @@ func convertLiteralToString(expr Expression) (string, error) { if !ok { return "", fmt.Errorf("expected literal expression, got %T", expr) } - if l.ValueType() != datatype.String { + if l.ValueType() != datatype.Loki.String { return "", fmt.Errorf("literal type is not a string, got %v", l.ValueType()) } return l.Any().(string), nil diff --git a/pkg/engine/planner/physical/context_test.go b/pkg/engine/planner/physical/context_test.go index 3f929f029981f..2598e6d851e5a 100644 --- a/pkg/engine/planner/physical/context_test.go +++ b/pkg/engine/planner/physical/context_test.go @@ -7,6 +7,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/engine/internal/datatype" "github.com/grafana/loki/v3/pkg/engine/internal/types" ) @@ -25,15 +26,15 @@ func TestContext_ConvertLiteral(t *testing.T) { wantErr: true, }, { - expr: NewLiteral(123), + expr: NewLiteral(int64(123)), wantErr: true, }, { - expr: NewLiteral(time.Now()), + expr: NewLiteral(datatype.Timestamp(time.Now().UnixNano())), wantErr: true, }, { - expr: NewLiteral(time.Hour), + expr: NewLiteral(datatype.Duration(time.Hour.Nanoseconds())), wantErr: true, }, { diff --git a/pkg/engine/planner/physical/expressions.go b/pkg/engine/planner/physical/expressions.go index eac7126b2a546..6ea9ea002df0d 100644 --- a/pkg/engine/planner/physical/expressions.go +++ b/pkg/engine/planner/physical/expressions.go @@ -2,7 +2,6 @@ package physical import ( "fmt" - "time" "github.com/grafana/loki/v3/pkg/engine/internal/datatype" "github.com/grafana/loki/v3/pkg/engine/internal/types" @@ -133,30 +132,11 @@ func (e *LiteralExpr) ValueType() datatype.DataType { return e.Literal.Type() } -func NewLiteral(value any) *LiteralExpr { +func NewLiteral(value datatype.LiteralType) *LiteralExpr { if value == nil { return &LiteralExpr{Literal: datatype.NewNullLiteral()} } - - switch casted := value.(type) { - case bool: - return &LiteralExpr{Literal: datatype.NewBoolLiteral(casted)} - case string: - // TODO(chaudum): Try parsing bytes/timestamp/duration - return &LiteralExpr{Literal: datatype.NewStringLiteral(casted)} - case int: - return &LiteralExpr{Literal: datatype.NewIntegerLiteral(int64(casted))} - case int64: - return &LiteralExpr{Literal: datatype.NewIntegerLiteral(casted)} - case float64: - return &LiteralExpr{Literal: datatype.NewFloatLiteral(casted)} - case time.Time: - return &LiteralExpr{Literal: datatype.NewTimestampLiteral(casted)} - case time.Duration: - return &LiteralExpr{Literal: datatype.NewDurationLiteral(casted)} - default: - panic(fmt.Sprintf("invalid literal value type %T", value)) - } + return &LiteralExpr{Literal: datatype.NewLiteral(value)} } // ColumnExpr is an expression that implements the [ColumnExpr] interface. diff --git a/pkg/engine/planner/physical/expressions_test.go b/pkg/engine/planner/physical/expressions_test.go index 34d6555773f34..2a1e1d527144a 100644 --- a/pkg/engine/planner/physical/expressions_test.go +++ b/pkg/engine/planner/physical/expressions_test.go @@ -2,7 +2,6 @@ package physical import ( "testing" - "time" "github.com/stretchr/testify/require" @@ -60,7 +59,7 @@ func TestLiteralExpr(t *testing.T) { require.Equal(t, ExprTypeLiteral, expr.Type()) literal, ok := expr.(LiteralExpression) require.True(t, ok) - require.Equal(t, datatype.Bool, literal.ValueType()) + require.Equal(t, datatype.Loki.Bool, literal.ValueType()) }) t.Run("float", func(t *testing.T) { @@ -68,7 +67,7 @@ func TestLiteralExpr(t *testing.T) { require.Equal(t, ExprTypeLiteral, expr.Type()) literal, ok := expr.(LiteralExpression) require.True(t, ok) - require.Equal(t, datatype.Float, literal.ValueType()) + require.Equal(t, datatype.Loki.Float, literal.ValueType()) }) t.Run("integer", func(t *testing.T) { @@ -76,23 +75,31 @@ func TestLiteralExpr(t *testing.T) { require.Equal(t, ExprTypeLiteral, expr.Type()) literal, ok := expr.(LiteralExpression) require.True(t, ok) - require.Equal(t, datatype.Integer, literal.ValueType()) + require.Equal(t, datatype.Loki.Integer, literal.ValueType()) }) t.Run("timestamp", func(t *testing.T) { - var expr Expression = NewLiteral(time.Unix(0, 1741882435000000000)) + var expr Expression = NewLiteral(datatype.Timestamp(1741882435000000000)) require.Equal(t, ExprTypeLiteral, expr.Type()) literal, ok := expr.(LiteralExpression) require.True(t, ok) - require.Equal(t, datatype.Timestamp, literal.ValueType()) + require.Equal(t, datatype.Loki.Timestamp, literal.ValueType()) }) t.Run("duration", func(t *testing.T) { - var expr Expression = NewLiteral(time.Hour) + var expr Expression = NewLiteral(datatype.Duration(3600)) require.Equal(t, ExprTypeLiteral, expr.Type()) literal, ok := expr.(LiteralExpression) require.True(t, ok) - require.Equal(t, datatype.Duration, literal.ValueType()) + require.Equal(t, datatype.Loki.Duration, literal.ValueType()) + }) + + t.Run("bytes", func(t *testing.T) { + var expr Expression = NewLiteral(datatype.Bytes(1024)) + require.Equal(t, ExprTypeLiteral, expr.Type()) + literal, ok := expr.(LiteralExpression) + require.True(t, ok) + require.Equal(t, datatype.Loki.Bytes, literal.ValueType()) }) t.Run("string", func(t *testing.T) { @@ -100,6 +107,6 @@ func TestLiteralExpr(t *testing.T) { require.Equal(t, ExprTypeLiteral, expr.Type()) literal, ok := expr.(LiteralExpression) require.True(t, ok) - require.Equal(t, datatype.String, literal.ValueType()) + require.Equal(t, datatype.Loki.String, literal.ValueType()) }) } diff --git a/pkg/engine/planner/physical/optimizer_test.go b/pkg/engine/planner/physical/optimizer_test.go index 147a7b2e92ab9..1aef54fe184bb 100644 --- a/pkg/engine/planner/physical/optimizer_test.go +++ b/pkg/engine/planner/physical/optimizer_test.go @@ -2,10 +2,10 @@ package physical import ( "testing" - "time" "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/engine/internal/datatype" "github.com/grafana/loki/v3/pkg/engine/internal/types" ) @@ -15,7 +15,7 @@ func TestCanApplyPredicate(t *testing.T) { want bool }{ { - predicate: NewLiteral(123), + predicate: NewLiteral(int64(123)), want: true, }, { @@ -29,7 +29,7 @@ func TestCanApplyPredicate(t *testing.T) { { predicate: &BinaryExpr{ Left: newColumnExpr("timestamp", types.ColumnTypeBuiltin), - Right: NewLiteral(time.Now()), + Right: NewLiteral(datatype.Timestamp(3600000)), Op: types.BinaryOpGt, }, want: true, @@ -51,6 +51,11 @@ func TestCanApplyPredicate(t *testing.T) { } } +var ( + time1000 = datatype.Timestamp(1000000000) + time2000 = datatype.Timestamp(2000000000) +) + func dummyPlan() *Plan { plan := &Plan{} scan1 := plan.addNode(&DataObjScan{id: "scan1"}) @@ -59,14 +64,14 @@ func dummyPlan() *Plan { filter1 := plan.addNode(&Filter{id: "filter1", Predicates: []Expression{ &BinaryExpr{ Left: newColumnExpr("timestamp", types.ColumnTypeBuiltin), - Right: NewLiteral(time.Unix(0, 1000000000)), + Right: NewLiteral(time1000), Op: types.BinaryOpGt, }, }}) filter2 := plan.addNode(&Filter{id: "filter2", Predicates: []Expression{ &BinaryExpr{ Left: newColumnExpr("timestamp", types.ColumnTypeBuiltin), - Right: NewLiteral(time.Unix(0, 2000000000)), + Right: NewLiteral(time2000), Op: types.BinaryOpLte, }, }}) @@ -112,24 +117,24 @@ func TestOptimizer(t *testing.T) { scan1 := optimized.addNode(&DataObjScan{id: "scan1", Predicates: []Expression{ &BinaryExpr{ Left: newColumnExpr("timestamp", types.ColumnTypeBuiltin), - Right: NewLiteral(time.Unix(0, 1000000000)), + Right: NewLiteral(time1000), Op: types.BinaryOpGt, }, &BinaryExpr{ Left: newColumnExpr("timestamp", types.ColumnTypeBuiltin), - Right: NewLiteral(time.Unix(0, 2000000000)), + Right: NewLiteral(time2000), Op: types.BinaryOpLte, }, }}) scan2 := optimized.addNode(&DataObjScan{id: "scan2", Predicates: []Expression{ &BinaryExpr{ Left: newColumnExpr("timestamp", types.ColumnTypeBuiltin), - Right: NewLiteral(time.Unix(0, 1000000000)), + Right: NewLiteral(time1000), Op: types.BinaryOpGt, }, &BinaryExpr{ Left: newColumnExpr("timestamp", types.ColumnTypeBuiltin), - Right: NewLiteral(time.Unix(0, 2000000000)), + Right: NewLiteral(time2000), Op: types.BinaryOpLte, }, }}) @@ -167,14 +172,14 @@ func TestOptimizer(t *testing.T) { filter1 := optimized.addNode(&Filter{id: "filter1", Predicates: []Expression{ &BinaryExpr{ Left: newColumnExpr("timestamp", types.ColumnTypeBuiltin), - Right: NewLiteral(time.Unix(0, 1000000000)), + Right: NewLiteral(time1000), Op: types.BinaryOpGt, }, }}) filter2 := optimized.addNode(&Filter{id: "filter2", Predicates: []Expression{ &BinaryExpr{ Left: newColumnExpr("timestamp", types.ColumnTypeBuiltin), - Right: NewLiteral(time.Unix(0, 2000000000)), + Right: NewLiteral(time2000), Op: types.BinaryOpLte, }, }}) diff --git a/pkg/engine/planner/physical/planner_test.go b/pkg/engine/planner/physical/planner_test.go index f510c99084d4b..6893c80f6aa39 100644 --- a/pkg/engine/planner/physical/planner_test.go +++ b/pkg/engine/planner/physical/planner_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/engine/internal/datatype" "github.com/grafana/loki/v3/pkg/engine/internal/types" "github.com/grafana/loki/v3/pkg/engine/planner/logical" ) @@ -231,7 +232,7 @@ func TestPlanner_Convert(t *testing.T) { ).Select( &logical.BinOp{ Left: logical.NewColumnRef("timestamp", types.ColumnTypeBuiltin), - Right: logical.NewLiteral(time.Unix(0, 1742826126000000000)), + Right: logical.NewLiteral(datatype.Timestamp(1742826126000000000)), Op: types.BinaryOpLt, }, ).Limit(0, 1000) @@ -276,7 +277,7 @@ func TestPlanner_Convert_RangeAggregations(t *testing.T) { ).Select( &logical.BinOp{ Left: logical.NewColumnRef("timestamp", types.ColumnTypeBuiltin), - Right: logical.NewLiteral(time.Unix(0, 1742826126000000000)), + Right: logical.NewLiteral(datatype.Timestamp(1742826126000000000)), Op: types.BinaryOpLt, }, ).RangeAggregation( diff --git a/pkg/engine/planner/physical/printer.go b/pkg/engine/planner/physical/printer.go index e51e5594fed22..04e8c480fa277 100644 --- a/pkg/engine/planner/physical/printer.go +++ b/pkg/engine/planner/physical/printer.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "strings" + "time" "github.com/grafana/loki/v3/pkg/engine/planner/internal/tree" ) @@ -60,8 +61,8 @@ func toTreeNode(n Node) *tree.Node { case *RangeAggregation: properties := []tree.Property{ tree.NewProperty("operation", false, node.Operation), - tree.NewProperty("start", false, node.Start), - tree.NewProperty("end", false, node.End), + tree.NewProperty("start", false, node.Start.Format(time.RFC3339Nano)), + tree.NewProperty("end", false, node.End.Format(time.RFC3339Nano)), tree.NewProperty("range", false, node.Range), }