-
Notifications
You must be signed in to change notification settings - Fork 2.1k
[Feature][Connector-V2] Kafka Source support to read TIMESTAMP_TZ #9984
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Conversation
|
Hi @hohosznta , |
|
Let's merge the latest changes and try again. |
|
@dybyte thanks, I'll try again. |
|
@dybyte Hi, all CI/CD checks have passed. I’d really appreciate it if you could take a look at my PR. |
| case TIMESTAMP_TZ: | ||
| DateTimeFormatter dateTimeTzFormatter = fieldFormatterMap.get(fieldName); | ||
| if (dateTimeTzFormatter == null) { | ||
| dateTimeTzFormatter = DateTimeUtils.matchDateTimeFormatter(field); | ||
| fieldFormatterMap.put(fieldName, dateTimeTzFormatter); | ||
| } | ||
| if (dateTimeTzFormatter == null) { | ||
| throw CommonError.formatDateTimeError(field, fieldName); | ||
| } | ||
|
|
||
| TemporalAccessor parsedTimestampTz = dateTimeTzFormatter.parse(field); | ||
| LocalTime localTimeTz = parsedTimestampTz.query(TemporalQueries.localTime()); | ||
| LocalDate localDateTz = parsedTimestampTz.query(TemporalQueries.localDate()); | ||
| ZoneOffset offset = parsedTimestampTz.query(TemporalQueries.offset()); | ||
|
|
||
| if (offset == null) { | ||
| offset = ZoneId.systemDefault().getRules().getOffset(Instant.now()); | ||
| } | ||
|
|
||
| if (localDateTz == null || localTimeTz == null) { | ||
| throw CommonError.formatDateTimeError(field, fieldName); | ||
| } | ||
|
|
||
| return OffsetDateTime.of(localDateTz, localTimeTz, offset); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a test case for this change.
Purpose of this pull request
Add support for TIMESTAMP_TZ on kafka source. Issue #9785
Does this PR introduce any user-facing change?
Yes. The Kafka source connector now supports TIMESTAMP_TZ (timestamp with timezone) data type.
Users can now consume Kafka messages containing timestamp with timezone information (e.g., "2024-01-15T10:30:45Z",
"2024-01-15T10:30:45+09:00") and SeaTunnel will correctly parse them as OffsetDateTime.
How was this patch tested?
Added unit test case in json parsing timestamp_tz in JsonRowDataSerDeSchemaTest.java. (testDeserializationWithTimestampTz)
Check list
New License Guide