Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@ public class DateTimeUtils {
public static final DateTimeFormatter YYYY_MM_DD_HH_MM_SS_14_FORMATTER =
DateTimeFormatter.ofPattern(Formatter.YYYY_MM_DD_HH_MM_SS_NO_SPLIT.value);

// offset datetime formatter map
public static final Map<Pattern, DateTimeFormatter> OFFSET_DATETIME_FORMATTER_MAP =
new LinkedHashMap<>();

public static Set<Map.Entry<Pattern, DateTimeFormatter>>
OFFSET_DATETIME_FORMATTER_MAP_ENTRY_SET = new LinkedHashSet<>();

static {
YYYY_MM_DD_HH_MM_SS_19_FORMATTER_MAP.put(
Pattern.compile("\\d{4}-\\d{2}-\\d{2}\\s\\d{2}:\\d{2}:\\d{2}"),
Expand Down Expand Up @@ -212,6 +219,42 @@ public class DateTimeUtils {

YYYY_M_D_HH_MM_15_FORMATTER_MAP_ENTRY_SET.addAll(
YYYY_M_D_HH_MM_15_FORMATTER_MAP.entrySet());

OFFSET_DATETIME_FORMATTER_MAP.put(
Pattern.compile("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}Z"),
DateTimeFormatter.ISO_OFFSET_DATE_TIME);

OFFSET_DATETIME_FORMATTER_MAP.put(
Pattern.compile("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}[+-]\\d{2}:\\d{2}"),
DateTimeFormatter.ISO_OFFSET_DATE_TIME);

OFFSET_DATETIME_FORMATTER_MAP.put(
Pattern.compile("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{1,9}Z"),
DateTimeFormatter.ISO_OFFSET_DATE_TIME);

OFFSET_DATETIME_FORMATTER_MAP.put(
Pattern.compile(
"\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{1,9}[+-]\\d{2}:\\d{2}"),
DateTimeFormatter.ISO_OFFSET_DATE_TIME);

OFFSET_DATETIME_FORMATTER_MAP_ENTRY_SET.addAll(OFFSET_DATETIME_FORMATTER_MAP.entrySet());
}

/**
* gave an offset datetime string and return the {@link DateTimeFormatter} which can be used to
* parse it.
*
* @param dateTime eg: 2020-02-03T12:12:10Z or 2020-02-03T12:12:10+09:00
* @return the DateTimeFormatter matched, will return null when not matched any pattern
*/
public static DateTimeFormatter matchOffsetDateTimeFormatter(String dateTime) {
for (Map.Entry<Pattern, DateTimeFormatter> entry :
OFFSET_DATETIME_FORMATTER_MAP_ENTRY_SET) {
if (entry.getKey().matcher(dateTime).matches()) {
return entry.getValue();
}
}
return null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ private Object convertField(SeaTunnelDataType<?> dataType, Object val) {
case DATE:
case DECIMAL:
case TIMESTAMP:
case TIMESTAMP_TZ:
return val;
case BYTES:
return ((ByteBuffer) val).array();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,13 @@
import java.io.Serializable;
import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
Expand Down Expand Up @@ -155,6 +159,13 @@ public Object convert(JsonNode jsonNode, String fieldName) {
return convertToLocalDateTime(jsonNode, fieldName);
}
};
case TIMESTAMP_TZ:
return new JsonToObjectConverter() {
@Override
public Object convert(JsonNode jsonNode, String fieldName) {
return convertToOffsetDateTime(jsonNode, fieldName);
}
};
case FLOAT:
return new JsonToObjectConverter() {
@Override
Expand Down Expand Up @@ -284,6 +295,35 @@ private LocalDateTime convertToLocalDateTime(JsonNode jsonNode, String fieldName
return LocalDateTime.of(localDate, localTime);
}

private OffsetDateTime convertToOffsetDateTime(JsonNode jsonNode, String fieldName) {
String datetimeStr = jsonNode.asText();
DateTimeFormatter dateTimeFormatter = fieldFormatterMap.get(fieldName);

if (dateTimeFormatter == null) {
dateTimeFormatter = DateTimeUtils.matchOffsetDateTimeFormatter(datetimeStr);
fieldFormatterMap.put(fieldName, dateTimeFormatter);
}

if (dateTimeFormatter == null) {
throw CommonError.formatDateTimeError(datetimeStr, fieldName);
}

TemporalAccessor parsedTimestamp = dateTimeFormatter.parse(datetimeStr);
LocalTime localTime = parsedTimestamp.query(TemporalQueries.localTime());
LocalDate localDate = parsedTimestamp.query(TemporalQueries.localDate());
ZoneOffset offset = parsedTimestamp.query(TemporalQueries.offset());

if (offset == null) {
offset = ZoneId.systemDefault().getRules().getOffset(Instant.now());
}

if (localDate == null || localTime == null) {
throw CommonError.formatDateTimeError(datetimeStr, fieldName);
}

return OffsetDateTime.of(localDate, localTime, offset);
}

private String convertToString(JsonNode jsonNode) {
if (jsonNode.isContainerNode()) {
return jsonNode.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalQueries;
import java.util.HashMap;
Expand Down Expand Up @@ -694,4 +696,35 @@ public void testSerializationWithNumber() {
String expected = "{\"id\":1,\"code\":\"1001015\",\"fe_result\":80}";
assertEquals(new String(serialize), expected);
}

@Test
public void testDeserializationWithTimestampTz() throws Exception {
SeaTunnelRowType schema =
new SeaTunnelRowType(
new String[] {"timestamp_tz"},
new SeaTunnelDataType[] {LocalTimeType.OFFSET_DATE_TIME_TYPE});
CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "", "", "test", schema);
JsonDeserializationSchema deserializationSchema =
new JsonDeserializationSchema(catalogTables, false, false);

OffsetDateTime timestampUtc = OffsetDateTime.of(2024, 1, 15, 10, 30, 45, 0, ZoneOffset.UTC);
SeaTunnelRow rowUtc =
deserializationSchema.deserialize(
"{\"timestamp_tz\":\"2024-01-15T10:30:45Z\"}".getBytes());
assertEquals(timestampUtc, rowUtc.getField(0));

OffsetDateTime timestampKst =
OffsetDateTime.of(2024, 1, 15, 10, 30, 45, 0, ZoneOffset.ofHours(9));
SeaTunnelRow rowKst =
deserializationSchema.deserialize(
"{\"timestamp_tz\":\"2024-01-15T10:30:45+09:00\"}".getBytes());
assertEquals(timestampKst, rowKst.getField(0));

OffsetDateTime timestampMillis =
OffsetDateTime.of(2024, 1, 15, 10, 30, 45, 123000000, ZoneOffset.UTC);
SeaTunnelRow rowMillis =
deserializationSchema.deserialize(
"{\"timestamp_tz\":\"2024-01-15T10:30:45.123Z\"}".getBytes());
assertEquals(timestampMillis, rowMillis.getField(0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ private Object convertField(
case DATE:
case DECIMAL:
case TIMESTAMP:
case TIMESTAMP_TZ:
return val;
case BYTES:
return ((ByteString) val).toByteArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,13 @@
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
Expand Down Expand Up @@ -257,6 +261,8 @@ private Object convert(
return objectArrayList.toArray(new LocalTime[0]);
case TIMESTAMP:
return objectArrayList.toArray(new LocalDateTime[0]);
case TIMESTAMP_TZ:
return objectArrayList.toArray(new OffsetDateTime[0]);
default:
throw new SeaTunnelTextFormatException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
Expand Down Expand Up @@ -330,6 +336,30 @@ private Object convert(
LocalTime localTime = parsedTimestamp.query(TemporalQueries.localTime());
LocalDate localDate = parsedTimestamp.query(TemporalQueries.localDate());
return LocalDateTime.of(localDate, localTime);
case TIMESTAMP_TZ:
DateTimeFormatter dateTimeTzFormatter = fieldFormatterMap.get(fieldName);
if (dateTimeTzFormatter == null) {
dateTimeTzFormatter = DateTimeUtils.matchDateTimeFormatter(field);
fieldFormatterMap.put(fieldName, dateTimeTzFormatter);
}
if (dateTimeTzFormatter == null) {
throw CommonError.formatDateTimeError(field, fieldName);
}

TemporalAccessor parsedTimestampTz = dateTimeTzFormatter.parse(field);
LocalTime localTimeTz = parsedTimestampTz.query(TemporalQueries.localTime());
LocalDate localDateTz = parsedTimestampTz.query(TemporalQueries.localDate());
ZoneOffset offset = parsedTimestampTz.query(TemporalQueries.offset());

if (offset == null) {
offset = ZoneId.systemDefault().getRules().getOffset(Instant.now());
}

if (localDateTz == null || localTimeTz == null) {
throw CommonError.formatDateTimeError(field, fieldName);
}

return OffsetDateTime.of(localDateTz, localTimeTz, offset);
Comment on lines +339 to +362
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a test case for this change.

case ROW:
Map<Integer, String> splitsMap =
splitLineBySeaTunnelRowType(field, (SeaTunnelRowType) fieldType, level + 1);
Expand Down