diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index 9923226d121..359baaed186 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -336,6 +336,7 @@ public Set> optionalOptions() { options.add(INCLUDE_COMMENTS_ENABLED); options.add(USE_LEGACY_JSON_FORMAT); options.add(TREAT_TINYINT1_AS_BOOLEAN_ENABLED); + options.add(PARSE_ONLINE_SCHEMA_CHANGES); return options; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java index de49c7b7e7a..bc1a7c9bab3 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java @@ -37,6 +37,7 @@ import java.util.stream.Collectors; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED; @@ -258,14 +259,16 @@ public void testOptionalOption() { // optional option options.put(TREAT_TINYINT1_AS_BOOLEAN_ENABLED.key(), "false"); + options.put(PARSE_ONLINE_SCHEMA_CHANGES.key(), "true"); Factory.Context context = new MockContext(Configuration.fromMap(options)); MySqlDataSourceFactory factory = new MySqlDataSourceFactory(); - assertThat(factory.optionalOptions().contains(TREAT_TINYINT1_AS_BOOLEAN_ENABLED)) - .isEqualTo(true); + assertThat(factory.optionalOptions()) + .contains(TREAT_TINYINT1_AS_BOOLEAN_ENABLED, PARSE_ONLINE_SCHEMA_CHANGES); MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context); - assertThat(dataSource.getSourceConfig().isTreatTinyInt1AsBoolean()).isEqualTo(false); + assertThat(dataSource.getSourceConfig().isTreatTinyInt1AsBoolean()).isFalse(); + assertThat(dataSource.getSourceConfig().isParseOnLineSchemaChanges()).isTrue(); } @Test diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java index 3c5d6d63b41..078960266f5 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java @@ -357,7 +357,9 @@ public boolean equals(Object o) { && Objects.equals(jdbcProperties, that.jdbcProperties) && Objects.equals(heartbeatInterval, that.heartbeatInterval) && Objects.equals(chunkKeyColumn, that.chunkKeyColumn) - && Objects.equals(skipSnapshotBackFill, that.skipSnapshotBackFill); + && Objects.equals(skipSnapshotBackFill, that.skipSnapshotBackFill) + && parseOnlineSchemaChanges == that.parseOnlineSchemaChanges + && useLegacyJsonFormat == that.useLegacyJsonFormat; } @Override @@ -390,7 +392,9 @@ public int hashCode() { jdbcProperties, heartbeatInterval, chunkKeyColumn, - skipSnapshotBackFill); + skipSnapshotBackFill, + parseOnlineSchemaChanges, + useLegacyJsonFormat); } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java index 6890ca78fd7..2c29bf2306d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java @@ -805,6 +805,47 @@ public void testValidation() { } } + @Test + public void testEnablingExperimentalOptions() { + Map properties = getAllOptions(); + properties.put("scan.parse.online.schema.changes.enabled", "true"); + properties.put("use.legacy.json.format", "true"); + + // validation for source + DynamicTableSource actualSource = createTableSource(properties); + MySqlTableSource expectedSource = + new MySqlTableSource( + SCHEMA, + 3306, + MY_LOCALHOST, + MY_DATABASE, + MY_TABLE, + MY_USERNAME, + MY_PASSWORD, + ZoneId.systemDefault(), + PROPERTIES, + null, + false, + SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue(), + CHUNK_META_GROUP_SIZE.defaultValue(), + SCAN_SNAPSHOT_FETCH_SIZE.defaultValue(), + CONNECT_TIMEOUT.defaultValue(), + CONNECT_MAX_RETRIES.defaultValue(), + CONNECTION_POOL_SIZE.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + StartupOptions.initial(), + false, + false, + new Properties(), + HEARTBEAT_INTERVAL.defaultValue(), + null, + SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), + true, + true); + assertEquals(expectedSource, actualSource); + } + private Map getAllOptions() { Map options = new HashMap<>(); options.put("connector", "mysql-cdc");