[flink] Add new type_mapping options to ignore some type changes when specified #4921

Open · wants to merge 2 commits into base: master
2 changes: 2 additions & 0 deletions docs/layouts/shortcodes/generated/kafka_sync_database.html
@@ -86,6 +86,8 @@
<li>"char-to-string": maps MySQL CHAR(length)/VARCHAR(length) types to STRING.</li>
<li>"longtext-to-bytes": maps MySQL LONGTEXT types to BYTES.</li>
<li>"bigint-unsigned-to-bigint": maps MySQL BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL to BIGINT. You should ensure overflow won't occur when using this option.</li>
<li>"decimal-no-change": Ignore decimal type change.</li>
<li>"no-change": Ignore any type change.</li>
</ul>
</td>
</tr>
2 changes: 2 additions & 0 deletions docs/layouts/shortcodes/generated/kafka_sync_table.html
@@ -58,6 +58,8 @@
<li>"char-to-string": maps MySQL CHAR(length)/VARCHAR(length) types to STRING.</li>
<li>"longtext-to-bytes": maps MySQL LONGTEXT types to BYTES.</li>
<li>"bigint-unsigned-to-bigint": maps MySQL BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL to BIGINT. You should ensure overflow won't occur when using this option.</li>
<li>"decimal-no-change": Ignore decimal type change.</li>
<li>"no-change": Ignore any type change.</li>
</ul>
</td>
</tr>
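To see what these option strings mean in code: a minimal sketch, assuming the existing `TypeMapping.parse(String[])` entry point that the action factories already use for the other modes; `containsMode` is the exact check this PR adds downstream.

```java
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode;

public class TypeMappingParseExample {
    public static void main(String[] args) {
        // Raw values of the --type_mapping argument, already comma-split upstream.
        TypeMapping mapping = TypeMapping.parse(new String[] {"decimal-no-change"});

        // The checks this PR performs during schema evolution:
        System.out.println(mapping.containsMode(TypeMappingMode.DECIMAL_NO_CHANGE)); // true
        System.out.println(mapping.containsMode(TypeMappingMode.NO_CHANGE));         // false
    }
}
```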
@@ -231,6 +231,7 @@ protected void buildSink(
.withInput(input)
.withParserFactory(parserFactory)
.withCatalogLoader(catalogLoader())
.withTypeMapping(typeMapping)
.withDatabase(database)
.withTables(tables)
.withMode(mode)
@@ -168,7 +168,8 @@ protected void buildSink(
.withParserFactory(parserFactory)
.withTable(fileStoreTable)
.withIdentifier(new Identifier(database, table))
.withCatalogLoader(catalogLoader());
.withCatalogLoader(catalogLoader())
.withTypeMapping(typeMapping);
String sinkParallelism = tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
if (sinkParallelism != null) {
sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
@@ -76,7 +76,9 @@ public enum TypeMappingMode {
TO_STRING,
CHAR_TO_STRING,
LONGTEXT_TO_BYTES,
BIGINT_UNSIGNED_TO_BIGINT;
BIGINT_UNSIGNED_TO_BIGINT,
DECIMAL_NO_CHANGE,
NO_CHANGE;

private static final Map<String, TypeMappingMode> TYPE_MAPPING_OPTIONS =
Arrays.stream(TypeMappingMode.values())
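The new constants follow the module's apparent naming convention, enum name lowercased with underscores turned into hyphens, which is how `configString()` produces the user-facing strings for the existing modes. A quick sanity check, assuming that convention holds:

```java
import org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode;

public class ModeNameRoundTrip {
    public static void main(String[] args) {
        // Expected output, if the convention holds: "no-change" and "decimal-no-change".
        System.out.println(TypeMappingMode.NO_CHANGE.configString());
        System.out.println(TypeMappingMode.DECIMAL_NO_CHANGE.configString());
    }
}
```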
@@ -21,6 +21,7 @@
import org.apache.paimon.annotation.Experimental;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.BucketMode;
@@ -49,6 +50,7 @@ public class CdcSinkBuilder<T> {
private Table table = null;
private Identifier identifier = null;
private CatalogLoader catalogLoader = null;
private TypeMapping typeMapping = null;

@Nullable private Integer parallelism;

@@ -82,6 +84,11 @@ public CdcSinkBuilder<T> withCatalogLoader(CatalogLoader catalogLoader) {
return this;
}

public CdcSinkBuilder<T> withTypeMapping(TypeMapping typeMapping) {
this.typeMapping = typeMapping;
return this;
}

public DataStreamSink<?> build() {
Preconditions.checkNotNull(input, "Input DataStream can not be null.");
Preconditions.checkNotNull(parserFactory, "Event ParserFactory can not be null.");
@@ -109,7 +116,8 @@ public DataStreamSink<?> build() {
new UpdatedDataFieldsProcessFunction(
new SchemaManager(dataTable.fileIO(), dataTable.location()),
identifier,
catalogLoader))
catalogLoader,
typeMapping))
.name("Schema Evolution");
schemaChangeProcessFunction.getTransformation().setParallelism(1);
schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
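For callers, the new method slots into the existing fluent chain. A minimal wiring sketch; the generic parameter, the identifier, and the `EventParser.Factory`/`Table`/`CatalogLoader` types are assumed from the surrounding module, not introduced by this PR:

```java
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.table.Table;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;

public class CdcSinkWiring {
    static <T> DataStreamSink<?> buildCdcSink(
            DataStream<T> input,
            EventParser.Factory<T> parserFactory,
            Table table,
            CatalogLoader catalogLoader,
            TypeMapping typeMapping) {
        return new CdcSinkBuilder<T>()
                .withInput(input)
                .withParserFactory(parserFactory)
                .withTable(table)
                .withIdentifier(new Identifier("my_db", "my_table")) // placeholder identifier
                .withCatalogLoader(catalogLoader)
                .withTypeMapping(typeMapping) // new in this PR; may be null for legacy callers
                .build();
    }
}
```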
@@ -22,6 +22,7 @@
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.sink.FlinkWriteSink;
import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
import org.apache.paimon.options.MemorySize;
@@ -73,6 +74,8 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
// Paimon tables. 2) in multiplex sink where it is used to
// initialize different writers to multiple tables.
private CatalogLoader catalogLoader;
private TypeMapping typeMapping;

// database to sync, currently only support single database
private String database;
private MultiTablesSinkMode mode;
@@ -116,6 +119,11 @@ public FlinkCdcSyncDatabaseSinkBuilder<T> withCatalogLoader(CatalogLoader catalo
return this;
}

public FlinkCdcSyncDatabaseSinkBuilder<T> withTypeMapping(TypeMapping typeMapping) {
this.typeMapping = typeMapping;
return this;
}

public FlinkCdcSyncDatabaseSinkBuilder<T> withMode(MultiTablesSinkMode mode) {
this.mode = mode;
return this;
@@ -153,7 +161,7 @@ private void buildCombinedCdcSink() {
parsed,
CdcDynamicTableParsingProcessFunction.DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG)
.keyBy(t -> t.f0)
.process(new MultiTableUpdatedDataFieldsProcessFunction(catalogLoader))
.process(new MultiTableUpdatedDataFieldsProcessFunction(catalogLoader, typeMapping))
.name("Schema Evolution");

DataStream<CdcMultiplexRecord> converted =
@@ -201,7 +209,8 @@ private void buildDividedCdcSink() {
new UpdatedDataFieldsProcessFunction(
new SchemaManager(table.fileIO(), table.location()),
Identifier.create(database, table.name()),
catalogLoader))
catalogLoader,
typeMapping))
.name("Schema Evolution");
schemaChangeProcessFunction.getTransformation().setParallelism(1);
schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
@@ -21,6 +21,7 @@
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
@@ -52,8 +53,9 @@ public class MultiTableUpdatedDataFieldsProcessFunction

private final Map<Identifier, SchemaManager> schemaManagers = new HashMap<>();

public MultiTableUpdatedDataFieldsProcessFunction(CatalogLoader catalogLoader) {
super(catalogLoader);
public MultiTableUpdatedDataFieldsProcessFunction(
CatalogLoader catalogLoader, TypeMapping typeMapping) {
super(catalogLoader, typeMapping);
}

@Override
@@ -20,6 +20,7 @@

import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.types.DataField;
@@ -53,8 +54,11 @@ public class UpdatedDataFieldsProcessFunction
private Set<FieldIdentifier> latestFields;

public UpdatedDataFieldsProcessFunction(
SchemaManager schemaManager, Identifier identifier, CatalogLoader catalogLoader) {
super(catalogLoader);
SchemaManager schemaManager,
Identifier identifier,
CatalogLoader catalogLoader,
TypeMapping typeMapping) {
super(catalogLoader, typeMapping);
this.schemaManager = schemaManager;
this.identifier = identifier;
this.latestFields = new HashSet<>();
@@ -21,6 +21,7 @@
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -52,6 +53,7 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
protected final CatalogLoader catalogLoader;
protected Catalog catalog;
private boolean caseSensitive;
private TypeMapping typeMapping;

private static final List<DataTypeRoot> STRING_TYPES =
Arrays.asList(DataTypeRoot.CHAR, DataTypeRoot.VARCHAR);
@@ -71,8 +73,10 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
private static final List<DataTypeRoot> TIMESTAMP_TYPES =
Arrays.asList(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);

protected UpdatedDataFieldsProcessFunctionBase(CatalogLoader catalogLoader) {
protected UpdatedDataFieldsProcessFunctionBase(
CatalogLoader catalogLoader, TypeMapping typeMapping) {
this.catalogLoader = catalogLoader;
this.typeMapping = typeMapping;
}

/**
@@ -214,6 +218,15 @@ protected List<SchemaChange> extractSchemaChanges(
oldFields.put(oldField.name(), oldField);
}

boolean allowTypeChange =
this.typeMapping == null
|| !this.typeMapping.containsMode(TypeMapping.TypeMappingMode.NO_CHANGE);

boolean allowDecimalTypeChange =
this.typeMapping == null
|| !this.typeMapping.containsMode(
TypeMapping.TypeMappingMode.DECIMAL_NO_CHANGE);

List<SchemaChange> result = new ArrayList<>();
for (DataField newField : updatedDataFields) {
String newFieldName = StringUtils.toLowerCaseIfNeed(newField.name(), caseSensitive);
@@ -231,7 +244,10 @@ protected List<SchemaChange> extractSchemaChanges(
SchemaChange.updateColumnComment(
new String[] {newFieldName}, newField.description()));
}
} else {
} else if (allowTypeChange) {
if (oldField.type().is(DataTypeRoot.DECIMAL) && !allowDecimalTypeChange) {
continue;
}
// update column type
result.add(SchemaChange.updateColumnType(newFieldName, newField.type()));
// update column comment
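Distilled from the hunk above, this is the decision the two flags encode; `typeMapping` may still be null for callers that never set it, in which case every type change stays allowed:

```java
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypeRoot;

public class TypeChangeGate {
    /** Mirrors the logic in extractSchemaChanges: apply a type change to oldField? */
    static boolean shouldUpdateColumnType(DataField oldField, TypeMapping typeMapping) {
        // "no-change" wins over everything: never alter an existing column type.
        boolean allowTypeChange =
                typeMapping == null || !typeMapping.containsMode(TypeMappingMode.NO_CHANGE);
        if (!allowTypeChange) {
            return false;
        }
        // "decimal-no-change" only shields columns that are currently DECIMAL.
        boolean allowDecimalTypeChange =
                typeMapping == null
                        || !typeMapping.containsMode(TypeMappingMode.DECIMAL_NO_CHANGE);
        return allowDecimalTypeChange || !oldField.type().is(DataTypeRoot.DECIMAL);
    }
}
```

Note that only the column-type-update branch is gated; adding new columns and updating comments on columns whose type did not change are unaffected.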
@@ -210,7 +210,8 @@ public void testSchemaEvolution() throws Exception {
new UpdatedDataFieldsProcessFunction(
new SchemaManager(table.fileIO(), table.location()),
identifier,
catalogLoader))
catalogLoader,
TypeMapping.defaultMapping()))
.name("Schema Evolution");
schemaChangeProcessFunction.getTransformation().setParallelism(1);
schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
@@ -49,6 +49,8 @@
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC_PATTERN;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.DECIMAL_NO_CHANGE;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.NO_CHANGE;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
@@ -1148,4 +1150,71 @@ public void testComputedColumnWithCaseInsensitive(boolean triggerSchemaRetrieval
public void testWaterMarkSyncTable() throws Exception {
testWaterMarkSyncTable(CANAL);
}

@Test
@Timeout(60)
public void testVarcharNoChange() throws Exception {
String topic = "varchar-no-change";
createTestTopic(topic, 1, 1);
writeRecordsToKafka(topic, "kafka/canal/table/typenochange/canal-data-1.txt");
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withTableConfig(getBasicTableConfig())
.withTypeMappingModes(NO_CHANGE.configString())
.build();
runActionWithDefaultEnv(action);
RowType rowType =
RowType.of(
new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
new String[] {"k", "v"});
waitForResult(
Collections.singletonList("+I[1, one]"),
getFileStoreTable(tableName),
rowType,
Collections.singletonList("k"));
writeRecordsToKafka(topic, "kafka/canal/table/typenochange/canal-data-2.txt");
waitForResult(
Arrays.asList("+I[1, one]", "+I[2, two]"),
getFileStoreTable(tableName),
rowType, // should not change
Collections.singletonList("k"));
}

@Test
@Timeout(60)
public void testDecimalNoChange() throws Exception {
String topic = "decimal-no-change";
createTestTopic(topic, 1, 1);
writeRecordsToKafka(topic, "kafka/canal/table/typenochange/canal-data-3.txt");
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withTableConfig(getBasicTableConfig())
.withCatalogConfig(
Collections.singletonMap(
CatalogOptions.CASE_SENSITIVE.key(), "false"))
.withTypeMappingModes(DECIMAL_NO_CHANGE.configString())
.build();
runActionWithDefaultEnv(action);
RowType rowType =
RowType.of(
new DataType[] {DataTypes.INT().notNull(), DataTypes.DECIMAL(10, 2)},
new String[] {"k", "v"});
waitForResult(
Collections.singletonList("+I[1, 1.20]"),
getFileStoreTable(tableName),
rowType,
Collections.singletonList("k"));
writeRecordsToKafka(topic, "kafka/canal/table/typenochange/canal-data-4.txt");
waitForResult(
Arrays.asList("+I[1, 1.20]", "+I[2, 2.30]"),
getFileStoreTable(tableName),
rowType, // should not change
Collections.singletonList("k"));
}
}
@@ -23,6 +23,7 @@
import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -171,6 +172,7 @@ private void innerTestRandomCdcEvents(Supplier<Integer> bucket, boolean unawareB
// because we have at most 3 tables and 8 slots in AbstractTestBase
// each table can only get 2 slots
.withTableOptions(Collections.singletonMap(SINK_PARALLELISM.key(), "2"))
.withTypeMapping(TypeMapping.defaultMapping())
.withDatabase(DATABASE_NAME)
.withCatalogLoader(catalogLoader)
.build();
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

{"data":[{"k":1,"v":"one"}],"database":"paimon_sync_table","es":1683006706000,"id":92,"isDdl":false,"mysqlType":{"k":"INT","v":"VARCHAR(10)"},"old":null,"pkNames":["k"],"sql":"","sqlType":{"k":4,"v":12},"table":"schema_evolution","ts":1683006706728,"type":"INSERT"}
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

{"data":[{"k":2,"v":"two"}],"database":"paimon_sync_table","es":1683006724000,"id":94,"isDdl":false,"mysqlType":{"k":"INT","v":"VARCHAR(20)"},"old":null,"pkNames":["k"],"sql":"","sqlType":{"k":4,"v":12},"table":"schema_evolution","ts":1683006724404,"type":"INSERT"}