Add new type_mapping options to ignore certain type changes when specified
李鹏程 committed Jan 12, 2025
1 parent 4696879 commit b5cb6eb
Showing 17 changed files with 207 additions and 12 deletions.
2 changes: 2 additions & 0 deletions docs/layouts/shortcodes/generated/kafka_sync_database.html
@@ -86,6 +86,8 @@
<li>"char-to-string": maps MySQL CHAR(length)/VARCHAR(length) types to STRING.</li>
<li>"longtext-to-bytes": maps MySQL LONGTEXT types to BYTES.</li>
<li>"bigint-unsigned-to-bigint": maps MySQL BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL to BIGINT. You should ensure overflow won't occur when using this option.</li>
<li>"decimal-no-change": Ignore decimal type change.</li>
<li>"no-change": Ignore any type change.</li>
</ul>
</td>
</tr>
2 changes: 2 additions & 0 deletions docs/layouts/shortcodes/generated/kafka_sync_table.html
@@ -58,6 +58,8 @@
<li>"char-to-string": maps MySQL CHAR(length)/VARCHAR(length) types to STRING.</li>
<li>"longtext-to-bytes": maps MySQL LONGTEXT types to BYTES.</li>
<li>"bigint-unsigned-to-bigint": maps MySQL BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL to BIGINT. You should ensure overflow won't occur when using this option.</li>
<li>"decimal-no-change": Ignore decimal type change.</li>
<li>"no-change": Ignore any type change.</li>
</ul>
</td>
</tr>
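Both sync actions accept these modes through the --type_mapping option documented above. As a rough sketch of how the new values could be consumed programmatically, assuming TypeMapping.parse accepts the raw option strings (containsMode and the new constants come directly from this change):

import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode;

public class TypeMappingSketch {
    public static void main(String[] args) {
        // Parse user-facing option strings, e.g. the value of --type_mapping.
        TypeMapping mapping = TypeMapping.parse(new String[] {"decimal-no-change"});

        // Only the decimal gate is enabled; all other type changes still apply.
        System.out.println(mapping.containsMode(TypeMappingMode.DECIMAL_NO_CHANGE)); // true
        System.out.println(mapping.containsMode(TypeMappingMode.NO_CHANGE));         // false
    }
}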
@@ -231,6 +231,7 @@ protected void buildSink(
.withInput(input)
.withParserFactory(parserFactory)
.withCatalogLoader(catalogLoader())
.withTypeMapping(typeMapping)
.withDatabase(database)
.withTables(tables)
.withMode(mode)
@@ -168,7 +168,8 @@ protected void buildSink(
.withParserFactory(parserFactory)
.withTable(fileStoreTable)
.withIdentifier(new Identifier(database, table))
.withCatalogLoader(catalogLoader());
.withCatalogLoader(catalogLoader())
.withTypeMapping(typeMapping);
String sinkParallelism = tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
if (sinkParallelism != null) {
sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
@@ -76,7 +76,9 @@ public enum TypeMappingMode {
TO_STRING,
CHAR_TO_STRING,
LONGTEXT_TO_BYTES,
BIGINT_UNSIGNED_TO_BIGINT;
BIGINT_UNSIGNED_TO_BIGINT,
DECIMAL_NO_CHANGE,
NO_CHANGE;

private static final Map<String, TypeMappingMode> TYPE_MAPPING_OPTIONS =
Arrays.stream(TypeMappingMode.values())
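The name lookup kept below these constants resolves the docs' "decimal-no-change" and "no-change" strings to DECIMAL_NO_CHANGE and NO_CHANGE. A minimal sketch of the assumed naming convention; this configString body is an illustration, not part of the diff:

// Assumed convention: the config string is the lower-cased, hyphenated enum name,
// so DECIMAL_NO_CHANGE maps to "decimal-no-change" and NO_CHANGE to "no-change".
public String configString() {
    return name().toLowerCase().replace("_", "-");
}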
@@ -21,6 +21,7 @@
import org.apache.paimon.annotation.Experimental;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.BucketMode;
@@ -49,6 +50,7 @@ public class CdcSinkBuilder<T> {
private Table table = null;
private Identifier identifier = null;
private CatalogLoader catalogLoader = null;
private TypeMapping typeMapping = null;

@Nullable private Integer parallelism;

@@ -82,6 +84,11 @@ public CdcSinkBuilder<T> withCatalogLoader(CatalogLoader catalogLoader) {
return this;
}

public CdcSinkBuilder<T> withTypeMapping(TypeMapping typeMapping) {
this.typeMapping = typeMapping;
return this;
}

public DataStreamSink<?> build() {
Preconditions.checkNotNull(input, "Input DataStream can not be null.");
Preconditions.checkNotNull(parserFactory, "Event ParserFactory can not be null.");
@@ -109,7 +116,8 @@ public DataStreamSink<?> build() {
new UpdatedDataFieldsProcessFunction(
new SchemaManager(dataTable.fileIO(), dataTable.location()),
identifier,
catalogLoader))
catalogLoader,
typeMapping))
.name("Schema Evolution");
schemaChangeProcessFunction.getTransformation().setParallelism(1);
schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
@@ -22,6 +22,7 @@
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.sink.FlinkWriteSink;
import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
import org.apache.paimon.options.MemorySize;
@@ -73,6 +74,8 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
// Paimon tables. 2) in multiplex sink where it is used to
// initialize different writers to multiple tables.
private CatalogLoader catalogLoader;
private TypeMapping typeMapping;

// database to sync, currently only support single database
private String database;
private MultiTablesSinkMode mode;
@@ -116,6 +119,11 @@ public FlinkCdcSyncDatabaseSinkBuilder<T> withCatalogLoader(CatalogLoader catalo
return this;
}

public FlinkCdcSyncDatabaseSinkBuilder<T> withTypeMapping(TypeMapping typeMapping) {
this.typeMapping = typeMapping;
return this;
}

public FlinkCdcSyncDatabaseSinkBuilder<T> withMode(MultiTablesSinkMode mode) {
this.mode = mode;
return this;
@@ -153,7 +161,7 @@ private void buildCombinedCdcSink() {
parsed,
CdcDynamicTableParsingProcessFunction.DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG)
.keyBy(t -> t.f0)
.process(new MultiTableUpdatedDataFieldsProcessFunction(catalogLoader))
.process(new MultiTableUpdatedDataFieldsProcessFunction(catalogLoader, typeMapping))
.name("Schema Evolution");

DataStream<CdcMultiplexRecord> converted =
@@ -201,7 +209,8 @@ private void buildDividedCdcSink() {
new UpdatedDataFieldsProcessFunction(
new SchemaManager(table.fileIO(), table.location()),
Identifier.create(database, table.name()),
catalogLoader))
catalogLoader,
typeMapping))
.name("Schema Evolution");
schemaChangeProcessFunction.getTransformation().setParallelism(1);
schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
@@ -21,6 +21,7 @@
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
@@ -52,8 +53,9 @@ public class MultiTableUpdatedDataFieldsProcessFunction

private final Map<Identifier, SchemaManager> schemaManagers = new HashMap<>();

public MultiTableUpdatedDataFieldsProcessFunction(CatalogLoader catalogLoader) {
super(catalogLoader);
public MultiTableUpdatedDataFieldsProcessFunction(
CatalogLoader catalogLoader, TypeMapping typeMapping) {
super(catalogLoader, typeMapping);
}

@Override
@@ -20,6 +20,7 @@

import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.types.DataField;
@@ -53,8 +54,11 @@ public class UpdatedDataFieldsProcessFunction
private Set<FieldIdentifier> latestFields;

public UpdatedDataFieldsProcessFunction(
SchemaManager schemaManager, Identifier identifier, CatalogLoader catalogLoader) {
super(catalogLoader);
SchemaManager schemaManager,
Identifier identifier,
CatalogLoader catalogLoader,
TypeMapping typeMapping) {
super(catalogLoader, typeMapping);
this.schemaManager = schemaManager;
this.identifier = identifier;
this.latestFields = new HashSet<>();
@@ -21,6 +21,7 @@
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -52,6 +53,7 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
protected final CatalogLoader catalogLoader;
protected Catalog catalog;
private boolean caseSensitive;
private TypeMapping typeMapping;

private static final List<DataTypeRoot> STRING_TYPES =
Arrays.asList(DataTypeRoot.CHAR, DataTypeRoot.VARCHAR);
@@ -71,8 +73,10 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
private static final List<DataTypeRoot> TIMESTAMP_TYPES =
Arrays.asList(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);

protected UpdatedDataFieldsProcessFunctionBase(CatalogLoader catalogLoader) {
protected UpdatedDataFieldsProcessFunctionBase(
CatalogLoader catalogLoader, TypeMapping typeMapping) {
this.catalogLoader = catalogLoader;
this.typeMapping = typeMapping;
}

/**
@@ -214,6 +218,15 @@ protected List<SchemaChange> extractSchemaChanges(
oldFields.put(oldField.name(), oldField);
}

boolean allowTypeChange =
this.typeMapping == null
|| !this.typeMapping.containsMode(TypeMapping.TypeMappingMode.NO_CHANGE);

boolean allowDecimalTypeChange =
this.typeMapping == null
|| !this.typeMapping.containsMode(
TypeMapping.TypeMappingMode.DECIMAL_NO_CHANGE);

List<SchemaChange> result = new ArrayList<>();
for (DataField newField : updatedDataFields) {
String newFieldName = StringUtils.toLowerCaseIfNeed(newField.name(), caseSensitive);
@@ -231,7 +244,10 @@ protected List<SchemaChange> extractSchemaChanges(
SchemaChange.updateColumnComment(
new String[] {newFieldName}, newField.description()));
}
} else {
} else if (allowTypeChange) {
if (oldField.type().is(DataTypeRoot.DECIMAL) && !allowDecimalTypeChange) {
continue;
}
// update column type
result.add(SchemaChange.updateColumnType(newFieldName, newField.type()));
// update column comment
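Taken together, the two flags reduce to a simple rule: NO_CHANGE suppresses every updateColumnType change, while DECIMAL_NO_CHANGE suppresses only changes whose old type is DECIMAL. A self-contained restatement of that gate; shouldUpdateType is a hypothetical helper, not part of this diff:

import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeRoot;

public class TypeChangeGateSketch {
    // Hypothetical helper mirroring extractSchemaChanges above: returns true when
    // an updateColumnType schema change should be emitted for a column.
    static boolean shouldUpdateType(TypeMapping typeMapping, DataType oldType) {
        boolean allowTypeChange =
                typeMapping == null || !typeMapping.containsMode(TypeMappingMode.NO_CHANGE);
        boolean allowDecimalTypeChange =
                typeMapping == null
                        || !typeMapping.containsMode(TypeMappingMode.DECIMAL_NO_CHANGE);
        if (!allowTypeChange) {
            return false; // "no-change": drop every type change
        }
        if (oldType.is(DataTypeRoot.DECIMAL) && !allowDecimalTypeChange) {
            return false; // "decimal-no-change": keep existing DECIMAL columns as-is
        }
        return true;
    }
}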
@@ -210,7 +210,8 @@ public void testSchemaEvolution() throws Exception {
new UpdatedDataFieldsProcessFunction(
new SchemaManager(table.fileIO(), table.location()),
identifier,
catalogLoader))
catalogLoader,
TypeMapping.defaultMapping()))
.name("Schema Evolution");
schemaChangeProcessFunction.getTransformation().setParallelism(1);
schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
@@ -49,6 +49,8 @@
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC_PATTERN;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.DECIMAL_NO_CHANGE;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.NO_CHANGE;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
@@ -1148,4 +1150,71 @@ public void testComputedColumnWithCaseInsensitive(boolean triggerSchemaRetrieval
public void testWaterMarkSyncTable() throws Exception {
testWaterMarkSyncTable(CANAL);
}

@Test
@Timeout(60)
public void testVarcharNoChange() throws Exception {
String topic = "varchar-no-change";
createTestTopic(topic, 1, 1);
writeRecordsToKafka(topic, "kafka/canal/table/typenochange/canal-data-1.txt");
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withTableConfig(getBasicTableConfig())
.withTypeMappingModes(NO_CHANGE.configString())
.build();
runActionWithDefaultEnv(action);
RowType rowType =
RowType.of(
new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
new String[] {"k", "v"});
waitForResult(
Collections.singletonList("+I[1, one]"),
getFileStoreTable(tableName),
rowType,
Collections.singletonList("k"));
writeRecordsToKafka(topic, "kafka/canal/table/typenochange/canal-data-2.txt");
waitForResult(
Arrays.asList("+I[1, one]", "+I[2, two]"),
getFileStoreTable(tableName),
rowType, // should not change
Collections.singletonList("k"));
}

@Test
@Timeout(60)
public void testDecimalNoChange() throws Exception {
String topic = "decimal-no-change";
createTestTopic(topic, 1, 1);
writeRecordsToKafka(topic, "kafka/canal/table/typenochange/canal-data-3.txt");
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withTableConfig(getBasicTableConfig())
.withCatalogConfig(
Collections.singletonMap(
CatalogOptions.CASE_SENSITIVE.key(), "false"))
.withTypeMappingModes(DECIMAL_NO_CHANGE.configString())
.build();
runActionWithDefaultEnv(action);
RowType rowType =
RowType.of(
new DataType[] {DataTypes.INT().notNull(), DataTypes.DECIMAL(10, 2)},
new String[] {"k", "v"});
waitForResult(
Collections.singletonList("+I[1, 1.20]"),
getFileStoreTable(tableName),
rowType,
Collections.singletonList("k"));
writeRecordsToKafka(topic, "kafka/canal/table/typenochange/canal-data-4.txt");
waitForResult(
Arrays.asList("+I[1, 1.20]", "+I[2, 2.30]"),
getFileStoreTable(tableName),
rowType, // should not change
Collections.singletonList("k"));
}
}
@@ -23,6 +23,7 @@
import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -171,6 +172,7 @@ private void innerTestRandomCdcEvents(Supplier<Integer> bucket, boolean unawareB
// because we have at most 3 tables and 8 slots in AbstractTestBase
// each table can only get 2 slots
.withTableOptions(Collections.singletonMap(SINK_PARALLELISM.key(), "2"))
.withTypeMapping(TypeMapping.defaultMapping())
.withDatabase(DATABASE_NAME)
.withCatalogLoader(catalogLoader)
.build();
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

{"data":[{"k":1,"v":"one"}],"database":"paimon_sync_table","es":1683006706000,"id":92,"isDdl":false,"mysqlType":{"k":"INT","v":"VARCHAR(10)"},"old":null,"pkNames":["k"],"sql":"","sqlType":{"k":4,"v":12},"table":"schema_evolution","ts":1683006706728,"type":"INSERT"}
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

{"data":[{"k":2,"v":"two"}],"database":"paimon_sync_table","es":1683006724000,"id":94,"isDdl":false,"mysqlType":{"k":"INT","v":"VARCHAR(20)"},"old":null,"pkNames":["k"],"sql":"","sqlType":{"k":4,"v":12},"table":"schema_evolution","ts":1683006724404,"type":"INSERT"}