Skip to content

Commit

Permalink
[FLINK-34467] add lineage integration for jdbc connector
Browse files Browse the repository at this point in the history
  • Loading branch information
HuangZhenQiu committed Dec 4, 2024
1 parent 134d858 commit 231ad7d
Show file tree
Hide file tree
Showing 46 changed files with 1,664 additions and 122 deletions.
103 changes: 0 additions & 103 deletions .github/workflows/backwards_compatibility.yml

This file was deleted.

6 changes: 1 addition & 5 deletions .github/workflows/weekly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,7 @@ jobs:
if: github.repository_owner == 'apache'
strategy:
matrix:
flink_branches: [{
flink: 1.19-SNAPSHOT,
jdk: '8, 11, 17, 21',
branch: main
},
flink_branches: [
{
flink: 1.20-SNAPSHOT,
jdk: '8, 11, 17, 21',
Expand Down
1 change: 1 addition & 0 deletions .java-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
11
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Method <org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource.getLineageVertex()> calls method <org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator.getSqlTemplate()> in (JdbcSource.java:215)
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@ Method <org.apache.flink.connector.jdbc.core.table.sink.JdbcOutputFormatBuilder.
Method <org.apache.flink.connector.jdbc.core.table.sink.JdbcOutputFormatBuilder.getPrimaryKey(org.apache.flink.table.data.RowData, [Lorg.apache.flink.table.data.RowData$FieldGetter;)> has parameter of type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (JdbcOutputFormatBuilder.java:0)
Method <org.apache.flink.connector.jdbc.core.table.sink.JdbcOutputFormatBuilder.setFieldDataTypes([Lorg.apache.flink.table.types.DataType;)> has parameter of type <[Lorg.apache.flink.table.types.DataType;> in (JdbcOutputFormatBuilder.java:0)
Method <org.apache.flink.connector.jdbc.core.table.source.JdbcRowDataInputFormat.createInputSplits(int)> has return type <[Lorg.apache.flink.core.io.InputSplit;> in (JdbcRowDataInputFormat.java:0)
Method <org.apache.flink.connector.jdbc.core.table.source.JdbcRowDataInputFormat.getInputSplitAssigner([Lorg.apache.flink.core.io.InputSplit;)> calls constructor <org.apache.flink.api.common.io.DefaultInputSplitAssigner.<init>([Lorg.apache.flink.core.io.InputSplit;)> in (JdbcRowDataInputFormat.java:287)
Method <org.apache.flink.connector.jdbc.core.table.source.JdbcRowDataInputFormat.getInputSplitAssigner([Lorg.apache.flink.core.io.InputSplit;)> calls constructor <org.apache.flink.api.common.io.DefaultInputSplitAssigner.<init>([Lorg.apache.flink.core.io.InputSplit;)> in (JdbcRowDataInputFormat.java:295)
Method <org.apache.flink.connector.jdbc.core.table.source.JdbcRowDataInputFormat.getInputSplitAssigner([Lorg.apache.flink.core.io.InputSplit;)> has parameter of type <[Lorg.apache.flink.core.io.InputSplit;> in (JdbcRowDataInputFormat.java:0)
Method <org.apache.flink.connector.jdbc.core.table.source.JdbcRowDataLookupFunction.getDbConnection()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (JdbcRowDataLookupFunction.java:0)
Method <org.apache.flink.connector.jdbc.core.table.source.JdbcRowDataLookupFunction.getLineageVertex()> calls method <org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(org.apache.flink.table.types.DataType)> in (JdbcRowDataLookupFunction.java:242)
Method <org.apache.flink.connector.jdbc.core.table.source.JdbcRowDataLookupFunction.getLineageVertex()> calls method <org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType(org.apache.flink.table.types.logical.LogicalType)> in (JdbcRowDataLookupFunction.java:243)
Method <org.apache.flink.connector.jdbc.datasource.connections.xa.SimpleXaConnectionProvider.from(javax.sql.XADataSource)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (SimpleXaConnectionProvider.java:0)
Method <org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId.deserialize([B)> calls constructor <org.apache.flink.core.memory.DataInputDeserializer.<init>([B)> in (TransactionId.java:96)
Method <org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId.deserialize([B)> calls method <org.apache.flink.core.memory.DataInputDeserializer.readInt()> in (TransactionId.java:101)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
org.apache.flink.connector.jdbc.lineage.DefaultJdbcExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.connector.jdbc.lineage.JdbcLocation$Builder.build(): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.connector.jdbc.lineage.OverrideJdbcLocationExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
5 changes: 5 additions & 0 deletions flink-connector-jdbc-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ under the License.
<optional>true</optional>
</dependency>

<dependency>
<groupId>io.openlineage</groupId>
<artifactId>openlineage-sql-java</artifactId>
</dependency>

<!-- Tests -->

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,23 @@
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource;
import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceBuilder;
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

Expand All @@ -53,6 +59,8 @@
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;

/**
* InputFormat to read data from a database and generate Rows. The InputFormat has to be configured
Expand Down Expand Up @@ -107,7 +115,7 @@
@Deprecated
@Experimental
public class JdbcInputFormat extends RichInputFormat<Row, InputSplit>
implements ResultTypeQueryable<Row> {
implements LineageVertexProvider, ResultTypeQueryable<Row> {

protected static final long serialVersionUID = 2L;
protected static final Logger LOG = LoggerFactory.getLogger(JdbcInputFormat.class);
Expand Down Expand Up @@ -344,6 +352,19 @@ public static JdbcInputFormatBuilder buildJdbcInputFormat() {
return new JdbcInputFormatBuilder();
}

@Override
public LineageVertex getLineageVertex() {
    // Facet carrying the produced row type so lineage consumers can inspect the schema.
    DefaultTypeDatasetFacet defaultTypeDatasetFacet =
            new DefaultTypeDatasetFacet(getProducedType());
    // Derive a dataset name from the SQL query template; fall back to "" when no
    // name can be extracted.
    Optional<String> nameOpt = LineageUtils.nameOf(queryTemplate);
    String namespace = LineageUtils.namespaceOf(connectionProvider);
    LineageDataset dataset =
            LineageUtils.datasetOf(
                    nameOpt.orElse(""),
                    namespace,
                    // Exactly one facet: singletonList is immutable and clearer than
                    // Arrays.asList for a single element.
                    Collections.singletonList(defaultTypeDatasetFacet));
    // An InputFormat reads a finite result set, hence always BOUNDED.
    return LineageUtils.sourceLineageVertexOf(
            Boundedness.BOUNDED, Collections.singleton(dataset));
}

/** Builder for {@link JdbcInputFormat}. */
public static class JdbcInputFormatBuilder {
private final JdbcConnectionOptions.JdbcConnectionOptionsBuilder connOptionsBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,16 @@
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
import org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement;
import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer;
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;

/**
* Flink Sink to produce data into a jdbc database.
Expand All @@ -47,7 +52,9 @@
*/
@PublicEvolving
public class JdbcSink<IN>
implements StatefulSink<IN, JdbcWriterState>, TwoPhaseCommittingSink<IN, JdbcCommitable> {
implements LineageVertexProvider,
StatefulSink<IN, JdbcWriterState>,
TwoPhaseCommittingSink<IN, JdbcCommitable> {

private final DeliveryGuarantee deliveryGuarantee;
private final JdbcConnectionProvider connectionProvider;
Expand Down Expand Up @@ -113,4 +120,13 @@ public SimpleVersionedSerializer<JdbcCommitable> getCommittableSerializer() {
/**
 * Returns the serializer used to snapshot {@code JdbcWriterState} into checkpoints.
 *
 * <p>The serializer is stateless, so a fresh instance per call is cheap and avoids
 * any shared mutable state.
 */
public SimpleVersionedSerializer<JdbcWriterState> getWriterStateSerializer() {
    return new JdbcWriterStateSerializer();
}

@Override
public LineageVertex getLineageVertex() {
    // Describe the sink target as a lineage dataset: name taken from the SQL
    // statement (empty when none can be derived), namespace from the connection.
    String namespace = LineageUtils.namespaceOf(connectionProvider);
    String datasetName = LineageUtils.nameOf(queryStatement.query()).orElse("");
    LineageDataset dataset =
            LineageUtils.datasetOf(datasetName, namespace, Collections.emptyList());
    return LineageUtils.lineageVertexOf(Collections.singleton(dataset));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,36 @@
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumerator;
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumeratorState;
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSqlSplitEnumeratorBase;
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator;
import org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceReader;
import org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceSplitReader;
import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplitSerializer;
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;

/** JDBC source. */
@PublicEvolving
public class JdbcSource<OUT>
implements Source<OUT, JdbcSourceSplit, JdbcSourceEnumeratorState>,
implements LineageVertexProvider,
Source<OUT, JdbcSourceSplit, JdbcSourceEnumeratorState>,
ResultTypeQueryable<OUT> {

private final Boundedness boundedness;
Expand Down Expand Up @@ -195,4 +205,18 @@ public boolean equals(Object o) {
&& deliveryGuarantee == that.deliveryGuarantee
&& Objects.equals(continuousUnBoundingSettings, that.continuousUnBoundingSettings);
}

@Override
public LineageVertex getLineageVertex() {
    // Facet carrying the produced type information so lineage consumers can
    // inspect the output schema.
    DefaultTypeDatasetFacet defaultTypeDatasetFacet =
            new DefaultTypeDatasetFacet(getTypeInformation());
    // NOTE(review): assumes the provider always yields a SqlTemplateSplitEnumerator;
    // any other enumerator type would throw ClassCastException here — confirm the
    // provider contract before relying on this cast.
    SqlTemplateSplitEnumerator enumerator =
            (SqlTemplateSplitEnumerator) sqlSplitEnumeratorProvider.create();
    // Dataset name derived from the SQL template; empty when none can be extracted.
    Optional<String> nameOpt = LineageUtils.nameOf(enumerator.getSqlTemplate());
    String namespace = LineageUtils.namespaceOf(connectionProvider);
    LineageDataset dataset =
            LineageUtils.datasetOf(
                    nameOpt.orElse(""),
                    namespace,
                    // Exactly one facet: singletonList is immutable and clearer than
                    // Arrays.asList for a single element.
                    Collections.singletonList(defaultTypeDatasetFacet));
    // Boundedness is the source's own configured boundedness (bounded or continuous).
    return LineageUtils.sourceLineageVertexOf(boundedness, Collections.singleton(dataset));
}
}
Loading

0 comments on commit 231ad7d

Please sign in to comment.