Skip to content

Commit c58dd7c

Browse files
committed
add support for portable logical types
1 parent 57e34b6 commit c58dd7c

1 file changed

Lines changed: 113 additions & 0 deletions

File tree

  • sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel

sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@
5656
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
5757
import org.apache.beam.sdk.schemas.logicaltypes.FixedString;
5858
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
59+
import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
60+
import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
5961
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
6062
import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
6163
import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
@@ -415,6 +417,10 @@ static Object toBeamObject(Object value, FieldType fieldType, boolean verifyValu
415417
String identifier = logicalType.getIdentifier();
416418
if (TimeWithLocalTzType.IDENTIFIER.equals(identifier)) {
417419
return Instant.ofEpochMilli(((Number) value).longValue());
420+
} else if (MicrosInstant.IDENTIFIER.equals(identifier)
421+
|| NanosInstant.IDENTIFIER.equals(identifier)) {
422+
// Portable instant logical types: treat calcite numeric as epoch milliseconds.
423+
return Instant.ofEpochMilli(((Number) value).longValue());
418424
} else if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
419425
if (value instanceof Date) {
420426
value = SqlFunctions.toInt((Date) value);
@@ -573,6 +579,17 @@ private static Expression getBeamField(
573579
value = Expressions.call(expression, "getBytes", fieldName);
574580
} else if (TimeWithLocalTzType.IDENTIFIER.equals(identifier)) {
575581
value = Expressions.call(expression, "getDateTime", fieldName);
582+
} else if (MicrosInstant.IDENTIFIER.equals(identifier)
583+
|| NanosInstant.IDENTIFIER.equals(identifier)) {
584+
// Instant-like logical types: retrieve as Instant
585+
value =
586+
Expressions.convert_(
587+
Expressions.call(
588+
expression,
589+
"getLogicalTypeValue",
590+
fieldName,
591+
Expressions.constant(java.time.Instant.class)),
592+
java.time.Instant.class);
576593
} else if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
577594
value =
578595
Expressions.convert_(
@@ -600,6 +617,59 @@ private static Expression getBeamField(
600617
fieldName,
601618
Expressions.constant(LocalDateTime.class)),
602619
LocalDateTime.class);
620+
} else if (fieldType.getLogicalType() instanceof PassThroughLogicalType) {
621+
// For pass-through logical types, read the underlying base type using the
622+
// corresponding Row getter.
623+
FieldType baseType = fieldType.getLogicalType().getBaseType();
624+
switch (baseType.getTypeName()) {
625+
case BYTE:
626+
value = Expressions.call(expression, "getByte", fieldName);
627+
break;
628+
case INT16:
629+
value = Expressions.call(expression, "getInt16", fieldName);
630+
break;
631+
case INT32:
632+
value = Expressions.call(expression, "getInt32", fieldName);
633+
break;
634+
case INT64:
635+
value = Expressions.call(expression, "getInt64", fieldName);
636+
break;
637+
case DECIMAL:
638+
value = Expressions.call(expression, "getDecimal", fieldName);
639+
break;
640+
case FLOAT:
641+
value = Expressions.call(expression, "getFloat", fieldName);
642+
break;
643+
case DOUBLE:
644+
value = Expressions.call(expression, "getDouble", fieldName);
645+
break;
646+
case STRING:
647+
value = Expressions.call(expression, "getString", fieldName);
648+
break;
649+
case DATETIME:
650+
value = Expressions.call(expression, "getDateTime", fieldName);
651+
break;
652+
case BOOLEAN:
653+
value = Expressions.call(expression, "getBoolean", fieldName);
654+
break;
655+
case BYTES:
656+
value = Expressions.call(expression, "getBytes", fieldName);
657+
break;
658+
case ARRAY:
659+
value = Expressions.call(expression, "getArray", fieldName);
660+
break;
661+
case MAP:
662+
value = Expressions.call(expression, "getMap", fieldName);
663+
break;
664+
case ROW:
665+
value = Expressions.call(expression, "getRow", fieldName);
666+
break;
667+
case ITERABLE:
668+
value = Expressions.call(expression, "getIterable", fieldName);
669+
break;
670+
default:
671+
throw new UnsupportedOperationException("Unable to get logical type " + identifier);
672+
}
603673
} else {
604674
throw new UnsupportedOperationException("Unable to get logical type " + identifier);
605675
}
@@ -658,6 +728,11 @@ private static Expression toCalciteValue(Expression value, FieldType fieldType)
658728
} else if (TimeWithLocalTzType.IDENTIFIER.equals(identifier)) {
659729
return nullOr(
660730
value, Expressions.call(Expressions.convert_(value, DateTime.class), "getMillis"));
731+
} else if (MicrosInstant.IDENTIFIER.equals(identifier)
732+
|| NanosInstant.IDENTIFIER.equals(identifier)) {
733+
// Convert java.time.Instant to epoch milliseconds for Calcite
734+
return nullOr(
735+
value, Expressions.call(Expressions.convert_(value, java.time.Instant.class), "toEpochMilli"));
661736
} else if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
662737
return nullOr(
663738
value,
@@ -687,6 +762,44 @@ private static Expression toCalciteValue(Expression value, FieldType fieldType)
687762
Expressions.multiply(dateValue, Expressions.constant(MILLIS_PER_DAY)),
688763
Expressions.divide(timeValue, Expressions.constant(NANOS_PER_MILLISECOND)));
689764
return nullOr(value, returnValue);
765+
} else if (fieldType.getLogicalType() instanceof PassThroughLogicalType) {
766+
// For pass-through logical types, convert underlying base type to Calcite value
767+
FieldType baseType = fieldType.getLogicalType().getBaseType();
768+
switch (baseType.getTypeName()) {
769+
case BYTE:
770+
return Expressions.convert_(value, Byte.class);
771+
case INT16:
772+
return Expressions.convert_(value, Short.class);
773+
case INT32:
774+
return Expressions.convert_(value, Integer.class);
775+
case INT64:
776+
return Expressions.convert_(value, Long.class);
777+
case DECIMAL:
778+
return Expressions.convert_(value, BigDecimal.class);
779+
case FLOAT:
780+
return Expressions.convert_(value, Float.class);
781+
case DOUBLE:
782+
return Expressions.convert_(value, Double.class);
783+
case STRING:
784+
return Expressions.convert_(value, String.class);
785+
case BOOLEAN:
786+
return Expressions.convert_(value, Boolean.class);
787+
case DATETIME:
788+
return nullOr(
789+
value,
790+
Expressions.call(Expressions.convert_(value, AbstractInstant.class), "getMillis"));
791+
case BYTES:
792+
return nullOr(
793+
value, Expressions.new_(ByteString.class, Expressions.convert_(value, byte[].class)));
794+
case ARRAY:
795+
return nullOr(value, toCalciteList(value, baseType.getCollectionElementType()));
796+
case MAP:
797+
return nullOr(value, toCalciteMap(value, baseType.getMapValueType()));
798+
case ROW:
799+
return nullOr(value, toCalciteRow(value, baseType.getRowSchema()));
800+
default:
801+
throw new UnsupportedOperationException("Unable to convert logical type " + identifier);
802+
}
690803
} else {
691804
throw new UnsupportedOperationException("Unable to convert logical type " + identifier);
692805
}

0 commit comments

Comments
 (0)