
[Feature] Support multi-level nested JSON parsing #9973

@KONEONE

Search before asking

  • I have searched the existing feature requests and found no similar requirement.

Description

Consider JSON messages read from Kafka, for example:

{"personInfo":{"name":"kk","age":12}, "other":12}
{ "other":12}

The first message contains the "personInfo" field, while the second does not. The Kafka source schema is defined as follows:

personInfo = {
  name = string,
  age = int
}
other = int
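To make the data shape concrete, here is a minimal Python sketch of how the two messages map onto this schema. It assumes that a message missing a nested object yields null for the whole nested row, which matches the "personInfo" value of the second message. `SCHEMA` and `conform` are hypothetical names used only for illustration (they are not SeaTunnel APIs), and type coercion is omitted.

```python
import json

# Hypothetical in-memory form of the schema above: a nested dict is a
# nested row, a string names a primitive type. Not a SeaTunnel API.
SCHEMA = {
    "personInfo": {"name": "string", "age": "int"},
    "other": "int",
}


def conform(value, schema):
    """Project a parsed JSON object onto a (possibly nested) schema.

    Declared fields that are absent from the message become None. A nested
    row that is missing entirely becomes None as a whole. Assumes nested
    values are JSON objects (dicts); no type checking or coercion is done.
    """
    if value is None:
        return None
    row = {}
    for field, field_type in schema.items():
        child = value.get(field)
        if isinstance(field_type, dict):
            # Recurse into the nested row definition.
            row[field] = conform(child, field_type)
        else:
            row[field] = child
    return row


messages = [
    '{"personInfo":{"name":"kk","age":12}, "other":12}',
    '{ "other":12}',
]
rows = [conform(json.loads(m), SCHEMA) for m in messages]
for row in rows:
    print(row)
```

Running it prints `{'personInfo': {'name': 'kk', 'age': 12}, 'other': 12}` followed by `{'personInfo': None, 'other': 12}`.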

This definition is valid. However, the Sql transform fails on the second message. For example, with the following query:

transform {
  Sql {
    plugin_input = "kafka_table"
    plugin_output = "out_table"
    query = """
      select personInfo.name as personInfo_name from kafka_table
    """
  }
}

This job throws the following exception:

Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.transform.exception.TransformException: ErrorCode:[TRANSFORM_COMMON-06], ErrorDescription:[The expression 'personInfo.name' of SQL transform execute failed]
        at org.apache.seatunnel.transform.exception.TransformCommonError.sqlExpressionError(TransformCommonError.java:79)

The cause is that "personInfo" is null in the second message. In this scenario no exception should be thrown; `personInfo.name` should simply evaluate to null. It would be great if SeaTunnel supported this null-safe access for nested fields.
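The requested behavior can be sketched as null-safe evaluation of a dotted field reference. This is a minimal Python illustration, assuming nested rows are represented as dicts; `get_path` and `get_path_strict` are hypothetical helpers, not SeaTunnel's implementation. The strict variant is included only to show a failure analogous to the reported one.

```python
def get_path(row, path):
    """Null-safe evaluation of a dotted reference such as "personInfo.name".

    If any intermediate value is None (or the field is absent), the result
    is None instead of an error. Assumes nested rows are dicts.
    """
    current = row
    for part in path.split("."):
        if current is None:
            return None  # short-circuit: a null parent yields a null child
        current = current.get(part)
    return current


def get_path_strict(row, path):
    """For contrast: strict evaluation that fails on a null parent row."""
    current = row
    for part in path.split("."):
        current = current[part]  # raises TypeError when current is None
    return current


rows = [
    {"personInfo": {"name": "kk", "age": 12}, "other": 12},
    {"personInfo": None, "other": 12},
]
print([get_path(r, "personInfo.name") for r in rows])  # ['kk', None]
```

Short-circuiting on the first null mirrors how SQL generally propagates NULL through expressions. `get_path_strict(rows[1], "personInfo.name")` raises a `TypeError`, analogous to the `TRANSFORM_COMMON-06` error above.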

Usage Scenario

No response

Related issues

No response

Are you willing to submit a PR?

  • Yes, I am willing to submit a PR!

Code of Conduct
