From 17c6ad813d5291b003839b3779d894957a23a6de Mon Sep 17 00:00:00 2001 From: Zhenqiu Huang Date: Sat, 30 Nov 2024 09:44:08 -0800 Subject: [PATCH] [FLINK-34467] add lineage integration for jdbc connector --- .github/workflows/backwards_compatibility.yml | 2 +- .github/workflows/weekly.yml | 6 +- .java-version | 1 + .../4bca2274-65a9-4a61-81ef-767d58233ea0 | 1 + .../6cdea252-f400-4c13-bc99-b325f2ebe333 | 4 +- .../d45c3af5-52c6-45fd-9926-75e75e77473a | 4 + flink-connector-jdbc-core/pom.xml | 5 + .../flink/connector/jdbc/JdbcInputFormat.java | 23 ++- .../jdbc/core/datastream/sink/JdbcSink.java | 18 ++- .../core/datastream/source/JdbcSource.java | 26 +++- .../table/source/JdbcRowDataInputFormat.java | 23 ++- .../source/JdbcRowDataLookupFunction.java | 39 ++++- .../SimpleJdbcConnectionProvider.java | 4 + .../internal/GenericJdbcSinkFunction.java | 17 ++- .../jdbc/lineage/DefaultJdbcExtractor.java | 97 ++++++++++++ .../jdbc/lineage/DefaultTypeDatasetFacet.java | 61 ++++++++ .../connector/jdbc/lineage/JdbcLocation.java | 118 +++++++++++++++ .../jdbc/lineage/JdbcLocationExtractor.java | 32 ++++ .../lineage/JdbcLocationExtractorFactory.java | 36 +++++ .../connector/jdbc/lineage/JdbcUtils.java | 85 +++++++++++ .../connector/jdbc/lineage/LineageUtils.java | 126 ++++++++++++++++ .../OverrideJdbcLocationExtractor.java | 80 ++++++++++ .../jdbc/lineage/TypeDatasetFacet.java | 29 ++++ .../lineage/DB2LocationExtractor.java | 50 +++++++ .../lineage/DB2LocationExtractorFactory.java | 33 +++++ ....jdbc.lineage.JdbcLocationExtractorFactory | 16 ++ .../lineage/MySqlLocationExtractor.java | 56 +++++++ .../MysqlLocationExtractorFactory.java | 33 +++++ ....jdbc.lineage.JdbcLocationExtractorFactory | 16 ++ .../lineage/OceanBaseLocationExtractor.java | 56 +++++++ .../OceanBaseLocationExtractorFactory.java | 33 +++++ ....jdbc.lineage.JdbcLocationExtractorFactory | 16 ++ .../lineage/OracleLocationExtractor.java | 89 +++++++++++ .../OracleLocationExtractorFactory.java | 32 ++++ ....jdbc.lineage.JdbcLocationExtractorFactory | 16 ++ .../lineage/PostgresLocationExtractor.java | 51 +++++++ .../PostgresLocationExtractorFactory.java | 33 +++++ ....jdbc.lineage.JdbcLocationExtractorFactory | 16 ++ .../lineage/SqlServerLocationExtractor.java | 138 ++++++++++++++++++ .../SqlServerLocationExtractorFactory.java | 33 +++++ ....jdbc.lineage.JdbcLocationExtractorFactory | 16 ++ .../lineage/TrinoLocationExtractor.java | 50 +++++++ .../TrinoLocationExtractorFactory.java | 33 +++++ ....jdbc.lineage.JdbcLocationExtractorFactory | 16 ++ .../connector/jdbc/xa/JdbcXaSinkFunction.java | 10 +- pom.xml | 6 + 46 files changed, 1665 insertions(+), 20 deletions(-) create mode 100644 .java-version create mode 100644 flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/DefaultJdbcExtractor.java create mode 100644 flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/DefaultTypeDatasetFacet.java create mode 100644 flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocation.java create mode 100644 flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocationExtractor.java create mode 100644 flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocationExtractorFactory.java create mode 100644 flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcUtils.java create mode 100644 flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/LineageUtils.java create mode 100644 flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/OverrideJdbcLocationExtractor.java create mode 100644 flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/TypeDatasetFacet.java create mode 100644 flink-connector-jdbc-db2/src/main/java/org/apache/flink/connector/jdbc/db2/database/lineage/DB2LocationExtractor.java create mode 100644 flink-connector-jdbc-db2/src/main/java/org/apache/flink/connector/jdbc/db2/database/lineage/DB2LocationExtractorFactory.java create mode 100644 flink-connector-jdbc-db2/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory create mode 100644 flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/lineage/MySqlLocationExtractor.java create mode 100644 flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/lineage/MysqlLocationExtractorFactory.java create mode 100644 flink-connector-jdbc-mysql/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory create mode 100644 flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/lineage/OceanBaseLocationExtractor.java create mode 100644 flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/lineage/OceanBaseLocationExtractorFactory.java create mode 100644 flink-connector-jdbc-oceanbase/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory create mode 100644 flink-connector-jdbc-oracle/src/main/java/org/apache/flink/connector/jdbc/oracle/database/lineage/OracleLocationExtractor.java create mode 100644 flink-connector-jdbc-oracle/src/main/java/org/apache/flink/connector/jdbc/oracle/database/lineage/OracleLocationExtractorFactory.java create mode 100644 flink-connector-jdbc-oracle/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory create mode 100644 flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/lineage/PostgresLocationExtractor.java create mode 100644 flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/lineage/PostgresLocationExtractorFactory.java create mode 100644 flink-connector-jdbc-postgres/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory create mode 100644 flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/lineage/SqlServerLocationExtractor.java create mode 100644 flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/lineage/SqlServerLocationExtractorFactory.java create mode 100644 flink-connector-jdbc-sqlserver/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory create mode 100644 flink-connector-jdbc-trino/src/main/java/org/apache/flink/connector/jdbc/trino/database/lineage/TrinoLocationExtractor.java create mode 100644 flink-connector-jdbc-trino/src/main/java/org/apache/flink/connector/jdbc/trino/database/lineage/TrinoLocationExtractorFactory.java create mode 100644 flink-connector-jdbc-trino/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory diff --git a/.github/workflows/backwards_compatibility.yml b/.github/workflows/backwards_compatibility.yml index 139b3d9e..809c8f72 100644 --- a/.github/workflows/backwards_compatibility.yml +++ b/.github/workflows/backwards_compatibility.yml @@ -29,7 +29,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - flink: [1.18-SNAPSHOT, 1.19-SNAPSHOT] + flink: [1.19-SNAPSHOT] jdk: [8, 11, 17] env: diff --git a/.github/workflows/weekly.yml b/.github/workflows/weekly.yml index 9a27cdad..576edd5f 100644 --- a/.github/workflows/weekly.yml +++ b/.github/workflows/weekly.yml @@ -29,11 +29,7 @@ jobs: if: github.repository_owner == 'apache' strategy: matrix: - flink_branches: [{ - flink: 1.19-SNAPSHOT, - jdk: '8, 11, 17, 21', - branch: main - }, + flink_branches: [ { flink: 1.20-SNAPSHOT, jdk: '8, 11, 17, 21', diff --git a/.java-version b/.java-version new file mode 100644 index 00000000..b4de3947 --- /dev/null +++ b/.java-version @@ -0,0 +1 @@ +11 diff --git a/flink-connector-jdbc-core/archunit-violations/4bca2274-65a9-4a61-81ef-767d58233ea0 b/flink-connector-jdbc-core/archunit-violations/4bca2274-65a9-4a61-81ef-767d58233ea0 index e69de29b..3f29182c 100644 --- a/flink-connector-jdbc-core/archunit-violations/4bca2274-65a9-4a61-81ef-767d58233ea0 +++ b/flink-connector-jdbc-core/archunit-violations/4bca2274-65a9-4a61-81ef-767d58233ea0 @@ -0,0 +1 @@ +Method calls method in (JdbcSource.java:215) diff --git a/flink-connector-jdbc-core/archunit-violations/6cdea252-f400-4c13-bc99-b325f2ebe333 b/flink-connector-jdbc-core/archunit-violations/6cdea252-f400-4c13-bc99-b325f2ebe333 index afc72434..f44ff4a1 100644 --- a/flink-connector-jdbc-core/archunit-violations/6cdea252-f400-4c13-bc99-b325f2ebe333 +++ b/flink-connector-jdbc-core/archunit-violations/6cdea252-f400-4c13-bc99-b325f2ebe333 @@ -48,9 +48,11 @@ Method has parameter of type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (JdbcOutputFormatBuilder.java:0) Method has parameter of type <[Lorg.apache.flink.table.types.DataType;> in (JdbcOutputFormatBuilder.java:0) Method has return type <[Lorg.apache.flink.core.io.InputSplit;> in (JdbcRowDataInputFormat.java:0) -Method calls constructor ([Lorg.apache.flink.core.io.InputSplit;)> in (JdbcRowDataInputFormat.java:287) +Method calls constructor ([Lorg.apache.flink.core.io.InputSplit;)> in (JdbcRowDataInputFormat.java:295) Method has parameter of type <[Lorg.apache.flink.core.io.InputSplit;> in (JdbcRowDataInputFormat.java:0) Method is annotated with in (JdbcRowDataLookupFunction.java:0) +Method calls method in (JdbcRowDataLookupFunction.java:242) +Method calls method in (JdbcRowDataLookupFunction.java:243) Method is annotated with in (SimpleXaConnectionProvider.java:0) Method calls constructor ([B)> in (TransactionId.java:96) Method calls method in (TransactionId.java:101) diff --git a/flink-connector-jdbc-core/archunit-violations/d45c3af5-52c6-45fd-9926-75e75e77473a b/flink-connector-jdbc-core/archunit-violations/d45c3af5-52c6-45fd-9926-75e75e77473a index e69de29b..c248851c 100644 --- a/flink-connector-jdbc-core/archunit-violations/d45c3af5-52c6-45fd-9926-75e75e77473a +++ b/flink-connector-jdbc-core/archunit-violations/d45c3af5-52c6-45fd-9926-75e75e77473a @@ -0,0 +1,4 @@ +org.apache.flink.connector.jdbc.lineage.DefaultJdbcExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated +org.apache.flink.connector.jdbc.lineage.JdbcLocation$Builder.build(): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated +org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated +org.apache.flink.connector.jdbc.lineage.OverrideJdbcLocationExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated diff --git a/flink-connector-jdbc-core/pom.xml b/flink-connector-jdbc-core/pom.xml index 851c1e07..23afd29b 100644 --- a/flink-connector-jdbc-core/pom.xml +++ b/flink-connector-jdbc-core/pom.xml @@ -56,6 +56,11 @@ under the License. true + + io.openlineage + openlineage-sql-java + + diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java index dacf3c6c..2739e974 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.io.RichInputFormat; import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.configuration.Configuration; @@ -31,10 +32,15 @@ import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceBuilder; import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider; +import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet; +import org.apache.flink.connector.jdbc.lineage.LineageUtils; import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider; import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; @@ -53,6 +59,8 @@ import java.sql.Time; import java.sql.Timestamp; import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; /** * InputFormat to read data from a database and generate Rows. The InputFormat has to be configured @@ -107,7 +115,7 @@ @Deprecated @Experimental public class JdbcInputFormat extends RichInputFormat - implements ResultTypeQueryable { + implements LineageVertexProvider, ResultTypeQueryable { protected static final long serialVersionUID = 2L; protected static final Logger LOG = LoggerFactory.getLogger(JdbcInputFormat.class); @@ -344,6 +352,19 @@ public static JdbcInputFormatBuilder buildJdbcInputFormat() { return new JdbcInputFormatBuilder(); } + @Override + public LineageVertex getLineageVertex() { + DefaultTypeDatasetFacet defaultTypeDatasetFacet = + new DefaultTypeDatasetFacet(getProducedType()); + Optional nameOpt = LineageUtils.nameOf(queryTemplate); + String namespace = LineageUtils.namespaceOf(connectionProvider); + LineageDataset dataset = + LineageUtils.datasetOf( + nameOpt.orElse(""), namespace, Arrays.asList(defaultTypeDatasetFacet)); + return LineageUtils.sourceLineageVertexOf( + Boundedness.BOUNDED, Collections.singleton(dataset)); + } + /** Builder for {@link JdbcInputFormat}. */ public static class JdbcInputFormatBuilder { private final JdbcConnectionOptions.JdbcConnectionOptionsBuilder connOptionsBuilder; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java index 8c0cf999..af7d5b14 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java @@ -34,11 +34,16 @@ import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; import org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement; import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer; +import org.apache.flink.connector.jdbc.lineage.LineageUtils; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import java.io.IOException; import java.util.Collection; import java.util.Collections; +import java.util.Optional; /** * Flink Sink to produce data into a jdbc database. @@ -47,7 +52,9 @@ */ @PublicEvolving public class JdbcSink - implements StatefulSink, TwoPhaseCommittingSink { + implements LineageVertexProvider, + StatefulSink, + TwoPhaseCommittingSink { private final DeliveryGuarantee deliveryGuarantee; private final JdbcConnectionProvider connectionProvider; @@ -113,4 +120,13 @@ public SimpleVersionedSerializer getCommittableSerializer() { public SimpleVersionedSerializer getWriterStateSerializer() { return new JdbcWriterStateSerializer(); } + + @Override + public LineageVertex getLineageVertex() { + Optional nameOpt = LineageUtils.nameOf(queryStatement.query()); + String namespace = LineageUtils.namespaceOf(connectionProvider); + LineageDataset dataset = + LineageUtils.datasetOf(nameOpt.orElse(""), namespace, Collections.emptyList()); + return LineageUtils.lineageVertexOf(Collections.singleton(dataset)); + } } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java index eef58c07..678474a5 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java @@ -34,26 +34,36 @@ import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumerator; import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumeratorState; import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSqlSplitEnumeratorBase; +import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator; import org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceReader; import org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceSplitReader; import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor; import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit; import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplitSerializer; import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; +import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet; +import org.apache.flink.connector.jdbc.lineage.LineageUtils; import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import org.apache.flink.util.Preconditions; import javax.annotation.Nullable; import java.io.Serializable; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.Objects; +import java.util.Optional; /** JDBC source. */ @PublicEvolving public class JdbcSource - implements Source, + implements LineageVertexProvider, + Source, ResultTypeQueryable { private final Boundedness boundedness; @@ -195,4 +205,18 @@ public boolean equals(Object o) { && deliveryGuarantee == that.deliveryGuarantee && Objects.equals(continuousUnBoundingSettings, that.continuousUnBoundingSettings); } + + @Override + public LineageVertex getLineageVertex() { + DefaultTypeDatasetFacet defaultTypeDatasetFacet = + new DefaultTypeDatasetFacet(getTypeInformation()); + SqlTemplateSplitEnumerator enumerator = + (SqlTemplateSplitEnumerator) sqlSplitEnumeratorProvider.create(); + Optional nameOpt = LineageUtils.nameOf(enumerator.getSqlTemplate()); + String namespace = LineageUtils.namespaceOf(connectionProvider); + LineageDataset dataset = + LineageUtils.datasetOf( + nameOpt.orElse(""), namespace, Arrays.asList(defaultTypeDatasetFacet)); + return LineageUtils.sourceLineageVertexOf(boundedness, Collections.singleton(dataset)); + } } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataInputFormat.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataInputFormat.java index b06f555d..b7f670ee 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataInputFormat.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataInputFormat.java @@ -24,16 +24,22 @@ import org.apache.flink.api.common.io.RichInputFormat; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter; import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider; +import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet; +import org.apache.flink.connector.jdbc.lineage.LineageUtils; import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider; import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import org.apache.flink.table.data.RowData; import org.apache.flink.util.Preconditions; @@ -51,11 +57,13 @@ import java.sql.Time; import java.sql.Timestamp; import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; /** InputFormat for {@link JdbcDynamicTableSource}. */ @Internal public class JdbcRowDataInputFormat extends RichInputFormat - implements ResultTypeQueryable { + implements LineageVertexProvider, ResultTypeQueryable { private static final long serialVersionUID = 2L; private static final Logger LOG = LoggerFactory.getLogger(JdbcRowDataInputFormat.class); @@ -296,6 +304,19 @@ public static Builder builder() { return new Builder(); } + @Override + public LineageVertex getLineageVertex() { + DefaultTypeDatasetFacet defaultTypeDatasetFacet = + new DefaultTypeDatasetFacet(getProducedType()); + Optional nameOpt = LineageUtils.nameOf(queryTemplate); + String namespace = LineageUtils.namespaceOf(connectionProvider); + LineageDataset dataset = + LineageUtils.datasetOf( + nameOpt.orElse(""), namespace, Arrays.asList(defaultTypeDatasetFacet)); + return LineageUtils.sourceLineageVertexOf( + Boundedness.BOUNDED, Collections.singleton(dataset)); + } + /** Builder for {@link JdbcRowDataInputFormat}. */ public static class Builder { private JdbcConnectionOptions.JdbcConnectionOptionsBuilder connOptionsBuilder; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataLookupFunction.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataLookupFunction.java index dd39fa27..adc0fa2c 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataLookupFunction.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataLookupFunction.java @@ -20,18 +20,26 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect; import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter; import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider; import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions; +import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet; +import org.apache.flink.connector.jdbc.lineage.LineageUtils; import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import org.apache.flink.table.data.RowData; import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.functions.LookupFunction; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter; +import org.apache.flink.table.types.utils.TypeConversions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +54,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkArgument; @@ -53,7 +62,7 @@ /** A lookup function for {@link JdbcDynamicTableSource}. */ @Internal -public class JdbcRowDataLookupFunction extends LookupFunction { +public class JdbcRowDataLookupFunction extends LookupFunction implements LineageVertexProvider { private static final Logger LOG = LoggerFactory.getLogger(JdbcRowDataLookupFunction.class); private static final long serialVersionUID = 2L; @@ -67,6 +76,7 @@ public class JdbcRowDataLookupFunction extends LookupFunction { private final List resolvedPredicates; private final Serializable[] pushdownParams; + private final RowType producedType; private transient FieldNamedPreparedStatement statement; @@ -106,12 +116,12 @@ public JdbcRowDataLookupFunction( .getSelectFromStatement(options.getTableName(), fieldNames, keyNames); JdbcDialect jdbcDialect = options.getDialect(); this.jdbcDialectConverter = jdbcDialect.getRowConverter(rowType); - this.lookupKeyRowConverter = - jdbcDialect.getRowConverter( - RowType.of( - Arrays.stream(keyTypes) - .map(DataType::getLogicalType) - .toArray(LogicalType[]::new))); + this.producedType = + RowType.of( + Arrays.stream(keyTypes) + .map(DataType::getLogicalType) + .toArray(LogicalType[]::new)); + this.lookupKeyRowConverter = jdbcDialect.getRowConverter(producedType); this.resolvedPredicates = resolvedPredicates; this.pushdownParams = pushdownParams; } @@ -224,4 +234,19 @@ public void close() throws IOException { public Connection getDbConnection() { return connectionProvider.getConnection(); } + + @Override + public LineageVertex getLineageVertex() { + DefaultTypeDatasetFacet defaultTypeDatasetFacet = + new DefaultTypeDatasetFacet( + LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo( + TypeConversions.fromLogicalToDataType(producedType))); + Optional nameOpt = LineageUtils.nameOf(query); + String namespace = LineageUtils.namespaceOf(connectionProvider); + LineageDataset dataset = + LineageUtils.datasetOf( + nameOpt.orElse(""), namespace, Arrays.asList(defaultTypeDatasetFacet)); + return LineageUtils.sourceLineageVertexOf( + Boundedness.BOUNDED, Collections.singleton(dataset)); + } } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java index 4c48f799..3c0c00e4 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java @@ -149,4 +149,8 @@ public Connection reestablishConnection() throws SQLException, ClassNotFoundExce closeConnection(); return getOrEstablishConnection(); } + + public String getDbURL() { + return this.jdbcOptions.getDbURL(); + } } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/GenericJdbcSinkFunction.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/GenericJdbcSinkFunction.java index 43e08fe6..8c36ad85 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/GenericJdbcSinkFunction.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/GenericJdbcSinkFunction.java @@ -23,20 +23,26 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.InputTypeConfigurable; import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.jdbc.lineage.LineageUtils; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; import java.io.IOException; +import java.util.Collections; +import java.util.Optional; /** A generic SinkFunction for JDBC. */ @Internal public class GenericJdbcSinkFunction extends RichSinkFunction - implements CheckpointedFunction, InputTypeConfigurable { + implements LineageVertexProvider, CheckpointedFunction, InputTypeConfigurable { private final JdbcOutputFormat outputFormat; private JdbcOutputSerializer serializer; @@ -78,4 +84,13 @@ public void setInputType(TypeInformation type, ExecutionConfig executionConfi JdbcOutputSerializer.of( ((TypeInformation) type).createSerializer(executionConfig)); } + + @Override + public LineageVertex getLineageVertex() { + Optional nameOpt = LineageUtils.nameOf(outputFormat.toString()); + String namespace = LineageUtils.namespaceOf(outputFormat.connectionProvider); + LineageDataset dataset = + LineageUtils.datasetOf(nameOpt.orElse(""), namespace, Collections.emptyList()); + return LineageUtils.lineageVertexOf(Collections.singleton(dataset)); + } } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/DefaultJdbcExtractor.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/DefaultJdbcExtractor.java new file mode 100644 index 00000000..e27dd45e --- /dev/null +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/DefaultJdbcExtractor.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Optional; +import java.util.Properties; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** Default implementation of {@link JdbcLocationExtractor}. */ +@PublicEvolving +public class DefaultJdbcExtractor implements JdbcLocationExtractor { + + private static final Pattern URL_FORMAT = + Pattern.compile( + "^(?\\w+)://(?[\\w\\d\\.\\[\\]:,-]+)/?(?[\\w\\d.]+)?(?:\\?.*)?"); + + @Override + public boolean isDefinedAt(String jdbcUri) { + return true; + } + + @Override + public JdbcLocation extract(String rawUri, Properties properties) throws URISyntaxException { + if (!rawUri.contains(",")) { + return extractOneHost(rawUri); + } + return extractMultipleHosts(rawUri); + } + + private JdbcLocation extractOneHost(String rawUri) throws URISyntaxException { + URI uri = new URI(rawUri); + + if (uri.getHost() == null) { + throw new URISyntaxException(rawUri, "Missing host"); + } + + String scheme = uri.getScheme(); + String host = uri.getHost(); + String authority; + if (uri.getPort() > 0) { + authority = String.format("%s:%d", host, uri.getPort()); + } else { + authority = host; + } + + Optional database = + Optional.ofNullable(uri.getPath()) + .map(db -> db.replaceFirst("/", "")) + .filter(db -> !db.isEmpty()); + return JdbcLocation.builder() + .withScheme(scheme) + .withAuthority(Optional.of(authority)) + .withDatabase(database) + .build(); + } + + private JdbcLocation extractMultipleHosts(String rawUri) throws URISyntaxException { + // new URI() parses 'scheme://host1,host2' syntax as scheme-specific part instead of + // authority + // Using regex to extract URI components + + Matcher matcher = URL_FORMAT.matcher(rawUri); + if (!matcher.matches()) { + throw new URISyntaxException(rawUri, "Failed to parse jdbc url"); + } + + String scheme = matcher.group("scheme"); + String authority = matcher.group("authority"); + String database = matcher.group("database"); + + return JdbcLocation.builder() + .withScheme(scheme) + .withAuthority(Optional.of(authority)) + .withDatabase(Optional.ofNullable(database)) + .build(); + } +} diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/DefaultTypeDatasetFacet.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/DefaultTypeDatasetFacet.java new file mode 100644 index 00000000..989340fd --- /dev/null +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/DefaultTypeDatasetFacet.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.lineage; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.util.Objects; + +/** Default implementation of {@link TypeDatasetFacet}. */ +@PublicEvolving +public class DefaultTypeDatasetFacet implements TypeDatasetFacet { + + public static final String TYPE_FACET_NAME = "type"; + + private final TypeInformation typeInformation; + + public DefaultTypeDatasetFacet(TypeInformation typeInformation) { + this.typeInformation = typeInformation; + } + + public TypeInformation getTypeInformation() { + return typeInformation; + } + + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DefaultTypeDatasetFacet that = (DefaultTypeDatasetFacet) o; + return Objects.equals(typeInformation, that.typeInformation); + } + + @Override + public int hashCode() { + return Objects.hash(typeInformation); + } + + @Override + public String name() { + return TYPE_FACET_NAME; + } +} diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocation.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocation.java new file mode 100644 index 00000000..cf224da8 --- /dev/null +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocation.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.lineage; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; + +import org.apache.commons.lang3.StringUtils; + +import java.util.List; +import java.util.Locale; +import java.util.Optional; + +/** JDBC connection URL location. */ +@Internal +public class JdbcLocation { + private final String scheme; + private final Optional authority; + private final Optional instance; + private final Optional database; + + private JdbcLocation( + String scheme, + Optional authority, + Optional instance, + Optional database) { + this.scheme = scheme; + this.authority = authority; + this.instance = instance; + this.database = database; + } + + public String toNamespace() { + String result = scheme.toLowerCase(Locale.ROOT) + ":"; + if (authority.isPresent()) { + result = String.format("%s//%s", result, authority.get().toLowerCase(Locale.ROOT)); + } + if (instance.isPresent()) { + result = String.format("%s/%s", result, StringUtils.stripStart(instance.get(), "/")); + } + return result; + } + + public String toName(List parts) { + if (database.isPresent()) { + parts.add(0, database.get()); + } + return String.join(".", parts); + } + + public String getScheme() { + return this.scheme; + } + + public Optional getAuthority() { + return this.authority; + } + + public Optional getInstance() { + return this.instance; + } + + public Optional getDatabase() { + return this.database; + } + + public static JdbcLocation.Builder builder() { + return new JdbcLocation.Builder(); + } + + /** Builder for {@link JdbcLocation}. */ + @PublicEvolving + public static final class Builder { + private String scheme = ""; + private Optional authority = Optional.empty(); + private Optional instance = Optional.empty(); + private Optional database = Optional.empty(); + + public Builder withScheme(String scheme) { + this.scheme = scheme; + return this; + } + + public Builder withAuthority(Optional authority) { + this.authority = authority; + return this; + } + + public Builder withInstance(Optional instance) { + this.instance = instance; + return this; + } + + public Builder withDatabase(Optional database) { + this.database = database; + return this; + } + + public JdbcLocation build() { + return new JdbcLocation(scheme, authority, instance, database); + } + } +} diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocationExtractor.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocationExtractor.java new file mode 100644 index 00000000..dee5ce01 --- /dev/null +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocationExtractor.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +import java.net.URISyntaxException; +import java.util.Properties; + +/** Interface for extracting {@link JdbcLocation}. */ +@PublicEvolving +public interface JdbcLocationExtractor { + + boolean isDefinedAt(String jdbcUri); + + JdbcLocation extract(String rawUri, Properties properties) throws URISyntaxException; +} diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocationExtractorFactory.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocationExtractorFactory.java new file mode 100644 index 00000000..11f3a01d --- /dev/null +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocationExtractorFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * A factory to create a specific {@link JdbcLocationExtractor}. This factory is used with Java's + * Service Provider Interfaces (SPI) for discovering. + * + *

Classes that implement this interface can be added to the + * "META_INF/services/org.apache.flink.connector.jdbc.core.lineage.JdbcLocationExtractorFactory" + * file of a JAR file in the current classpath to be found. + * + * @see JdbcLocationExtractor + */ +@PublicEvolving +public interface JdbcLocationExtractorFactory { + + JdbcLocationExtractor createExtractor(); +} diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcUtils.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcUtils.java new file mode 100644 index 00000000..67739497 --- /dev/null +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcUtils.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.lineage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.ServiceLoader; + +/** Utils for JDBC url preprocess and namespace extraction. */ +public class JdbcUtils { + private static final Logger LOG = LoggerFactory.getLogger(JdbcUtils.class); + private static final String SLASH_DELIMITER_USER_PASSWORD_REGEX = + "[A-Za-z0-9_%]+//?[A-Za-z0-9_%]*@"; + private static final String COLON_DELIMITER_USER_PASSWORD_REGEX = + "([/|,])[A-Za-z0-9_%]+:?[A-Za-z0-9_%]*@"; + private static final String PARAMS_USER_PASSWORD_REGEX = + "(?i)[,;&:]?(?:user|username|password)=[^,;&:()]+[,;&:]?"; + private static final String DUPLICATED_DELIMITERS = "(\\(\\)){2,}|[,;&:]{2,}"; + private static final String QUERY_PARAMS_REGEX = "\\?.*$"; + + private static final List extractors = new ArrayList<>(); + + static { + for (JdbcLocationExtractorFactory factory : + ServiceLoader.load(JdbcLocationExtractorFactory.class)) { + extractors.add(factory.createExtractor()); + } + } + + public static String getJdbcNamespace(String jdbcUrl, Properties properties) { + String uri = jdbcUrl.replaceAll("^(?i)jdbc:", ""); + try { + JdbcLocationExtractor extractor = getExtractor(uri); + return extractor.extract(uri, properties).toNamespace(); + } catch (URISyntaxException e) { + LOG.debug("Failed to parse jdbc url", e); + return dropSensitiveData(uri); + } + } + + private static JdbcLocationExtractor getExtractor(String jdbcUrl) throws URISyntaxException { + for (JdbcLocationExtractor extractor : extractors) { + if (extractor.isDefinedAt(jdbcUrl)) { + return extractor; + } + } + + throw new URISyntaxException(jdbcUrl, "Unsupported JDBC URL"); + } + + /** + * JdbcUrl can contain username and password this method clean-up credentials from jdbcUrl. Also + * drop query params as they include a lot of useless options, like timeout + * + * @param jdbcUrl url to database + * @return String + */ + private static String dropSensitiveData(String jdbcUrl) { + return jdbcUrl.replaceAll(SLASH_DELIMITER_USER_PASSWORD_REGEX, "@") + .replaceAll(COLON_DELIMITER_USER_PASSWORD_REGEX, "$1") + .replaceAll(PARAMS_USER_PASSWORD_REGEX, "") + .replaceAll(DUPLICATED_DELIMITERS, "") + .replaceAll(QUERY_PARAMS_REGEX, ""); + } +} diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/LineageUtils.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/LineageUtils.java new file mode 100644 index 00000000..ab22ac2b --- /dev/null +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/LineageUtils.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.lineage; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; +import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider; +import org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement; +import org.apache.flink.connector.jdbc.datasource.statements.SimpleJdbcQueryStatement; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.SourceLineageVertex; + +import io.openlineage.sql.OpenLineageSql; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +/** Utils for Lineage metadata extraction. */ +@PublicEvolving +public class LineageUtils { + + public static Optional nameOf(JdbcQueryStatement jdbcQueryStatement) { + if (!(jdbcQueryStatement instanceof SimpleJdbcQueryStatement)) { + return Optional.empty(); + } + + SimpleJdbcQueryStatement simpleJdbcQueryStatement = + (SimpleJdbcQueryStatement) jdbcQueryStatement; + return nameOf(simpleJdbcQueryStatement.query()); + } + + public static Optional nameOf(String query) { + return OpenLineageSql.parse(Arrays.asList(query)) + .map( + sqlMeta -> + sqlMeta.inTables().isEmpty() + ? "" + : sqlMeta.inTables().get(0).qualifiedName()); + } + + public static String namespaceOf(JdbcConnectionProvider jdbcConnectionProvider) { + if (!(jdbcConnectionProvider instanceof SimpleJdbcConnectionProvider)) { + return ""; + } + + SimpleJdbcConnectionProvider simpleJdbcConnectionProvider = + (SimpleJdbcConnectionProvider) jdbcConnectionProvider; + + return JdbcUtils.getJdbcNamespace( + simpleJdbcConnectionProvider.getDbURL(), + simpleJdbcConnectionProvider.getProperties()); + } + + public static LineageDataset datasetOf( + String name, String namespace, List facets) { + + return new LineageDataset() { + @Override + public String name() { + return name; + } + + @Override + public String namespace() { + return namespace; + } + + @Override + public Map facets() { + Map facetMap = new HashMap<>(); + facetMap.putAll( + facets.stream() + .collect( + Collectors.toMap(LineageDatasetFacet::name, item -> item))); + return facetMap; + } + }; + } + + public static LineageVertex lineageVertexOf(Collection datasets) { + return new LineageVertex() { + @Override + public List datasets() { + return datasets.stream().collect(Collectors.toList()); + } + }; + } + + public static SourceLineageVertex sourceLineageVertexOf( + Boundedness boundedness, Collection datasets) { + return new SourceLineageVertex() { + @Override + public Boundedness boundedness() { + return boundedness; + } + + @Override + public List datasets() { + return datasets.stream().collect(Collectors.toList()); + } + }; + } +} diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/OverrideJdbcLocationExtractor.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/OverrideJdbcLocationExtractor.java new file mode 100644 index 00000000..3b2b7898 --- /dev/null +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/OverrideJdbcLocationExtractor.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +import java.net.URISyntaxException; +import java.util.Locale; +import java.util.Optional; +import java.util.Properties; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** Implementation of {@link JdbcLocationExtractor} that can override scheme. */ +@PublicEvolving +public class OverrideJdbcLocationExtractor extends DefaultJdbcExtractor { + + private final String overrideScheme; + private final String defaultPort; + private static final Pattern HOST_PORT_FORMAT = + Pattern.compile("^(?[\\[\\]\\w\\d.-]+):(?\\d+)?"); + + public OverrideJdbcLocationExtractor(String overrideScheme) { + this(overrideScheme, null); + } + + public OverrideJdbcLocationExtractor(String overrideScheme, String defaultPort) { + this.overrideScheme = overrideScheme; + this.defaultPort = defaultPort; + } + + @Override + public boolean isDefinedAt(String jdbcUri) { + return jdbcUri.toLowerCase(Locale.ROOT).startsWith(overrideScheme); + } + + @Override + public JdbcLocation extract(String rawUri, Properties properties) throws URISyntaxException { + JdbcLocation result = super.extract(rawUri, properties); + + Optional authority = result.getAuthority(); + if (authority.isPresent() && defaultPort != null) { + authority = authority.map(this::appendDefaultPort); + } + + return JdbcLocation.builder() + .withScheme(overrideScheme) + .withAuthority(authority) + .withInstance(result.getInstance()) + .withDatabase(result.getDatabase()) + .build(); + } + + private String appendDefaultPort(String authority) { + String[] hosts = authority.split(","); + for (int i = 0; i < hosts.length; i++) { + String host = hosts[i]; + Matcher hostPortMatcher = HOST_PORT_FORMAT.matcher(host); + if (!hostPortMatcher.matches()) { + hosts[i] = host + ":" + defaultPort; + } + } + return String.join(",", hosts); + } +} diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/TypeDatasetFacet.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/TypeDatasetFacet.java new file mode 100644 index 00000000..e65f2398 --- /dev/null +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/TypeDatasetFacet.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.lineage; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; + +/** Facet definition to contain type information of source and sink. */ +@PublicEvolving +public interface TypeDatasetFacet extends LineageDatasetFacet { + + TypeInformation getTypeInformation(); +} diff --git a/flink-connector-jdbc-db2/src/main/java/org/apache/flink/connector/jdbc/db2/database/lineage/DB2LocationExtractor.java b/flink-connector-jdbc-db2/src/main/java/org/apache/flink/connector/jdbc/db2/database/lineage/DB2LocationExtractor.java new file mode 100644 index 00000000..a74c1991 --- /dev/null +++ b/flink-connector-jdbc-db2/src/main/java/org/apache/flink/connector/jdbc/db2/database/lineage/DB2LocationExtractor.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.db2.database.lineage; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.lineage.JdbcLocation; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractor; +import org.apache.flink.connector.jdbc.lineage.OverrideJdbcLocationExtractor; + +import java.net.URISyntaxException; +import java.util.Properties; + +/** + * Implementation of {@link JdbcLocationExtractor} for DB2. + * + * @see DB2 URL Format + */ +@Internal +public class DB2LocationExtractor implements JdbcLocationExtractor { + + private JdbcLocationExtractor delegate() { + return new OverrideJdbcLocationExtractor("db2"); + } + + @Override + public boolean isDefinedAt(String jdbcUri) { + return delegate().isDefinedAt(jdbcUri); + } + + @Override + public JdbcLocation extract(String rawUri, Properties properties) throws URISyntaxException { + return delegate().extract(rawUri, properties); + } +} diff --git a/flink-connector-jdbc-db2/src/main/java/org/apache/flink/connector/jdbc/db2/database/lineage/DB2LocationExtractorFactory.java b/flink-connector-jdbc-db2/src/main/java/org/apache/flink/connector/jdbc/db2/database/lineage/DB2LocationExtractorFactory.java new file mode 100644 index 00000000..7eec8b1c --- /dev/null +++ b/flink-connector-jdbc-db2/src/main/java/org/apache/flink/connector/jdbc/db2/database/lineage/DB2LocationExtractorFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.db2.database.lineage; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractor; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory; + +/** Implementation of {@link JdbcLocationExtractorFactory} for DB2. */ +@Internal +public class DB2LocationExtractorFactory implements JdbcLocationExtractorFactory { + + @Override + public JdbcLocationExtractor createExtractor() { + return new DB2LocationExtractor(); + } +} diff --git a/flink-connector-jdbc-db2/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory b/flink-connector-jdbc-db2/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory new file mode 100644 index 00000000..6ace0766 --- /dev/null +++ b/flink-connector-jdbc-db2/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.connector.jdbc.db2.database.lineage.DB2LocationExtractorFactory \ No newline at end of file diff --git a/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/lineage/MySqlLocationExtractor.java b/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/lineage/MySqlLocationExtractor.java new file mode 100644 index 00000000..a131c455 --- /dev/null +++ b/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/lineage/MySqlLocationExtractor.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.mysql.database.lineage; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.lineage.JdbcLocation; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractor; +import org.apache.flink.connector.jdbc.lineage.OverrideJdbcLocationExtractor; + +import java.net.URISyntaxException; +import java.util.Properties; + +/** + * Implementation of {@link JdbcLocationExtractor} for Mysql. + * + * @see Mysql + * URL Format + */ +@Internal +public class MySqlLocationExtractor implements JdbcLocationExtractor { + + private static final String PROTOCOL_PART = "^[\\w+:]+://"; + + private JdbcLocationExtractor delegate() { + return new OverrideJdbcLocationExtractor("mysql", "3306"); + } + + @Override + public boolean isDefinedAt(String jdbcUri) { + return delegate().isDefinedAt(jdbcUri); + } + + @Override + public JdbcLocation extract(String rawUri, Properties properties) throws URISyntaxException { + // Schema could be 'mysql', 'mysql:part', 'mysql+srv:part'. Convert it to 'mysql' + String normalizedUri = rawUri.replaceFirst(PROTOCOL_PART, "mysql://"); + return delegate().extract(normalizedUri, properties); + } +} diff --git a/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/lineage/MysqlLocationExtractorFactory.java b/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/lineage/MysqlLocationExtractorFactory.java new file mode 100644 index 00000000..ab2d35e3 --- /dev/null +++ b/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/lineage/MysqlLocationExtractorFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.mysql.database.lineage; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractor; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory; + +/** Implementation of {@link JdbcLocationExtractorFactory} for Mysql. */ +@Internal +public class MysqlLocationExtractorFactory implements JdbcLocationExtractorFactory { + + @Override + public JdbcLocationExtractor createExtractor() { + return new MySqlLocationExtractor(); + } +} diff --git a/flink-connector-jdbc-mysql/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory b/flink-connector-jdbc-mysql/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory new file mode 100644 index 00000000..9aa28c28 --- /dev/null +++ b/flink-connector-jdbc-mysql/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.connector.jdbc.mysql.database.lineage.MysqlLocationExtractorFactory \ No newline at end of file diff --git a/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/lineage/OceanBaseLocationExtractor.java b/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/lineage/OceanBaseLocationExtractor.java new file mode 100644 index 00000000..5ce850ac --- /dev/null +++ b/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/lineage/OceanBaseLocationExtractor.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.oceanbase.database.lineage; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.lineage.JdbcLocation; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractor; +import org.apache.flink.connector.jdbc.lineage.OverrideJdbcLocationExtractor; + +import java.net.URISyntaxException; +import java.util.Properties; + +/** + * Implementation of {@link JdbcLocationExtractor} for OceanBase. + * + * @see OceanBase + * URL Format + */ +@Internal +public class OceanBaseLocationExtractor implements JdbcLocationExtractor { + + private static final String PROTOCOL_PART = "^[\\w+:]+://"; + + private JdbcLocationExtractor delegate() { + return new OverrideJdbcLocationExtractor("oceanbase"); + } + + @Override + public boolean isDefinedAt(String jdbcUri) { + return delegate().isDefinedAt(jdbcUri); + } + + @Override + public JdbcLocation extract(String rawUri, Properties properties) throws URISyntaxException { + // Schema could be 'oceanbase', 'oceanbase:hamode'. Convert it to 'mysql' + String normalizedUri = rawUri.replaceFirst(PROTOCOL_PART, "oceanbase://"); + return delegate().extract(normalizedUri, properties); + } +} diff --git a/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/lineage/OceanBaseLocationExtractorFactory.java b/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/lineage/OceanBaseLocationExtractorFactory.java new file mode 100644 index 00000000..42af1b9e --- /dev/null +++ b/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/lineage/OceanBaseLocationExtractorFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.oceanbase.database.lineage; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractor; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory; + +/** Implementation of {@link JdbcLocationExtractorFactory} for Mysql. */ +@Internal +public class OceanBaseLocationExtractorFactory implements JdbcLocationExtractorFactory { + + @Override + public JdbcLocationExtractor createExtractor() { + return new OceanBaseLocationExtractor(); + } +} diff --git a/flink-connector-jdbc-oceanbase/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory b/flink-connector-jdbc-oceanbase/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory new file mode 100644 index 00000000..678ddb65 --- /dev/null +++ b/flink-connector-jdbc-oceanbase/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.connector.jdbc.oceanbase.database.lineage.OceanBaseLocationExtractorFactory \ No newline at end of file diff --git a/flink-connector-jdbc-oracle/src/main/java/org/apache/flink/connector/jdbc/oracle/database/lineage/OracleLocationExtractor.java b/flink-connector-jdbc-oracle/src/main/java/org/apache/flink/connector/jdbc/oracle/database/lineage/OracleLocationExtractor.java new file mode 100644 index 00000000..65258fd8 --- /dev/null +++ b/flink-connector-jdbc-oracle/src/main/java/org/apache/flink/connector/jdbc/oracle/database/lineage/OracleLocationExtractor.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.oracle.database.lineage; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.lineage.JdbcLocation; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractor; +import org.apache.flink.connector.jdbc.lineage.OverrideJdbcLocationExtractor; + +import org.apache.commons.lang3.StringUtils; + +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Properties; +import java.util.stream.Collectors; + +/** + * Implementation of {@link JdbcLocationExtractor} for Oracle. + * + * @see Oracle + * URL Format + */ +@Internal +public class OracleLocationExtractor implements JdbcLocationExtractor { + + private static final String SCHEME = "oracle"; + private static final String DEFAULT_PORT = "1521"; + private static final String URI_START = "^.*@(//)?"; + private static final String URI_END = "\\?.*$"; + private static final String PROTOCOL_PART = "^\\w+://"; + + @Override + public boolean isDefinedAt(String jdbcUri) { + return jdbcUri.toLowerCase(Locale.ROOT).startsWith(SCHEME); + } + + @Override + public JdbcLocation extract(String rawUri, Properties properties) throws URISyntaxException { + // oracle:thin:@//host:1521:sid?... -> host:1521:sid + String uri = rawUri.replaceFirst(URI_START, "").replaceAll(URI_END, ""); + + if (uri.contains("(")) { + throw new URISyntaxException(uri, "TNS format is unsupported for now"); + } + return extractUri(uri, properties); + } + + private JdbcLocation extractUri(String uri, Properties properties) throws URISyntaxException { + // convert 'tcp://'' protocol to 'oracle://'', convert ':sid' format to '/sid' + String normalizedUri = uri.replaceFirst(PROTOCOL_PART, ""); + normalizedUri = SCHEME + "://" + fixSidFormat(normalizedUri); + + return new OverrideJdbcLocationExtractor(SCHEME, DEFAULT_PORT) + .extract(normalizedUri, properties); + } + + private String fixSidFormat(String uri) { + if (!uri.contains(":")) { + return uri; + } + List components = Arrays.stream(uri.split(":")).collect(Collectors.toList()); + String last = components.remove(components.size() - 1); + if (last.contains("]") || last.matches("^\\d+$")) { + // '[ip:v:6]' or 'host:1521' + return uri; + } + // 'host:1521:sid' -> 'host:1521/sid' + return StringUtils.join(components, ":") + "/" + last; + } +} diff --git a/flink-connector-jdbc-oracle/src/main/java/org/apache/flink/connector/jdbc/oracle/database/lineage/OracleLocationExtractorFactory.java b/flink-connector-jdbc-oracle/src/main/java/org/apache/flink/connector/jdbc/oracle/database/lineage/OracleLocationExtractorFactory.java new file mode 100644 index 00000000..1abe5431 --- /dev/null +++ b/flink-connector-jdbc-oracle/src/main/java/org/apache/flink/connector/jdbc/oracle/database/lineage/OracleLocationExtractorFactory.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.oracle.database.lineage; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractor; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory; + +/** Implementation of {@link JdbcLocationExtractorFactory} for Oracle. */ +@Internal +public class OracleLocationExtractorFactory implements JdbcLocationExtractorFactory { + @Override + public JdbcLocationExtractor createExtractor() { + return new OracleLocationExtractor(); + } +} diff --git a/flink-connector-jdbc-oracle/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory b/flink-connector-jdbc-oracle/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory new file mode 100644 index 00000000..7428410d --- /dev/null +++ b/flink-connector-jdbc-oracle/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.connector.jdbc.oracle.database.lineage.OracleLocationExtractorFactory \ No newline at end of file diff --git a/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/lineage/PostgresLocationExtractor.java b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/lineage/PostgresLocationExtractor.java new file mode 100644 index 00000000..e2672cd5 --- /dev/null +++ b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/lineage/PostgresLocationExtractor.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.postgres.database.lineage; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.lineage.JdbcLocation; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractor; +import org.apache.flink.connector.jdbc.lineage.OverrideJdbcLocationExtractor; + +import java.net.URISyntaxException; +import java.util.Properties; + +/** + * Implementation of {@link JdbcLocationExtractor} for Postgres. + * + * @see Postgres + * URL Format + */ +@Internal +public class PostgresLocationExtractor implements JdbcLocationExtractor { + + private JdbcLocationExtractor delegate() { + return new OverrideJdbcLocationExtractor("postgres", "5432"); + } + + @Override + public boolean isDefinedAt(String jdbcUri) { + return delegate().isDefinedAt(jdbcUri); + } + + @Override + public JdbcLocation extract(String rawUri, Properties properties) throws URISyntaxException { + return delegate().extract(rawUri, properties); + } +} diff --git a/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/lineage/PostgresLocationExtractorFactory.java b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/lineage/PostgresLocationExtractorFactory.java new file mode 100644 index 00000000..cefca23e --- /dev/null +++ b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/lineage/PostgresLocationExtractorFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.postgres.database.lineage; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractor; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory; + +/** Implementation of {@link JdbcLocationExtractorFactory} for Postgres. */ +@Internal +public class PostgresLocationExtractorFactory implements JdbcLocationExtractorFactory { + + @Override + public JdbcLocationExtractor createExtractor() { + return new PostgresLocationExtractor(); + } +} diff --git a/flink-connector-jdbc-postgres/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory b/flink-connector-jdbc-postgres/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory new file mode 100644 index 00000000..0ea5acd7 --- /dev/null +++ b/flink-connector-jdbc-postgres/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.connector.jdbc.postgres.database.lineage.PostgresLocationExtractorFactory \ No newline at end of file diff --git a/flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/lineage/SqlServerLocationExtractor.java b/flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/lineage/SqlServerLocationExtractor.java new file mode 100644 index 00000000..94f653d1 --- /dev/null +++ b/flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/lineage/SqlServerLocationExtractor.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.sqlserver.database.lineage; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.lineage.JdbcLocation; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractor; + +import org.apache.commons.lang3.StringUtils; + +import java.net.URISyntaxException; +import java.util.Locale; +import java.util.Optional; +import java.util.Properties; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Implementation of {@link JdbcLocationExtractor} for SqlServer. + * + * @see SqlServer + * URL Format + */ +@Internal +public class SqlServerLocationExtractor implements JdbcLocationExtractor { + private static final String SCHEME = "sqlserver"; + private static final String SERVICE_PROPERTY = "servername"; + private static final String PORT_PROPERTY = "portnumber"; + private static final String INSTANCE_PROPERTY = "instancename"; + private static final String DATABASE_NAME_PROPERTY = "databasename"; + private static final String DATABASE_PROPERTY = "database"; + + private static final Pattern URL = + Pattern.compile( + "(?:\\w+)://(?[\\w\\d\\.-]+)?(?:\\\\)?(?[\\w]+)?(?::)?(?\\d+)?(?.*)"); + + @Override + public boolean isDefinedAt(String jdbcUri) { + return jdbcUri.toLowerCase(Locale.ROOT).startsWith(SCHEME); + } + + @Override + @SuppressWarnings("PMD.AvoidLiteralsInIfCondition") + public JdbcLocation extract(String rawUri, Properties properties) throws URISyntaxException { + Matcher matcher = URL.matcher(rawUri); + if (!matcher.matches()) { + throw new URISyntaxException(rawUri, "Failed to parse jdbc url"); + } + + // Priority: url components > url params > properties + Properties finalProperties = new Properties(); + if (matcher.group("host") != null) { + finalProperties.setProperty(SERVICE_PROPERTY, matcher.group("host")); + } + + if (matcher.group("port") != null) { + finalProperties.setProperty(PORT_PROPERTY, matcher.group("port")); + } + + if (matcher.group("instance") != null) { + finalProperties.setProperty(INSTANCE_PROPERTY, matcher.group("instance")); + } + + String[] urlParams = + StringUtils.defaultString(matcher.group("params")).replaceFirst(";", "").split(";"); + + for (String urlParam : urlParams) { + String[] parts = urlParam.split("="); + if (parts.length == 2) { + // property names are case-insensitive + String key = parts[0].toLowerCase(Locale.ROOT); + String value = parts[1]; + finalProperties.setProperty(key, value); + } + } + + for (String key : properties.stringPropertyNames()) { + // properties have higher priority than in-url params. + // property names are case-insensitive + // https://learn.microsoft.com/en-us/sql/connect/jdbc/setting-the-connection-properties?view=sql-server-ver16#remarks + String normalizedKey = key.toLowerCase(Locale.ROOT); + if (finalProperties.getProperty(normalizedKey) == null) { + finalProperties.setProperty(normalizedKey, properties.getProperty(key)); + } + } + + String host = finalProperties.getProperty(SERVICE_PROPERTY); + if (host == null) { + throw new URISyntaxException(rawUri, "Missing host"); + } + if (host.contains(":") && !host.startsWith("[")) { + // IPv6 address + host = "[" + host + "]"; + } + + String port = finalProperties.getProperty(PORT_PROPERTY); + String authority; + if (port != null) { + authority = host + ":" + port; + } else { + authority = host; + } + + Optional instance = + Optional.ofNullable(finalProperties.getProperty(INSTANCE_PROPERTY)); + Optional database = + Optional.ofNullable(finalProperties.getProperty(DATABASE_NAME_PROPERTY)) + .map(Optional::of) + .orElseGet( + () -> + Optional.ofNullable( + finalProperties.getProperty(DATABASE_PROPERTY))); + + return JdbcLocation.builder() + .withScheme(SCHEME) + .withAuthority(Optional.of(authority)) + .withInstance(instance) + .withDatabase(database) + .build(); + } +} diff --git a/flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/lineage/SqlServerLocationExtractorFactory.java b/flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/lineage/SqlServerLocationExtractorFactory.java new file mode 100644 index 00000000..ccb0b372 --- /dev/null +++ b/flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/lineage/SqlServerLocationExtractorFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.sqlserver.database.lineage; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractor; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory; + +/** Implementation of {@link JdbcLocationExtractorFactory} for SqlServer. */ +@Internal +public class SqlServerLocationExtractorFactory implements JdbcLocationExtractorFactory { + + @Override + public JdbcLocationExtractor createExtractor() { + return new SqlServerLocationExtractor(); + } +} diff --git a/flink-connector-jdbc-sqlserver/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory b/flink-connector-jdbc-sqlserver/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory new file mode 100644 index 00000000..2598cebb --- /dev/null +++ b/flink-connector-jdbc-sqlserver/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.connector.jdbc.sqlserver.database.lineage.SqlServerLocationExtractorFactory \ No newline at end of file diff --git a/flink-connector-jdbc-trino/src/main/java/org/apache/flink/connector/jdbc/trino/database/lineage/TrinoLocationExtractor.java b/flink-connector-jdbc-trino/src/main/java/org/apache/flink/connector/jdbc/trino/database/lineage/TrinoLocationExtractor.java new file mode 100644 index 00000000..f903a7c0 --- /dev/null +++ b/flink-connector-jdbc-trino/src/main/java/org/apache/flink/connector/jdbc/trino/database/lineage/TrinoLocationExtractor.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.trino.database.lineage; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.lineage.JdbcLocation; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractor; +import org.apache.flink.connector.jdbc.lineage.OverrideJdbcLocationExtractor; + +import java.net.URISyntaxException; +import java.util.Properties; + +/** + * Implementation of {@link JdbcLocationExtractor} for Trino. + * + * @see Trino URL Format + */ +@Internal +public class TrinoLocationExtractor implements JdbcLocationExtractor { + + private JdbcLocationExtractor delegate() { + return new OverrideJdbcLocationExtractor("postgres"); + } + + @Override + public boolean isDefinedAt(String jdbcUri) { + return delegate().isDefinedAt(jdbcUri); + } + + @Override + public JdbcLocation extract(String rawUri, Properties properties) throws URISyntaxException { + return delegate().extract(rawUri, properties); + } +} diff --git a/flink-connector-jdbc-trino/src/main/java/org/apache/flink/connector/jdbc/trino/database/lineage/TrinoLocationExtractorFactory.java b/flink-connector-jdbc-trino/src/main/java/org/apache/flink/connector/jdbc/trino/database/lineage/TrinoLocationExtractorFactory.java new file mode 100644 index 00000000..c66f1378 --- /dev/null +++ b/flink-connector-jdbc-trino/src/main/java/org/apache/flink/connector/jdbc/trino/database/lineage/TrinoLocationExtractorFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.trino.database.lineage; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractor; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory; + +/** Implementation of {@link JdbcLocationExtractorFactory} for Postgres. */ +@Internal +public class TrinoLocationExtractorFactory implements JdbcLocationExtractorFactory { + + @Override + public JdbcLocationExtractor createExtractor() { + return new TrinoLocationExtractor(); + } +} diff --git a/flink-connector-jdbc-trino/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory b/flink-connector-jdbc-trino/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory new file mode 100644 index 00000000..8be9bbb7 --- /dev/null +++ b/flink-connector-jdbc-trino/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.connector.jdbc.trino.database.lineage.TrinoLocationExtractorFactory \ No newline at end of file diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java index b88eddb5..bee67d69 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java @@ -36,6 +36,8 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; @@ -129,7 +131,8 @@ @Internal @Deprecated public class JdbcXaSinkFunction extends AbstractRichFunction - implements CheckpointedFunction, + implements LineageVertexProvider, + CheckpointedFunction, CheckpointListener, SinkFunction, AutoCloseable, @@ -370,4 +373,9 @@ public void setInputType(TypeInformation type, ExecutionConfig executionConfi JdbcOutputSerializer.of( ((TypeInformation) type).createSerializer(executionConfig)); } + + @Override + public LineageVertex getLineageVertex() { + return null; + } } diff --git a/pom.xml b/pom.xml index f688e0c8..640b2eb6 100644 --- a/pom.xml +++ b/pom.xml @@ -288,6 +288,12 @@ under the License. ${log4j.version} + + io.openlineage + openlineage-sql-java + 1.25.0 + + com.fasterxml.jackson