Skip to content

Commit 3c31f22

Browse files
committed
Add WIDTH_BUCKET timestamp support and CalcitePlanContext fields
Signed-off-by: Kai Huang <[email protected]>
1 parent 9953a5a commit 3c31f22

File tree

4 files changed

+160
-35
lines changed

4 files changed

+160
-35
lines changed

core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,13 @@ public class CalcitePlanContext {
4848
@Getter @Setter private boolean isResolvingSubquery = false;
4949
@Getter @Setter private boolean inCoalesceFunction = false;
5050

51+
/** Fields being grouped by in the current aggregation, tracked so that bin operations can preserve the original fields. */
52+
@Getter @Setter
53+
private java.util.Set<String> aggregationGroupByFields = new java.util.HashSet<>();
54+
55+
/** Total number of group-by fields in current aggregation */
56+
@Getter @Setter private int aggregationGroupByCount = 0;
57+
5158
/**
5259
* The flag used to determine whether we do metadata field projection for user 1. If a project is
5360
* never visited, we will do metadata field projection for user 2. Else not because user may

core/src/main/java/org/opensearch/sql/calcite/utils/binning/handlers/CountBinHandler.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,20 +37,17 @@ public RexNode createExpression(
3737
requestedBins = BinConstants.DEFAULT_BINS;
3838
}
3939

40-
// Calculate data range using window functions
41-
RexNode minValue = context.relBuilder.min(fieldExpr).over().toRex();
40+
// Calculate MIN and MAX using window functions
4241
RexNode maxValue = context.relBuilder.max(fieldExpr).over().toRex();
43-
RexNode dataRange = context.relBuilder.call(SqlStdOperatorTable.MINUS, maxValue, minValue);
44-
45-
// Convert start/end parameters
46-
RexNode startValue = convertParameter(countBin.getStart(), context);
47-
RexNode endValue = convertParameter(countBin.getEnd(), context);
42+
RexNode minValue = context.relBuilder.min(fieldExpr).over().toRex();
4843

49-
// WIDTH_BUCKET(field_value, num_bins, data_range, max_value)
44+
// WIDTH_BUCKET(field_value, num_bins, min_value, max_value)
45+
// Note: We pass minValue instead of dataRange - WIDTH_BUCKET will calculate the range
46+
// internally
5047
RexNode numBins = context.relBuilder.literal(requestedBins);
5148

5249
return context.rexBuilder.makeCall(
53-
PPLBuiltinOperators.WIDTH_BUCKET, fieldExpr, numBins, dataRange, maxValue);
50+
PPLBuiltinOperators.WIDTH_BUCKET, fieldExpr, numBins, minValue, maxValue);
5451
}
5552

5653
private RexNode convertParameter(

core/src/main/java/org/opensearch/sql/expression/function/udf/binning/WidthBucketFunction.java

Lines changed: 147 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55

66
package org.opensearch.sql.expression.function.udf.binning;
77

8+
import java.time.Instant;
9+
import java.time.ZoneOffset;
10+
import java.time.ZonedDateTime;
11+
import java.time.format.DateTimeFormatter;
812
import java.util.List;
913
import org.apache.calcite.adapter.enumerable.NotNullImplementor;
1014
import org.apache.calcite.adapter.enumerable.NullPolicy;
@@ -24,21 +28,22 @@
2428
import org.opensearch.sql.expression.function.UDFOperandMetadata;
2529

2630
/**
27-
* WIDTH_BUCKET(field_value, num_bins, data_range, max_value) - Histogram bucketing function.
31+
* WIDTH_BUCKET(field_value, num_bins, min_value, max_value) - Histogram bucketing function.
2832
*
2933
* <p>This function creates equal-width bins for histogram operations. It uses a mathematical O(1)
3034
* algorithm to determine optimal bin widths based on powers of 10.
3135
*
3236
* <p>Parameters:
3337
*
3438
* <ul>
35-
* <li>field_value - The numeric value to bin
39+
* <li>field_value - The numeric or timestamp value to bin
3640
* <li>num_bins - Number of bins to create
37-
* <li>data_range - Range of the data (MAX - MIN)
41+
* <li>min_value - Minimum value in the dataset
3842
* <li>max_value - Maximum value in the dataset
3943
* </ul>
4044
*
41-
* <p>Implements the same binning logic as BinCalculatorFunction for 'bins' type.
45+
* <p>Supports both numeric and timestamp fields. For timestamps, uses auto_date_histogram interval
46+
* selection.
4247
*/
4348
public class WidthBucketFunction extends ImplementorUDF {
4449

@@ -76,35 +81,76 @@ public Expression implement(
7681
RexToLixTranslator translator, RexCall call, List<Expression> translatedOperands) {
7782
Expression fieldValue = translatedOperands.get(0);
7883
Expression numBins = translatedOperands.get(1);
79-
Expression dataRange = translatedOperands.get(2);
84+
Expression minValue = translatedOperands.get(2);
8085
Expression maxValue = translatedOperands.get(3);
8186

87+
// Pass the field type information to help detect timestamps
88+
RelDataType fieldType = call.getOperands().get(0).getType();
89+
boolean isTimestampField = dateRelatedType(fieldType);
90+
Expression isTimestamp = Expressions.constant(isTimestampField);
91+
92+
// For timestamp fields, keep as-is (don't convert to Number)
93+
// For numeric fields, convert to Number
94+
Expression fieldValueExpr =
95+
isTimestampField ? fieldValue : Expressions.convert_(fieldValue, Number.class);
96+
Expression minValueExpr =
97+
isTimestampField ? minValue : Expressions.convert_(minValue, Number.class);
98+
Expression maxValueExpr =
99+
isTimestampField ? maxValue : Expressions.convert_(maxValue, Number.class);
100+
82101
return Expressions.call(
83102
WidthBucketImplementor.class,
84103
"calculateWidthBucket",
85-
Expressions.convert_(fieldValue, Number.class),
104+
fieldValueExpr,
86105
Expressions.convert_(numBins, Number.class),
87-
Expressions.convert_(dataRange, Number.class),
88-
Expressions.convert_(maxValue, Number.class));
106+
minValueExpr,
107+
maxValueExpr,
108+
isTimestamp);
89109
}
90110

91111
/** Width bucket calculation using nice number algorithm. */
92112
public static String calculateWidthBucket(
93-
Number fieldValue, Number numBinsParam, Number dataRange, Number maxValue) {
94-
if (fieldValue == null || numBinsParam == null || dataRange == null || maxValue == null) {
113+
Object fieldValue,
114+
Number numBinsParam,
115+
Object minValue,
116+
Object maxValue,
117+
boolean isTimestamp) {
118+
if (fieldValue == null || numBinsParam == null || minValue == null || maxValue == null) {
95119
return null;
96120
}
97121

98-
double value = fieldValue.doubleValue();
99122
int numBins = numBinsParam.intValue();
100-
101123
if (numBins < BinConstants.MIN_BINS || numBins > BinConstants.MAX_BINS) {
102124
return null;
103125
}
104126

105-
double range = dataRange.doubleValue();
106-
double max = maxValue.doubleValue();
127+
// Handle timestamp fields differently
128+
if (isTimestamp) {
129+
// Convert all timestamp values to milliseconds
130+
long fieldMillis = convertTimestampToMillis(fieldValue);
131+
long minMillis = convertTimestampToMillis(minValue);
132+
long maxMillis = convertTimestampToMillis(maxValue);
133+
134+
// Calculate range
135+
long rangeMillis = maxMillis - minMillis;
136+
if (rangeMillis <= 0) {
137+
return null;
138+
}
139+
140+
return calculateTimestampBucket(fieldMillis, numBins, rangeMillis, minMillis);
141+
}
142+
143+
// Numeric field handling (existing logic)
144+
Number numericValue = (Number) fieldValue;
145+
Number numericMin = (Number) minValue;
146+
Number numericMax = (Number) maxValue;
147+
148+
double value = numericValue.doubleValue();
149+
double min = numericMin.doubleValue();
150+
double max = numericMax.doubleValue();
107151

152+
// Calculate range
153+
double range = max - min;
108154
if (range <= 0) {
109155
return null;
110156
}
@@ -190,5 +236,92 @@ private static int getAppropriateDecimalPlaces(double span) {
190236
return 4;
191237
}
192238
}
239+
240+
/**
241+
* Convert timestamp value to milliseconds. Handles both numeric (Long) milliseconds and String
242+
* formatted timestamps.
243+
*/
244+
private static long convertTimestampToMillis(Object timestamp) {
245+
if (timestamp instanceof Number) {
246+
return ((Number) timestamp).longValue();
247+
} else if (timestamp instanceof String) {
248+
// Parse timestamp string "yyyy-MM-dd HH:mm:ss" to milliseconds
249+
// Use LocalDateTime to parse without timezone, then convert to UTC
250+
String timestampStr = (String) timestamp;
251+
java.time.LocalDateTime localDateTime =
252+
java.time.LocalDateTime.parse(
253+
timestampStr, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
254+
// Assume the timestamp is in UTC and convert to epoch millis
255+
return localDateTime.atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
256+
} else {
257+
throw new IllegalArgumentException("Unsupported timestamp type: " + timestamp.getClass());
258+
}
259+
}
260+
261+
/**
262+
* Calculate timestamp bucket using auto_date_histogram interval selection. Timestamps are in
263+
* milliseconds since epoch. Bins are aligned to the minimum timestamp, not to calendar
264+
* boundaries.
265+
*/
266+
private static String calculateTimestampBucket(
267+
long timestampMillis, int numBins, long rangeMillis, long minMillis) {
268+
// Calculate target width in milliseconds
269+
long targetWidthMillis = rangeMillis / numBins;
270+
271+
// Select appropriate time interval (same as OpenSearch auto_date_histogram)
272+
long intervalMillis = selectTimeInterval(targetWidthMillis);
273+
274+
// Floor timestamp to the interval boundary aligned with minMillis
275+
// This ensures bins start at the data's minimum value, like OpenSearch auto_date_histogram
276+
long offsetFromMin = timestampMillis - minMillis;
277+
long intervalsSinceMin = offsetFromMin / intervalMillis;
278+
long binStartMillis = minMillis + (intervalsSinceMin * intervalMillis);
279+
280+
// Format as ISO 8601 timestamp string
281+
return formatTimestamp(binStartMillis);
282+
}
283+
284+
/**
285+
* Select the appropriate time interval based on target width. Uses the same intervals as
286+
* OpenSearch auto_date_histogram: 1s, 5s, 10s, 30s, 1m, 5m, 10m, 30m, 1h, 3h, 12h, 1d, 7d, 1M,
287+
* 1y
288+
*/
289+
private static long selectTimeInterval(long targetWidthMillis) {
290+
// Define nice time intervals in milliseconds
291+
long[] intervals = {
292+
1000L, // 1 second
293+
5000L, // 5 seconds
294+
10000L, // 10 seconds
295+
30000L, // 30 seconds
296+
60000L, // 1 minute
297+
300000L, // 5 minutes
298+
600000L, // 10 minutes
299+
1800000L, // 30 minutes
300+
3600000L, // 1 hour
301+
10800000L, // 3 hours
302+
43200000L, // 12 hours
303+
86400000L, // 1 day
304+
604800000L, // 7 days
305+
2592000000L, // 30 days (approximate month)
306+
31536000000L // 365 days (approximate year)
307+
};
308+
309+
// Find the smallest interval that is >= target width
310+
for (long interval : intervals) {
311+
if (interval >= targetWidthMillis) {
312+
return interval;
313+
}
314+
}
315+
316+
// If target is larger than all intervals, use the largest
317+
return intervals[intervals.length - 1];
318+
}
319+
320+
/** Format timestamp in milliseconds as ISO 8601 string. Format: "yyyy-MM-dd HH:mm:ss" */
321+
private static String formatTimestamp(long timestampMillis) {
322+
Instant instant = Instant.ofEpochMilli(timestampMillis);
323+
ZonedDateTime zdt = instant.atZone(ZoneOffset.UTC);
324+
return zdt.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
325+
}
193326
}
194327
}

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteBinCommandIT.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1003,9 +1003,6 @@ public void testBinFloatingPointSpanWithStatsCount() throws IOException {
10031003

10041004
@Test
10051005
public void testStatsWithBinsOnTimeField_Count() throws IOException {
1006-
// TODO: Remove this after addressing https://github.com/opensearch-project/sql/issues/4317
1007-
enabledOnlyWhenPushdownIsEnabled();
1008-
10091006
JSONObject result =
10101007
executeQuery("source=events_null | bin @timestamp bins=3 | stats count() by @timestamp");
10111008
verifySchema(
@@ -1041,9 +1038,6 @@ public void testStatsWithBinsOnTimeField_Count() throws IOException {
10411038

10421039
@Test
10431040
public void testStatsWithBinsOnTimeField_Avg() throws IOException {
1044-
// TODO: Remove this after addressing https://github.com/opensearch-project/sql/issues/4317
1045-
enabledOnlyWhenPushdownIsEnabled();
1046-
10471041
JSONObject result =
10481042
executeQuery(
10491043
"source=events_null | bin @timestamp bins=3 | stats avg(cpu_usage) by @timestamp");
@@ -1082,9 +1076,6 @@ public void testStatsWithBinsOnTimeField_Avg() throws IOException {
10821076

10831077
@Test
10841078
public void testStatsWithBinsOnTimeAndTermField_Count() throws IOException {
1085-
// TODO: Remove this after addressing https://github.com/opensearch-project/sql/issues/4317
1086-
enabledOnlyWhenPushdownIsEnabled();
1087-
10881079
JSONObject result =
10891080
executeQuery(
10901081
"source=events_null | bin @timestamp bins=3 | stats bucket_nullable=false count() by"
@@ -1105,9 +1096,6 @@ public void testStatsWithBinsOnTimeAndTermField_Count() throws IOException {
11051096

11061097
@Test
11071098
public void testStatsWithBinsOnTimeAndTermField_Avg() throws IOException {
1108-
// TODO: Remove this after addressing https://github.com/opensearch-project/sql/issues/4317
1109-
enabledOnlyWhenPushdownIsEnabled();
1110-
11111099
JSONObject result =
11121100
executeQuery(
11131101
"source=events_null | bin @timestamp bins=3 | stats bucket_nullable=false "

0 commit comments

Comments
 (0)