[FLINK-36659] Bump to 2.0-preview1 and remove deprecated flink-connector-jdbc module.
lvyanquan committed Jan 27, 2025
1 parent 15c1a78 commit d19c67b
Showing 71 changed files with 244 additions and 5,931 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/backwards_compatibility.yml
@@ -29,8 +29,8 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
flink: [1.18-SNAPSHOT, 1.19-SNAPSHOT]
jdk: [8, 11, 17]
flink: [2.0-preview1]
jdk: [11, 17, 21]

env:
MVN_CONNECTION_OPTIONS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120
4 changes: 2 additions & 2 deletions .github/workflows/push_pr.yml
@@ -28,8 +28,8 @@ jobs:
compile_and_test:
strategy:
matrix:
flink: [1.20.0]
jdk: [ '8, 11, 17, 21' ]
flink: [2.0-preview1]
jdk: [ '11, 17, 21' ]
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
with:
flink_version: ${{ matrix.flink }}
9 changes: 2 additions & 7 deletions .github/workflows/weekly.yml
@@ -30,13 +30,8 @@ jobs:
strategy:
matrix:
flink_branches: [{
flink: 1.19-SNAPSHOT,
jdk: '8, 11, 17, 21',
branch: main
},
{
flink: 1.20-SNAPSHOT,
jdk: '8, 11, 17, 21',
flink: 2.0-preview1,
jdk: '11, 17, 21',
branch: main
}, {
flink: 1.19.1,
6 changes: 0 additions & 6 deletions flink-connector-jdbc-architecture/pom.xml
@@ -36,12 +36,6 @@
</dependency>

<!-- Flink Jdbc Modules To Test -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc-core</artifactId>
18 changes: 16 additions & 2 deletions flink-connector-jdbc-backward-compatibility/pom.xml
@@ -17,6 +17,7 @@
<packaging>jar</packaging>

<properties>
<postgres.version>42.7.3</postgres.version>
<surefire.module.config>
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.lang=ALL-UNNAMED
@@ -58,17 +59,30 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<artifactId>flink-connector-jdbc-core</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc-postgres</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<artifactId>flink-connector-jdbc-postgres</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgres.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
@@ -18,11 +18,12 @@

package org.apache.flink.connector.jdbc.backward.compatibility;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink;
import org.apache.flink.connector.jdbc.postgres.PostgresTestBase;
import org.apache.flink.connector.jdbc.testutils.TableManaged;
import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable;
@@ -71,8 +72,10 @@ public List<TableManaged> getManagedTables() {

@Test
public void testAtLeastOnce() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
Configuration configuration = new Configuration();
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setParallelism(1);

assertResult(new ArrayList<>());
@@ -97,8 +100,10 @@ public void testAtLeastOnce() throws Exception {

@Test
public void testExactlyOnce() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
Configuration configuration = new Configuration();
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setParallelism(1);

assertResult(new ArrayList<>());
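The test hunks above replace the removed RestartStrategies API with configuration-based restart strategy settings. A minimal sketch of the new pattern, using only the classes and the "disable" value shown in the diff; the job body itself is omitted:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategyMigrationSketch {
    public static void main(String[] args) {
        // Pre-2.0: env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
        // 2.0-preview1: restart behaviour is supplied through the Configuration instead.
        Configuration configuration = new Configuration();
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        env.setParallelism(1);
        // ... build and execute the pipeline as before.
    }
}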
@@ -19,18 +19,19 @@
package org.apache.flink.connector.jdbc.backward.compatibility;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.connector.jdbc.JdbcTestFixture;
import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource;
import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
import org.apache.flink.connector.jdbc.postgres.PostgresTestBase;
import org.apache.flink.connector.jdbc.source.JdbcSource;
import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider;
import org.apache.flink.connector.jdbc.testutils.TableManaged;
import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.test.junit5.MiniClusterExtension;

import org.junit.jupiter.api.BeforeEach;
@@ -100,8 +101,10 @@ void init() throws SQLException {

@Test
void testReadWithoutParallelismWithoutParamsProvider() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
Configuration configuration = new Configuration();
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setParallelism(1);
JdbcSource<JdbcTestFixture.TestEntry> jdbcSource =
JdbcSource.<JdbcTestFixture.TestEntry>builder()
@@ -121,8 +124,10 @@ void testReadWithoutParallelismWithoutParamsProvider() throws Exception {

@Test
void testReadWithoutParallelismWithParamsProvider() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
Configuration configuration = new Configuration();
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setParallelism(1);
JdbcSource<JdbcTestFixture.TestEntry> jdbcSource =
JdbcSource.<JdbcTestFixture.TestEntry>builder()
@@ -145,8 +150,10 @@ void testReadWithoutParallelismWithParamsProvider() throws Exception {

@Test
void testReadWithParallelismWithoutParamsProvider() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
Configuration configuration = new Configuration();
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setParallelism(2);
JdbcSource<JdbcTestFixture.TestEntry> jdbcSource =
JdbcSource.<JdbcTestFixture.TestEntry>builder()
@@ -166,8 +173,10 @@ void testReadWithParallelismWithoutParamsProvider() throws Exception {

@Test
void testReadWithParallelismWithParamsProvider() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
Configuration configuration = new Configuration();
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setParallelism(2);
JdbcSource<JdbcTestFixture.TestEntry> jdbcSource =
JdbcSource.<JdbcTestFixture.TestEntry>builder()
@@ -20,8 +20,11 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.connector.sink2.SupportsWriterState;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
@@ -47,7 +50,9 @@
*/
@PublicEvolving
public class JdbcSink<IN>
implements StatefulSink<IN, JdbcWriterState>, TwoPhaseCommittingSink<IN, JdbcCommitable> {
implements Sink<IN>,
SupportsWriterState<IN, JdbcWriterState>,
SupportsCommitter<JdbcCommitable> {

private final DeliveryGuarantee deliveryGuarantee;
private final JdbcConnectionProvider connectionProvider;
@@ -74,14 +79,26 @@ public static <IN> JdbcSinkBuilder<IN> builder() {

@Override
@Internal
public JdbcWriter<IN> createWriter(InitContext context) throws IOException {
public JdbcWriter<IN> createWriter(WriterInitContext context) throws IOException {
return restoreWriter(context, Collections.emptyList());
}

@Override
public Committer<JdbcCommitable> createCommitter(CommitterInitContext committerInitContext)
throws IOException {
return new JdbcCommitter(deliveryGuarantee, connectionProvider, exactlyOnceOptions);
}

@Override
@Internal
public SimpleVersionedSerializer<JdbcCommitable> getCommittableSerializer() {
return new JdbcCommitableSerializer();
}

@Override
public JdbcWriter<IN> restoreWriter(
InitContext context, Collection<JdbcWriterState> recoveredState) throws IOException {
WriterInitContext context, Collection<JdbcWriterState> recoveredState)
throws IOException {
JdbcOutputSerializer<IN> outputSerializer =
JdbcOutputSerializer.of(
context.createInputSerializer(), context.isObjectReuseEnabled());
@@ -96,18 +113,6 @@ public JdbcWriter<IN> restoreWriter(
context);
}

@Override
@Internal
public Committer<JdbcCommitable> createCommitter() throws IOException {
return new JdbcCommitter(deliveryGuarantee, connectionProvider, exactlyOnceOptions);
}

@Override
@Internal
public SimpleVersionedSerializer<JdbcCommitable> getCommittableSerializer() {
return new JdbcCommitableSerializer();
}

@Override
@Internal
public SimpleVersionedSerializer<JdbcWriterState> getWriterStateSerializer() {
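The JdbcSink hunks above migrate from the removed StatefulSink/TwoPhaseCommittingSink interfaces to the decomposed Sink V2 contracts (Sink, SupportsWriterState, SupportsCommitter) with the new WriterInitContext/CommitterInitContext parameters. A bare skeleton, not the connector's implementation, showing how the pieces fit together; the type parameters and placeholder bodies are illustrative only:

import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.connector.sink2.SupportsWriterState;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;

/** Skeleton of a Sink V2 sink with writer state and committables (placeholder generics). */
public class SkeletonSink<IN, StateT, CommT>
        implements Sink<IN>, SupportsWriterState<IN, StateT>, SupportsCommitter<CommT> {

    @Override
    public SinkWriter<IN> createWriter(WriterInitContext context) throws IOException {
        // A stateful sink typically delegates to restoreWriter with empty recovered state.
        return restoreWriter(context, Collections.emptyList());
    }

    @Override
    public StatefulSinkWriter<IN, StateT> restoreWriter(
            WriterInitContext context, Collection<StateT> recoveredState) throws IOException {
        throw new UnsupportedOperationException("placeholder writer");
    }

    @Override
    public Committer<CommT> createCommitter(CommitterInitContext context) throws IOException {
        throw new UnsupportedOperationException("placeholder committer");
    }

    @Override
    public SimpleVersionedSerializer<CommT> getCommittableSerializer() {
        throw new UnsupportedOperationException("placeholder serializer");
    }

    @Override
    public SimpleVersionedSerializer<StateT> getWriterStateSerializer() {
        throw new UnsupportedOperationException("placeholder serializer");
    }
}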
@@ -18,9 +18,9 @@
package org.apache.flink.connector.jdbc.core.datastream.sink.writer;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
import org.apache.flink.api.connector.sink2.InitContext;
import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
@@ -55,8 +55,8 @@
*/
@Internal
public class JdbcWriter<IN>
implements StatefulSink.StatefulSinkWriter<IN, JdbcWriterState>,
TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, JdbcCommitable> {
implements StatefulSinkWriter<IN, JdbcWriterState>,
CommittingSinkWriter<IN, JdbcCommitable> {

private static final Logger LOG = LoggerFactory.getLogger(JdbcWriter.class);

@@ -75,7 +75,7 @@ public JdbcWriter(
JdbcOutputSerializer<IN> outputSerializer,
DeliveryGuarantee deliveryGuarantee,
Collection<JdbcWriterState> recoveredState,
Sink.InitContext initContext)
InitContext initContext)
throws IOException {

this.deliveryGuarantee =
@@ -85,9 +85,7 @@ public JdbcWriter(

pendingRecords = false;
this.lastCheckpointId =
initContext
.getRestoredCheckpointId()
.orElse(Sink.InitContext.INITIAL_CHECKPOINT_ID - 1);
initContext.getRestoredCheckpointId().orElse(InitContext.INITIAL_CHECKPOINT_ID - 1);

checkNotNull(connectionProvider, "connectionProvider must be defined");

@@ -106,9 +104,9 @@ public JdbcWriter(

TransactionId transactionId =
TransactionId.create(
initContext.getJobId().getBytes(),
initContext.getSubtaskId(),
initContext.getNumberOfParallelSubtasks());
initContext.getJobInfo().getJobId().getBytes(),
initContext.getTaskInfo().getIndexOfThisSubtask(),
initContext.getTaskInfo().getNumberOfParallelSubtasks());

this.jdbcTransaction =
new XaTransaction(
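In the writer, job and task metadata is now read through the InitContext's JobInfo/TaskInfo accessors instead of the removed direct getters. A small helper sketch using only the accessors visible in the hunk above; the class and method names are hypothetical:

import org.apache.flink.api.connector.sink2.InitContext;

final class SinkContextInfo {
    private SinkContextInfo() {}

    /** Reads the same job/task metadata the JdbcWriter constructor above relies on. */
    static String describe(InitContext initContext) {
        String jobId = initContext.getJobInfo().getJobId().toString();
        int subtaskIndex = initContext.getTaskInfo().getIndexOfThisSubtask();
        int parallelism = initContext.getTaskInfo().getNumberOfParallelSubtasks();
        return jobId + " / subtask " + subtaskIndex + " of " + parallelism;
    }
}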
@@ -105,15 +105,15 @@ public JdbcSourceSplitReader(
this.config = Preconditions.checkNotNull(config);
this.typeInformation = Preconditions.checkNotNull(typeInformation);
this.connectionProvider = Preconditions.checkNotNull(connectionProvider);
this.resultSetType = config.getInteger(RESULTSET_TYPE);
this.resultSetConcurrency = config.getInteger(RESULTSET_CONCURRENCY);
this.resultSetFetchSize = config.getInteger(RESULTSET_FETCH_SIZE);
this.autoCommit = config.getBoolean(AUTO_COMMIT);
this.resultSetType = config.get(RESULTSET_TYPE);
this.resultSetConcurrency = config.get(RESULTSET_CONCURRENCY);
this.resultSetFetchSize = config.get(RESULTSET_FETCH_SIZE);
this.autoCommit = config.get(AUTO_COMMIT);
this.deliveryGuarantee = Preconditions.checkNotNull(deliveryGuarantee);
this.splits = new ArrayDeque<>();
this.hasNextRecordCurrentSplit = false;
this.currentSplit = null;
int splitReaderFetchBatchSize = config.getInteger(READER_FETCH_BATCH_SIZE);
int splitReaderFetchBatchSize = config.get(READER_FETCH_BATCH_SIZE);
Preconditions.checkArgument(
splitReaderFetchBatchSize > 0 && splitReaderFetchBatchSize < Integer.MAX_VALUE);
this.splitReaderFetchBatchSize = splitReaderFetchBatchSize;
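The split reader hunks switch from Configuration's removed typed getters (getInteger, getBoolean) to the generic get(ConfigOption). A minimal sketch with a hypothetical ConfigOption standing in for the reader's options such as RESULTSET_FETCH_SIZE:

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class ConfigGetMigrationSketch {
    // Hypothetical option, standing in for the connector's reader options.
    private static final ConfigOption<Integer> FETCH_SIZE =
            ConfigOptions.key("example.fetch-size").intType().defaultValue(1024);

    public static void main(String[] args) {
        Configuration config = new Configuration();
        // Pre-2.0: config.getInteger(FETCH_SIZE) — the typed getter was removed.
        // 2.0-preview1: the generic, type-safe accessor.
        int fetchSize = config.get(FETCH_SIZE);
        System.out.println(fetchSize);
    }
}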
@@ -25,7 +25,7 @@
import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.connector.sink.legacy.SinkFunctionProvider;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;

Expand Down