diff --git a/docs/layouts/shortcodes/generated/kafka_sync_database.html b/docs/layouts/shortcodes/generated/kafka_sync_database.html
index 5b4024e30c18..14f8959478ce 100644
--- a/docs/layouts/shortcodes/generated/kafka_sync_database.html
+++ b/docs/layouts/shortcodes/generated/kafka_sync_database.html
@@ -86,6 +86,8 @@
"char-to-string": maps MySQL CHAR(length)/VARCHAR(length) types to STRING.
"longtext-to-bytes": maps MySQL LONGTEXT types to BYTES.
"bigint-unsigned-to-bigint": maps MySQL BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL to BIGINT. You should ensure overflow won't occur when using this option.
+ "decimal-no-change": Ignore decimal type change.
+ "no-change": Ignore any type change.
diff --git a/docs/layouts/shortcodes/generated/kafka_sync_table.html b/docs/layouts/shortcodes/generated/kafka_sync_table.html
index 122e5eb12177..54d2054770c3 100644
--- a/docs/layouts/shortcodes/generated/kafka_sync_table.html
+++ b/docs/layouts/shortcodes/generated/kafka_sync_table.html
@@ -58,6 +58,8 @@
"char-to-string": maps MySQL CHAR(length)/VARCHAR(length) types to STRING.
"longtext-to-bytes": maps MySQL LONGTEXT types to BYTES.
"bigint-unsigned-to-bigint": maps MySQL BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL to BIGINT. You should ensure overflow won't occur when using this option.
+ "decimal-no-change": Ignore decimal type change.
+ "no-change": Ignore any type change.
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index d6d85e59bba8..d876fe484b50 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -231,6 +231,7 @@ protected void buildSink(
.withInput(input)
.withParserFactory(parserFactory)
.withCatalogLoader(catalogLoader())
+ .withTypeMapping(typeMapping)
.withDatabase(database)
.withTables(tables)
.withMode(mode)
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
index ae4f5346b24c..6396f8139562 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
@@ -168,7 +168,8 @@ protected void buildSink(
.withParserFactory(parserFactory)
.withTable(fileStoreTable)
.withIdentifier(new Identifier(database, table))
- .withCatalogLoader(catalogLoader());
+ .withCatalogLoader(catalogLoader())
+ .withTypeMapping(typeMapping);
String sinkParallelism = tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
if (sinkParallelism != null) {
sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TypeMapping.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TypeMapping.java
index 741ed5a35602..3262197aaec8 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TypeMapping.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TypeMapping.java
@@ -76,7 +76,9 @@ public enum TypeMappingMode {
TO_STRING,
CHAR_TO_STRING,
LONGTEXT_TO_BYTES,
- BIGINT_UNSIGNED_TO_BIGINT;
+ BIGINT_UNSIGNED_TO_BIGINT,
+ DECIMAL_NO_CHANGE,
+ NO_CHANGE;
private static final Map TYPE_MAPPING_OPTIONS =
Arrays.stream(TypeMappingMode.values())
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
index 5c27db6ddf1b..c7d70cda19cd 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
@@ -21,6 +21,7 @@
import org.apache.paimon.annotation.Experimental;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.BucketMode;
@@ -49,6 +50,7 @@ public class CdcSinkBuilder {
private Table table = null;
private Identifier identifier = null;
private CatalogLoader catalogLoader = null;
+ private TypeMapping typeMapping = null;
@Nullable private Integer parallelism;
@@ -82,6 +84,11 @@ public CdcSinkBuilder withCatalogLoader(CatalogLoader catalogLoader) {
return this;
}
+ public CdcSinkBuilder withTypeMapping(TypeMapping typeMapping) {
+ this.typeMapping = typeMapping;
+ return this;
+ }
+
public DataStreamSink> build() {
Preconditions.checkNotNull(input, "Input DataStream can not be null.");
Preconditions.checkNotNull(parserFactory, "Event ParserFactory can not be null.");
@@ -109,7 +116,8 @@ public DataStreamSink> build() {
new UpdatedDataFieldsProcessFunction(
new SchemaManager(dataTable.fileIO(), dataTable.location()),
identifier,
- catalogLoader))
+ catalogLoader,
+ typeMapping))
.name("Schema Evolution");
schemaChangeProcessFunction.getTransformation().setParallelism(1);
schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index bd18c7e7ad82..af4a67c33af3 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -22,6 +22,7 @@
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.sink.FlinkWriteSink;
import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
import org.apache.paimon.options.MemorySize;
@@ -73,6 +74,8 @@ public class FlinkCdcSyncDatabaseSinkBuilder {
// Paimon tables. 2) in multiplex sink where it is used to
// initialize different writers to multiple tables.
private CatalogLoader catalogLoader;
+ private TypeMapping typeMapping;
+
// database to sync, currently only support single database
private String database;
private MultiTablesSinkMode mode;
@@ -116,6 +119,11 @@ public FlinkCdcSyncDatabaseSinkBuilder withCatalogLoader(CatalogLoader catalo
return this;
}
+ public FlinkCdcSyncDatabaseSinkBuilder withTypeMapping(TypeMapping typeMapping) {
+ this.typeMapping = typeMapping;
+ return this;
+ }
+
public FlinkCdcSyncDatabaseSinkBuilder withMode(MultiTablesSinkMode mode) {
this.mode = mode;
return this;
@@ -153,7 +161,7 @@ private void buildCombinedCdcSink() {
parsed,
CdcDynamicTableParsingProcessFunction.DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG)
.keyBy(t -> t.f0)
- .process(new MultiTableUpdatedDataFieldsProcessFunction(catalogLoader))
+ .process(new MultiTableUpdatedDataFieldsProcessFunction(catalogLoader, typeMapping))
.name("Schema Evolution");
DataStream converted =
@@ -201,7 +209,8 @@ private void buildDividedCdcSink() {
new UpdatedDataFieldsProcessFunction(
new SchemaManager(table.fileIO(), table.location()),
Identifier.create(database, table.name()),
- catalogLoader))
+ catalogLoader,
+ typeMapping))
.name("Schema Evolution");
schemaChangeProcessFunction.getTransformation().setParallelism(1);
schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
index dd612a52c2eb..71f71b241224 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
@@ -21,6 +21,7 @@
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
@@ -52,8 +53,9 @@ public class MultiTableUpdatedDataFieldsProcessFunction
private final Map schemaManagers = new HashMap<>();
- public MultiTableUpdatedDataFieldsProcessFunction(CatalogLoader catalogLoader) {
- super(catalogLoader);
+ public MultiTableUpdatedDataFieldsProcessFunction(
+ CatalogLoader catalogLoader, TypeMapping typeMapping) {
+ super(catalogLoader, typeMapping);
}
@Override
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
index e143aabf6c13..93e22f1e62b6 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
@@ -20,6 +20,7 @@
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.types.DataField;
@@ -53,8 +54,11 @@ public class UpdatedDataFieldsProcessFunction
private Set latestFields;
public UpdatedDataFieldsProcessFunction(
- SchemaManager schemaManager, Identifier identifier, CatalogLoader catalogLoader) {
- super(catalogLoader);
+ SchemaManager schemaManager,
+ Identifier identifier,
+ CatalogLoader catalogLoader,
+ TypeMapping typeMapping) {
+ super(catalogLoader, typeMapping);
this.schemaManager = schemaManager;
this.identifier = identifier;
this.latestFields = new HashSet<>();
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index 90edbc034a54..dcf0ac2f8e70 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -21,6 +21,7 @@
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -52,6 +53,7 @@ public abstract class UpdatedDataFieldsProcessFunctionBase extends Process
protected final CatalogLoader catalogLoader;
protected Catalog catalog;
private boolean caseSensitive;
+ private TypeMapping typeMapping;
private static final List STRING_TYPES =
Arrays.asList(DataTypeRoot.CHAR, DataTypeRoot.VARCHAR);
@@ -71,8 +73,10 @@ public abstract class UpdatedDataFieldsProcessFunctionBase extends Process
private static final List TIMESTAMP_TYPES =
Arrays.asList(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
- protected UpdatedDataFieldsProcessFunctionBase(CatalogLoader catalogLoader) {
+ protected UpdatedDataFieldsProcessFunctionBase(
+ CatalogLoader catalogLoader, TypeMapping typeMapping) {
this.catalogLoader = catalogLoader;
+ this.typeMapping = typeMapping;
}
/**
@@ -214,6 +218,15 @@ protected List extractSchemaChanges(
oldFields.put(oldField.name(), oldField);
}
+ boolean allowTypeChange =
+ this.typeMapping == null
+ || !this.typeMapping.containsMode(TypeMapping.TypeMappingMode.NO_CHANGE);
+
+ boolean allowDecimalTypeChange =
+ this.typeMapping == null
+ || !this.typeMapping.containsMode(
+ TypeMapping.TypeMappingMode.DECIMAL_NO_CHANGE);
+
List result = new ArrayList<>();
for (DataField newField : updatedDataFields) {
String newFieldName = StringUtils.toLowerCaseIfNeed(newField.name(), caseSensitive);
@@ -231,7 +244,10 @@ protected List extractSchemaChanges(
SchemaChange.updateColumnComment(
new String[] {newFieldName}, newField.description()));
}
- } else {
+ } else if (allowTypeChange) {
+ if (oldField.type().is(DataTypeRoot.DECIMAL) && !allowDecimalTypeChange) {
+ continue;
+ }
// update column type
result.add(SchemaChange.updateColumnType(newFieldName, newField.type()));
// update column comment
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java
index 46c8e98fb639..8d071150e07d 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java
@@ -210,7 +210,8 @@ public void testSchemaEvolution() throws Exception {
new UpdatedDataFieldsProcessFunction(
new SchemaManager(table.fileIO(), table.location()),
identifier,
- catalogLoader))
+ catalogLoader,
+ TypeMapping.defaultMapping()))
.name("Schema Evolution");
schemaChangeProcessFunction.getTransformation().setParallelism(1);
schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index ed1885f5d774..664e8981c1a3 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -49,6 +49,8 @@
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC_PATTERN;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
+import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.DECIMAL_NO_CHANGE;
+import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.NO_CHANGE;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
@@ -1148,4 +1150,71 @@ public void testComputedColumnWithCaseInsensitive(boolean triggerSchemaRetrieval
public void testWaterMarkSyncTable() throws Exception {
testWaterMarkSyncTable(CANAL);
}
+
+ @Test
+ @Timeout(60)
+ public void testVarcharNoChange() throws Exception {
+ String topic = "varchar-no-change";
+ createTestTopic(topic, 1, 1);
+ writeRecordsToKafka(topic, "kafka/canal/table/typenochange/canal-data-1.txt");
+ Map kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), topic);
+ KafkaSyncTableAction action =
+ syncTableActionBuilder(kafkaConfig)
+ .withTableConfig(getBasicTableConfig())
+ .withTypeMappingModes(NO_CHANGE.configString())
+ .build();
+ runActionWithDefaultEnv(action);
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
+ new String[] {"k", "v"});
+ waitForResult(
+ Collections.singletonList("+I[1, one]"),
+ getFileStoreTable(tableName),
+ rowType,
+ Collections.singletonList("k"));
+ writeRecordsToKafka(topic, "kafka/canal/table/typenochange/canal-data-2.txt");
+ waitForResult(
+ Arrays.asList("+I[1, one]", "+I[2, two]"),
+ getFileStoreTable(tableName),
+ rowType, // should not change
+ Collections.singletonList("k"));
+ }
+
+ @Test
+ @Timeout(60)
+ public void testDecimalNoChange() throws Exception {
+ String topic = "decimal-no-change";
+ createTestTopic(topic, 1, 1);
+ writeRecordsToKafka(topic, "kafka/canal/table/typenochange/canal-data-3.txt");
+ Map kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), topic);
+ KafkaSyncTableAction action =
+ syncTableActionBuilder(kafkaConfig)
+ .withTableConfig(getBasicTableConfig())
+ .withCatalogConfig(
+ Collections.singletonMap(
+ CatalogOptions.CASE_SENSITIVE.key(), "false"))
+ .withTypeMappingModes(DECIMAL_NO_CHANGE.configString())
+ .build();
+ runActionWithDefaultEnv(action);
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT().notNull(), DataTypes.DECIMAL(10, 2)},
+ new String[] {"k", "v"});
+ waitForResult(
+ Collections.singletonList("+I[1, 1.20]"),
+ getFileStoreTable(tableName),
+ rowType,
+ Collections.singletonList("k"));
+ writeRecordsToKafka(topic, "kafka/canal/table/typenochange/canal-data-4.txt");
+ waitForResult(
+ Arrays.asList("+I[1, 1.20]", "+I[2, 2.30]"),
+ getFileStoreTable(tableName),
+ rowType, // should not change
+ Collections.singletonList("k"));
+ }
}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
index 35286e3a88d4..56c7e14ee305 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
@@ -23,6 +23,7 @@
import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -171,6 +172,7 @@ private void innerTestRandomCdcEvents(Supplier bucket, boolean unawareB
// because we have at most 3 tables and 8 slots in AbstractTestBase
// each table can only get 2 slots
.withTableOptions(Collections.singletonMap(SINK_PARALLELISM.key(), "2"))
+ .withTypeMapping(TypeMapping.defaultMapping())
.withDatabase(DATABASE_NAME)
.withCatalogLoader(catalogLoader)
.build();
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/typenochange/canal-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/typenochange/canal-data-1.txt
new file mode 100644
index 000000000000..69285689e3bf
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/typenochange/canal-data-1.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"k":1,"v":"one"}],"database":"paimon_sync_table","es":1683006706000,"id":92,"isDdl":false,"mysqlType":{"k":"INT","v":"VARCHAR(10)"},"old":null,"pkNames":["k"],"sql":"","sqlType":{"k":4,"v":12},"table":"schema_evolution","ts":1683006706728,"type":"INSERT"}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/typenochange/canal-data-2.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/typenochange/canal-data-2.txt
new file mode 100644
index 000000000000..d56b39e4fc3c
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/typenochange/canal-data-2.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"k":2,"v":"two"}],"database":"paimon_sync_table","es":1683006724000,"id":94,"isDdl":false,"mysqlType":{"k":"INT","v":"VARCHAR(20)"},"old":null,"pkNames":["k"],"sql":"","sqlType":{"k":4,"v":12},"table":"schema_evolution","ts":1683006724404,"type":"INSERT"}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/typenochange/canal-data-3.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/typenochange/canal-data-3.txt
new file mode 100644
index 000000000000..f9380168b051
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/typenochange/canal-data-3.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"k":"1","v":"1.2"}],"database":"paimon_sync_table","es":1683880554000,"id":2150,"isDdl":false,"mysqlType":{"k":"int","v":"decimal(10,2)"},"old":null,"pkNames":["k"],"sql":"","sqlType":{"k":4,"v":3},"table":"decimal_no_change","ts":1683880554351,"type":"INSERT"}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/typenochange/canal-data-4.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/typenochange/canal-data-4.txt
new file mode 100644
index 000000000000..280c5cd70570
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/typenochange/canal-data-4.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"k":"2","v":"2.3"}],"database":"paimon_sync_table","es":1683880554000,"id":2150,"isDdl":false,"mysqlType":{"k":"int","v":"decimal(15,4)"},"old":null,"pkNames":["k"],"sql":"","sqlType":{"k":4,"v":3},"table":"decimal_no_change","ts":1683880554351,"type":"INSERT"}
\ No newline at end of file