Skip to content

Commit

Permalink
[FLINK-36659] Bump to 2.0-preview1.
Browse files Browse the repository at this point in the history
  • Loading branch information
lvyanquan committed Jan 27, 2025
1 parent 8e97244 commit d026274
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

package org.apache.flink.connector.jdbc.backward.compatibility;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
Expand Down Expand Up @@ -71,8 +72,10 @@ public List<TableManaged> getManagedTables() {

@Test
public void testAtLeastOnce() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
Configuration configuration = new Configuration();
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setParallelism(1);

assertResult(new ArrayList<>());
Expand All @@ -97,8 +100,10 @@ public void testAtLeastOnce() throws Exception {

@Test
public void testExactlyOnce() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
Configuration configuration = new Configuration();
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setParallelism(1);

assertResult(new ArrayList<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
package org.apache.flink.connector.jdbc.backward.compatibility;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.connector.jdbc.JdbcTestFixture;
import org.apache.flink.connector.jdbc.postgres.PostgresTestBase;
import org.apache.flink.connector.jdbc.source.JdbcSource;
Expand All @@ -30,7 +31,7 @@
import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.test.junit5.MiniClusterExtension;

import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -100,8 +101,10 @@ void init() throws SQLException {

@Test
void testReadWithoutParallelismWithoutParamsProvider() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
Configuration configuration = new Configuration();
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setParallelism(1);
JdbcSource<JdbcTestFixture.TestEntry> jdbcSource =
JdbcSource.<JdbcTestFixture.TestEntry>builder()
Expand All @@ -121,8 +124,10 @@ void testReadWithoutParallelismWithoutParamsProvider() throws Exception {

@Test
void testReadWithoutParallelismWithParamsProvider() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
Configuration configuration = new Configuration();
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setParallelism(1);
JdbcSource<JdbcTestFixture.TestEntry> jdbcSource =
JdbcSource.<JdbcTestFixture.TestEntry>builder()
Expand All @@ -145,8 +150,10 @@ void testReadWithoutParallelismWithParamsProvider() throws Exception {

@Test
void testReadWithParallelismWithoutParamsProvider() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
Configuration configuration = new Configuration();
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setParallelism(2);
JdbcSource<JdbcTestFixture.TestEntry> jdbcSource =
JdbcSource.<JdbcTestFixture.TestEntry>builder()
Expand All @@ -166,8 +173,10 @@ void testReadWithParallelismWithoutParamsProvider() throws Exception {

@Test
void testReadWithParallelismWithParamsProvider() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
Configuration configuration = new Configuration();
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setParallelism(2);
JdbcSource<JdbcTestFixture.TestEntry> jdbcSource =
JdbcSource.<JdbcTestFixture.TestEntry>builder()
Expand Down

0 comments on commit d026274

Please sign in to comment.