From b5cb6eb3b0389565bcdd3fff3ef53577f9ff8e64 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=9D=8E=E9=B9=8F=E7=A8=8B?=
Date: Sun, 12 Jan 2025 13:47:41 +0800
Subject: [PATCH] Add new type_mapping options to ignore some type changes when specified

---
 .../generated/kafka_sync_database.html | 2 +
 .../generated/kafka_sync_table.html | 2 +
 .../action/cdc/SyncDatabaseActionBase.java | 1 +
 .../flink/action/cdc/SyncTableActionBase.java | 3 +-
 .../paimon/flink/action/cdc/TypeMapping.java | 4 +-
 .../paimon/flink/sink/cdc/CdcSinkBuilder.java | 10 ++-
 .../cdc/FlinkCdcSyncDatabaseSinkBuilder.java | 13 +++-
 ...TableUpdatedDataFieldsProcessFunction.java | 6 +-
 .../cdc/UpdatedDataFieldsProcessFunction.java | 8 ++-
 .../UpdatedDataFieldsProcessFunctionBase.java | 20 +++++-
 .../flink/action/cdc/SchemaEvolutionTest.java | 3 +-
 .../KafkaCanalSyncTableActionITCase.java | 69 +++++++++++++++++++
 .../cdc/FlinkCdcSyncDatabaseSinkITCase.java | 2 +
 .../canal/table/typenochange/canal-data-1.txt | 19 +++++
 .../canal/table/typenochange/canal-data-2.txt | 19 +++++
 .../canal/table/typenochange/canal-data-3.txt | 19 +++++
 .../canal/table/typenochange/canal-data-4.txt | 19 +++++
 17 files changed, 207 insertions(+), 12 deletions(-)
 create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/typenochange/canal-data-1.txt
 create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/typenochange/canal-data-2.txt
 create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/typenochange/canal-data-3.txt
 create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/typenochange/canal-data-4.txt

diff --git a/docs/layouts/shortcodes/generated/kafka_sync_database.html b/docs/layouts/shortcodes/generated/kafka_sync_database.html
index 9f0b817e6647..d9b29e9eb734 100644
--- a/docs/layouts/shortcodes/generated/kafka_sync_database.html
+++ b/docs/layouts/shortcodes/generated/kafka_sync_database.html
@@ -86,6 +86,8 @@
  • "char-to-string": maps MySQL CHAR(length)/VARCHAR(length) types to STRING.
  • "longtext-to-bytes": maps MySQL LONGTEXT types to BYTES.
  • "bigint-unsigned-to-bigint": maps MySQL BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL to BIGINT. You should ensure overflow won't occur when using this option.
  • + "decimal-no-change": ignores type changes for columns whose existing Paimon type is DECIMAL (for example, a change from DECIMAL(10,2) to DECIMAL(15,4) is not applied).
  • + "no-change": ignores all type changes (for example, a change from VARCHAR(10) to VARCHAR(20) is not applied).
diff --git a/docs/layouts/shortcodes/generated/kafka_sync_table.html b/docs/layouts/shortcodes/generated/kafka_sync_table.html
index 765aade203aa..93341a391cd2 100644
--- a/docs/layouts/shortcodes/generated/kafka_sync_table.html
+++ b/docs/layouts/shortcodes/generated/kafka_sync_table.html
@@ -58,6 +58,8 @@
  • "char-to-string": maps MySQL CHAR(length)/VARCHAR(length) types to STRING.
  • "longtext-to-bytes": maps MySQL LONGTEXT types to BYTES.
  • "bigint-unsigned-to-bigint": maps MySQL BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL to BIGINT. You should ensure overflow won't occur when using this option.
  • + "decimal-no-change": ignores type changes for columns whose existing Paimon type is DECIMAL (for example, a change from DECIMAL(10,2) to DECIMAL(15,4) is not applied).
  • + "no-change": ignores all type changes (for example, a change from VARCHAR(10) to VARCHAR(20) is not applied).
  • diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java index d6d85e59bba8..d876fe484b50 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java @@ -231,6 +231,7 @@ protected void buildSink( .withInput(input) .withParserFactory(parserFactory) .withCatalogLoader(catalogLoader()) + .withTypeMapping(typeMapping) .withDatabase(database) .withTables(tables) .withMode(mode) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java index ae4f5346b24c..6396f8139562 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java @@ -168,7 +168,8 @@ protected void buildSink( .withParserFactory(parserFactory) .withTable(fileStoreTable) .withIdentifier(new Identifier(database, table)) - .withCatalogLoader(catalogLoader()); + .withCatalogLoader(catalogLoader()) + .withTypeMapping(typeMapping); String sinkParallelism = tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key()); if (sinkParallelism != null) { sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism)); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TypeMapping.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TypeMapping.java index 741ed5a35602..3262197aaec8 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TypeMapping.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TypeMapping.java @@ -76,7 +76,9 @@ public enum TypeMappingMode { TO_STRING, CHAR_TO_STRING, LONGTEXT_TO_BYTES, - BIGINT_UNSIGNED_TO_BIGINT; + BIGINT_UNSIGNED_TO_BIGINT, + DECIMAL_NO_CHANGE, + NO_CHANGE; private static final Map TYPE_MAPPING_OPTIONS = Arrays.stream(TypeMappingMode.values()) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java index 5c27db6ddf1b..c7d70cda19cd 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java @@ -21,6 +21,7 @@ import org.apache.paimon.annotation.Experimental; import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.action.cdc.TypeMapping; import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.BucketMode; @@ -49,6 +50,7 @@ public class CdcSinkBuilder { private Table table = null; private Identifier identifier = null; private CatalogLoader catalogLoader = null; + private TypeMapping typeMapping = null; @Nullable private Integer parallelism; @@ -82,6 +84,11 @@ public CdcSinkBuilder withCatalogLoader(CatalogLoader catalogLoader) { return 
this; } + public CdcSinkBuilder withTypeMapping(TypeMapping typeMapping) { + this.typeMapping = typeMapping; + return this; + } + public DataStreamSink build() { Preconditions.checkNotNull(input, "Input DataStream can not be null."); Preconditions.checkNotNull(parserFactory, "Event ParserFactory can not be null."); @@ -109,7 +116,8 @@ public DataStreamSink build() { new UpdatedDataFieldsProcessFunction( new SchemaManager(dataTable.fileIO(), dataTable.location()), identifier, - catalogLoader)) + catalogLoader, + typeMapping)) .name("Schema Evolution"); schemaChangeProcessFunction.getTransformation().setParallelism(1); schemaChangeProcessFunction.getTransformation().setMaxParallelism(1); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java index bd18c7e7ad82..af4a67c33af3 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java @@ -22,6 +22,7 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.action.MultiTablesSinkMode; +import org.apache.paimon.flink.action.cdc.TypeMapping; import org.apache.paimon.flink.sink.FlinkWriteSink; import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils; import org.apache.paimon.options.MemorySize; @@ -73,6 +74,8 @@ public class FlinkCdcSyncDatabaseSinkBuilder { // Paimon tables. 2) in multiplex sink where it is used to // initialize different writers to multiple tables. 
private CatalogLoader catalogLoader; + private TypeMapping typeMapping; + // database to sync, currently only support single database private String database; private MultiTablesSinkMode mode; @@ -116,6 +119,11 @@ public FlinkCdcSyncDatabaseSinkBuilder withCatalogLoader(CatalogLoader catalo return this; } + public FlinkCdcSyncDatabaseSinkBuilder withTypeMapping(TypeMapping typeMapping) { + this.typeMapping = typeMapping; + return this; + } + public FlinkCdcSyncDatabaseSinkBuilder withMode(MultiTablesSinkMode mode) { this.mode = mode; return this; @@ -153,7 +161,7 @@ private void buildCombinedCdcSink() { parsed, CdcDynamicTableParsingProcessFunction.DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG) .keyBy(t -> t.f0) - .process(new MultiTableUpdatedDataFieldsProcessFunction(catalogLoader)) + .process(new MultiTableUpdatedDataFieldsProcessFunction(catalogLoader, typeMapping)) .name("Schema Evolution"); DataStream converted = @@ -201,7 +209,8 @@ private void buildDividedCdcSink() { new UpdatedDataFieldsProcessFunction( new SchemaManager(table.fileIO(), table.location()), Identifier.create(database, table.name()), - catalogLoader)) + catalogLoader, + typeMapping)) .name("Schema Evolution"); schemaChangeProcessFunction.getTransformation().setParallelism(1); schemaChangeProcessFunction.getTransformation().setMaxParallelism(1); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java index dd612a52c2eb..71f71b241224 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java @@ -21,6 +21,7 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.action.cdc.TypeMapping; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; @@ -52,8 +53,9 @@ public class MultiTableUpdatedDataFieldsProcessFunction private final Map schemaManagers = new HashMap<>(); - public MultiTableUpdatedDataFieldsProcessFunction(CatalogLoader catalogLoader) { - super(catalogLoader); + public MultiTableUpdatedDataFieldsProcessFunction( + CatalogLoader catalogLoader, TypeMapping typeMapping) { + super(catalogLoader, typeMapping); } @Override diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java index e143aabf6c13..93e22f1e62b6 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java @@ -20,6 +20,7 @@ import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.action.cdc.TypeMapping; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.types.DataField; @@ -53,8 +54,11 @@ public class UpdatedDataFieldsProcessFunction private Set latestFields; 
public UpdatedDataFieldsProcessFunction( - SchemaManager schemaManager, Identifier identifier, CatalogLoader catalogLoader) { - super(catalogLoader); + SchemaManager schemaManager, + Identifier identifier, + CatalogLoader catalogLoader, + TypeMapping typeMapping) { + super(catalogLoader, typeMapping); this.schemaManager = schemaManager; this.identifier = identifier; this.latestFields = new HashSet<>(); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java index 90edbc034a54..dcf0ac2f8e70 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java @@ -21,6 +21,7 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.action.cdc.TypeMapping; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; @@ -52,6 +53,7 @@ public abstract class UpdatedDataFieldsProcessFunctionBase extends Process protected final CatalogLoader catalogLoader; protected Catalog catalog; private boolean caseSensitive; + private TypeMapping typeMapping; private static final List STRING_TYPES = Arrays.asList(DataTypeRoot.CHAR, DataTypeRoot.VARCHAR); @@ -71,8 +73,10 @@ public abstract class UpdatedDataFieldsProcessFunctionBase extends Process private static final List TIMESTAMP_TYPES = Arrays.asList(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE); - protected UpdatedDataFieldsProcessFunctionBase(CatalogLoader catalogLoader) { + protected UpdatedDataFieldsProcessFunctionBase( + CatalogLoader catalogLoader, TypeMapping typeMapping) { this.catalogLoader = catalogLoader; + this.typeMapping = typeMapping; } /** @@ -214,6 +218,15 @@ protected List extractSchemaChanges( oldFields.put(oldField.name(), oldField); } + boolean allowTypeChange = + this.typeMapping == null + || !this.typeMapping.containsMode(TypeMapping.TypeMappingMode.NO_CHANGE); + + boolean allowDecimalTypeChange = + this.typeMapping == null + || !this.typeMapping.containsMode( + TypeMapping.TypeMappingMode.DECIMAL_NO_CHANGE); + List result = new ArrayList<>(); for (DataField newField : updatedDataFields) { String newFieldName = StringUtils.toLowerCaseIfNeed(newField.name(), caseSensitive); @@ -231,7 +244,10 @@ protected List extractSchemaChanges( SchemaChange.updateColumnComment( new String[] {newFieldName}, newField.description())); } - } else { + } else if (allowTypeChange) { + if (oldField.type().is(DataTypeRoot.DECIMAL) && !allowDecimalTypeChange) { + continue; + } // update column type result.add(SchemaChange.updateColumnType(newFieldName, newField.type())); // update column comment diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java index 46c8e98fb639..8d071150e07d 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java @@ -210,7 +210,8 @@ public void 
testSchemaEvolution() throws Exception { new UpdatedDataFieldsProcessFunction( new SchemaManager(table.fileIO(), table.location()), identifier, - catalogLoader)) + catalogLoader, + TypeMapping.defaultMapping())) .name("Schema Evolution"); schemaChangeProcessFunction.getTransformation().setParallelism(1); schemaChangeProcessFunction.getTransformation().setMaxParallelism(1); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java index ed1885f5d774..664e8981c1a3 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java @@ -49,6 +49,8 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC_PATTERN; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT; +import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.DECIMAL_NO_CHANGE; +import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.NO_CHANGE; import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING; import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches; import static org.assertj.core.api.Assertions.assertThat; @@ -1148,4 +1150,71 @@ public void testComputedColumnWithCaseInsensitive(boolean triggerSchemaRetrieval public void testWaterMarkSyncTable() throws Exception { testWaterMarkSyncTable(CANAL); } + + @Test + @Timeout(60) + public void testVarcharNoChange() throws Exception { + String topic = "varchar-no-change"; + createTestTopic(topic, 1, 1); + writeRecordsToKafka(topic, "kafka/canal/table/typenochange/canal-data-1.txt"); + Map kafkaConfig = getBasicKafkaConfig(); + kafkaConfig.put(VALUE_FORMAT.key(), "canal-json"); + kafkaConfig.put(TOPIC.key(), topic); + KafkaSyncTableAction action = + syncTableActionBuilder(kafkaConfig) + .withTableConfig(getBasicTableConfig()) + .withTypeMappingModes(NO_CHANGE.configString()) + .build(); + runActionWithDefaultEnv(action); + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, + new String[] {"k", "v"}); + waitForResult( + Collections.singletonList("+I[1, one]"), + getFileStoreTable(tableName), + rowType, + Collections.singletonList("k")); + writeRecordsToKafka(topic, "kafka/canal/table/typenochange/canal-data-2.txt"); + waitForResult( + Arrays.asList("+I[1, one]", "+I[2, two]"), + getFileStoreTable(tableName), + rowType, // should not change + Collections.singletonList("k")); + } + + @Test + @Timeout(60) + public void testDecimalNoChange() throws Exception { + String topic = "decimal-no-change"; + createTestTopic(topic, 1, 1); + writeRecordsToKafka(topic, "kafka/canal/table/typenochange/canal-data-3.txt"); + Map kafkaConfig = getBasicKafkaConfig(); + kafkaConfig.put(VALUE_FORMAT.key(), "canal-json"); + kafkaConfig.put(TOPIC.key(), topic); + KafkaSyncTableAction action = + syncTableActionBuilder(kafkaConfig) + .withTableConfig(getBasicTableConfig()) + .withCatalogConfig( + Collections.singletonMap( + CatalogOptions.CASE_SENSITIVE.key(), "false")) + 
.withTypeMappingModes(DECIMAL_NO_CHANGE.configString()) + .build(); + runActionWithDefaultEnv(action); + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT().notNull(), DataTypes.DECIMAL(10, 2)}, + new String[] {"k", "v"}); + waitForResult( + Collections.singletonList("+I[1, 1.20]"), + getFileStoreTable(tableName), + rowType, + Collections.singletonList("k")); + writeRecordsToKafka(topic, "kafka/canal/table/typenochange/canal-data-4.txt"); + waitForResult( + Arrays.asList("+I[1, 1.20]", "+I[2, 2.30]"), + getFileStoreTable(tableName), + rowType, // should not change + Collections.singletonList("k")); + } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java index 35286e3a88d4..56c7e14ee305 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java @@ -23,6 +23,7 @@ import org.apache.paimon.catalog.CatalogUtils; import org.apache.paimon.data.InternalRow; import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.flink.action.cdc.TypeMapping; import org.apache.paimon.flink.util.AbstractTestBase; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; @@ -171,6 +172,7 @@ private void innerTestRandomCdcEvents(Supplier bucket, boolean unawareB // because we have at most 3 tables and 8 slots in AbstractTestBase // each table can only get 2 slots .withTableOptions(Collections.singletonMap(SINK_PARALLELISM.key(), "2")) + .withTypeMapping(TypeMapping.defaultMapping()) .withDatabase(DATABASE_NAME) .withCatalogLoader(catalogLoader) .build(); diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/typenochange/canal-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/typenochange/canal-data-1.txt new file mode 100644 index 000000000000..69285689e3bf --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/typenochange/canal-data-1.txt @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +{"data":[{"k":1,"v":"one"}],"database":"paimon_sync_table","es":1683006706000,"id":92,"isDdl":false,"mysqlType":{"k":"INT","v":"VARCHAR(10)"},"old":null,"pkNames":["k"],"sql":"","sqlType":{"k":4,"v":12},"table":"schema_evolution","ts":1683006706728,"type":"INSERT"} \ No newline at end of file diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/typenochange/canal-data-2.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/typenochange/canal-data-2.txt new file mode 100644 index 000000000000..d56b39e4fc3c --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/typenochange/canal-data-2.txt @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"data":[{"k":2,"v":"two"}],"database":"paimon_sync_table","es":1683006724000,"id":94,"isDdl":false,"mysqlType":{"k":"INT","v":"VARCHAR(20)"},"old":null,"pkNames":["k"],"sql":"","sqlType":{"k":4,"v":12},"table":"schema_evolution","ts":1683006724404,"type":"INSERT"} \ No newline at end of file diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/typenochange/canal-data-3.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/typenochange/canal-data-3.txt new file mode 100644 index 000000000000..f9380168b051 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/typenochange/canal-data-3.txt @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +{"data":[{"k":"1","v":"1.2"}],"database":"paimon_sync_table","es":1683880554000,"id":2150,"isDdl":false,"mysqlType":{"k":"int","v":"decimal(10,2)"},"old":null,"pkNames":["k"],"sql":"","sqlType":{"k":4,"v":3},"table":"decimal_no_change","ts":1683880554351,"type":"INSERT"} \ No newline at end of file diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/typenochange/canal-data-4.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/typenochange/canal-data-4.txt new file mode 100644 index 000000000000..280c5cd70570 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/typenochange/canal-data-4.txt @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"data":[{"k":"2","v":"2.3"}],"database":"paimon_sync_table","es":1683880554000,"id":2150,"isDdl":false,"mysqlType":{"k":"int","v":"decimal(15,4)"},"old":null,"pkNames":["k"],"sql":"","sqlType":{"k":4,"v":3},"table":"decimal_no_change","ts":1683880554351,"type":"INSERT"} \ No newline at end of file