[FLINK-35152][pipeline-connector/doris] Support create doris auto partition table
qg-lin committed Jan 17, 2025
1 parent 8815f2b commit 07531bf
Showing 11 changed files with 619 additions and 6 deletions.
19 changes: 19 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/doris.md
@@ -182,6 +182,25 @@ pipeline:
See more about <a href="https://doris.apache.org/zh-CN/docs/dev/sql-manual/sql-statements/Data-Definition-Statements/Create/CREATE-TABLE/">Doris table properties</a></td>
</td>
</tr>
<tr>
<td>table.create.auto-partition.properties.*</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
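<td>Configuration for creating auto-partitioned tables.<br/>
Currently only AUTO RANGE PARTITION on DATE/DATETIME columns is supported, the partition function is <code>date_trunc</code>, and the Doris version must be greater than 2.1.6. See more about <a href="https://doris.apache.org/docs/table-design/data-partitioning/auto-partitioning">Doris auto partitioning</a><br/>
The supported properties are:<br/>
<code> table.create.auto-partition.properties.include</code>The set of tables (after routing) to include, separated by commas; regular expressions are supported;<br/>
<code> table.create.auto-partition.properties.exclude</code>The set of tables (after routing) to exclude, separated by commas; regular expressions are supported;<br/>
<code> table.create.auto-partition.properties.default-partition-key</code>The default partition key;<br/>
<code> table.create.auto-partition.properties.default-partition-unit</code>The default partition unit;<br/>
<code> table.create.auto-partition.properties.DB.TABLE.partition-key</code>The partition key of a specific table; if not set, the default partition key is used;<br/>
<code> table.create.auto-partition.properties.DB.TABLE.partition-unit</code>The partition unit of a specific table; if not set, the default partition unit is used.<br/>
Notes:<br/>
1: If the partition key is not of DATE/DATETIME type, no partitioned table is created.<br/>
2: Doris AUTO RANGE PARTITION does not support NULLABLE columns as partition columns. If the value of the configured partition key is null, or a NULLABLE partition column is added after the table is created, the system automatically fills in a default value (<code>1970-01-01</code> for DATE, <code>1970-01-01 00:00:00</code> for DATETIME). Please choose a suitable partition key.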
</td>
</tr>
</tbody>
</table>
</div>
19 changes: 19 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/doris.md
@@ -182,6 +182,25 @@ pipeline:
See more about <a href="https://doris.apache.org/docs/dev/sql-manual/sql-statements/Data-Definition-Statements/Create/CREATE-TABLE/"> Doris Table Properties</a></td>
</td>
</tr>
<tr>
<td>table.create.auto-partition.properties.*</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Configuration for creating auto-partitioned tables.<br/>
Currently the only supported partition function is <code>date_trunc</code>, the partition column must be of DATE or DATETIME type, and the Doris version must be greater than 2.1.6. See more about <a href="https://doris.apache.org/docs/table-design/data-partitioning/auto-partitioning">Doris Auto Partitioning</a><br/>
The following properties are supported:<br/>
<code> table.create.auto-partition.properties.include</code>The tables (after routing) to include, separated by commas; regular expressions are supported;<br/>
<code> table.create.auto-partition.properties.exclude</code>The tables (after routing) to exclude, separated by commas; regular expressions are supported;<br/>
<code> table.create.auto-partition.properties.default-partition-key</code>The default partition key;<br/>
<code> table.create.auto-partition.properties.default-partition-unit</code>The default partition unit;<br/>
<code> table.create.auto-partition.properties.DB.TABLE.partition-key</code>The partition key of a specific table. If not set, the default partition key is used;<br/>
<code> table.create.auto-partition.properties.DB.TABLE.partition-unit</code>The partition unit of a specific table. If not set, the default partition unit is used.<br/>
Note:<br/>
1: If the partition key is not of DATE/DATETIME type, the table is not created as an auto-partitioned table.<br/>
2: Doris AUTO RANGE PARTITION does not support NULLABLE columns as the partition key. If Flink CDC receives a NULL value for the partition key, or a NULLABLE partition column is added after the table was created, the value is automatically filled with a default (DATE: <code>1970-01-01</code>, DATETIME: <code>1970-01-01 00:00:00</code>). Choose the partition key carefully.
</td>
</tr>
</tbody>
</table>
</div>
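Reviewer note on the partition unit: the docs above say the partition function is `date_trunc`, so each row lands in the range that starts at its partition-key value truncated to the configured unit. The sketch below only illustrates that truncation with `java.time`; `DateTruncSketch` and `truncate` are hypothetical names, not part of Flink CDC or Doris, and only a few units are covered.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Locale;

/** Illustrative only: mimics how a date_trunc-style unit maps a timestamp to a range start. */
final class DateTruncSketch {

    // Hypothetical helper; the real truncation happens inside Doris.
    static LocalDateTime truncate(LocalDateTime value, String unit) {
        switch (unit.toLowerCase(Locale.ROOT)) {
            case "year":
                return value.withDayOfYear(1).truncatedTo(ChronoUnit.DAYS);
            case "month":
                return value.withDayOfMonth(1).truncatedTo(ChronoUnit.DAYS);
            case "day":
                return value.truncatedTo(ChronoUnit.DAYS);
            case "hour":
                return value.truncatedTo(ChronoUnit.HOURS);
            default:
                throw new IllegalArgumentException("Unit not covered by this sketch: " + unit);
        }
    }

    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.of(2025, 1, 17, 13, 45, 10);
        // Rows with this timestamp would share a partition with every other row
        // whose truncated value is the same.
        System.out.println(truncate(ts, "month"));
        System.out.println(truncate(ts, "day"));
    }
}
```

A coarser unit means fewer, larger partitions. It also means every NULL partition value, which the connector replaces with `1970-01-01`, ends up in one early partition.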
@@ -27,7 +27,7 @@ limitations under the License.
<name>flink-cdc-pipeline-connector-doris</name>

<properties>
-<doris.connector.version>24.0.1</doris.connector.version>
+<doris.connector.version>24.1.0</doris.connector.version>
<mysql.connector.version>8.0.26</mysql.connector.version>
</properties>

@@ -58,6 +58,7 @@
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_MAX_RETRIES;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_USE_CACHE;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.STREAM_LOAD_PROP_PREFIX;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PROPERTIES_PREFIX;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.USERNAME;

@@ -67,7 +68,10 @@ public class DorisDataSinkFactory implements DataSinkFactory {
@Override
public DataSink createDataSink(Context context) {
FactoryHelper.createFactoryHelper(this, context)
-.validateExcept(TABLE_CREATE_PROPERTIES_PREFIX, STREAM_LOAD_PROP_PREFIX);
+.validateExcept(
+TABLE_CREATE_PROPERTIES_PREFIX,
+STREAM_LOAD_PROP_PREFIX,
+TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX);

Configuration config = context.getFactoryConfiguration();
DorisOptions.Builder optionsBuilder = DorisOptions.builder();
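Reviewer note: the new prefix has to be passed to `validateExcept` because the auto-partition keys are free-form (they embed `DB.TABLE`) and cannot be declared as fixed options. The following is a minimal sketch of that idea, not the actual `FactoryHelper` implementation; `ValidateExceptSketch`, `KNOWN_KEYS`, and the sample option values are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Illustrative only: rejects unknown option keys unless they start with a skipped prefix. */
final class ValidateExceptSketch {

    // Hypothetical stand-in for the option keys a factory declares explicitly.
    static final Set<String> KNOWN_KEYS =
            new HashSet<>(Arrays.asList("fenodes", "username", "password"));

    static void validateExcept(Map<String, String> options, String... prefixesToSkip) {
        for (String key : options.keySet()) {
            boolean skipped = Arrays.stream(prefixesToSkip).anyMatch(key::startsWith);
            if (!skipped && !KNOWN_KEYS.contains(key)) {
                throw new IllegalArgumentException("Unsupported option: " + key);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("fenodes", "127.0.0.1:8030");
        options.put("table.create.auto-partition.properties.default-partition-key", "create_time");

        // Passes: the auto-partition prefix is exempt from the unknown-key check.
        validateExcept(options, "table.create.auto-partition.properties.");
        System.out.println("validated");
    }
}
```

Without the third prefix, a pipeline that sets any `table.create.auto-partition.properties.*` key would presumably fail validation before the sink is created.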
@@ -62,14 +62,14 @@ public EventSinkProvider getEventSinkProvider() {
dorisOptions,
readOptions,
executionOptions,
-new DorisEventSerializer(zoneId)));
+new DorisEventSerializer(zoneId, configuration)));
} else {
return FlinkSinkProvider.of(
new DorisBatchSink<>(
dorisOptions,
readOptions,
executionOptions,
-new DorisEventSerializer(zoneId)));
+new DorisEventSerializer(zoneId, configuration)));
}
}

@@ -158,6 +158,29 @@ public class DorisDataSinkOptions {
public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
// Prefix for Doris Create table.
public static final String TABLE_CREATE_PROPERTIES_PREFIX = "table.create.properties.";
// Prefix for Doris Create auto partition table.
public static final String TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX =
"table.create.auto-partition.properties.";
public static final String TABLE_CREATE_PARTITION_KEY = "partition-key";
public static final String TABLE_CREATE_PARTITION_UNIT = "partition-unit";

public static final String TABLE_CREATE_DEFAULT_PARTITION_KEY =
"default-" + TABLE_CREATE_PARTITION_KEY;
public static final String TABLE_CREATE_DEFAULT_PARTITION_UNIT =
"default-" + TABLE_CREATE_PARTITION_UNIT;

public static final String TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_KEY =
TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX + TABLE_CREATE_DEFAULT_PARTITION_KEY;
public static final String TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_UNIT =
TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX + TABLE_CREATE_DEFAULT_PARTITION_UNIT;

public static final String TABLE_CREATE_PARTITION_INCLUDE = "include";
public static final String TABLE_CREATE_PARTITION_EXCLUDE = "exclude";

public static final String TABLE_CREATE_AUTO_PARTITION_PROPERTIES_INCLUDE =
TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX + TABLE_CREATE_PARTITION_INCLUDE;
public static final String TABLE_CREATE_AUTO_PARTITION_PROPERTIES_EXCLUDE =
TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX + TABLE_CREATE_PARTITION_EXCLUDE;

public static Map<String, String> getPropertiesByPrefix(
Configuration tableOptions, String prefix) {
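Reviewer note: the constants above are sub-keys that only make sense after the `table.create.auto-partition.properties.` prefix has been stripped. The body of `getPropertiesByPrefix` is not shown in this hunk, so the sketch below assumes it keeps the entries that start with the prefix and removes the prefix from each key. It uses a plain `Map` instead of the connector's `Configuration`, and `PrefixOptionsSketch` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: assumed behaviour of a prefix-stripping option lookup. */
final class PrefixOptionsSketch {

    static final String AUTO_PARTITION_PREFIX = "table.create.auto-partition.properties.";

    static Map<String, String> propertiesByPrefix(Map<String, String> options, String prefix) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : options.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                // Keep only the part after the prefix, e.g. "default-partition-key".
                result.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put(AUTO_PARTITION_PREFIX + "default-partition-key", "create_time");
        options.put(AUTO_PARTITION_PREFIX + "default-partition-unit", "month");
        options.put(AUTO_PARTITION_PREFIX + "app_db.orders.partition-key", "order_date");
        options.put("table.create.properties.replication_num", "1");

        // Only the three auto-partition entries survive, keyed by their sub-key.
        System.out.println(propertiesByPrefix(options, AUTO_PARTITION_PREFIX));
    }
}
```

Note that the sub-keys use hyphens (`partition-key`, `default-partition-unit`), so the user-facing documentation should spell them the same way.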
@@ -17,6 +17,8 @@

package org.apache.flink.cdc.connectors.doris.sink;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
@@ -26,8 +28,14 @@
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DateType;
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.cdc.common.types.ZonedTimestampType;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.connectors.doris.utils.DorisSchemaUtils;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -42,6 +50,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.apache.doris.flink.sink.util.DeleteOperation.addDeleteSign;

@@ -61,8 +70,11 @@ public class DorisEventSerializer implements DorisRecordSerializer<Event> {
/** ZoneId from pipeline config to support timestamp with local time zone. */
public final ZoneId pipelineZoneId;

-public DorisEventSerializer(ZoneId zoneId) {
+public final Configuration dorisConfig;
+
+public DorisEventSerializer(ZoneId zoneId, Configuration config) {
pipelineZoneId = zoneId;
+dorisConfig = config;
}

@Override
@@ -108,6 +120,30 @@ private DorisRecord applyDataChangeEvent(DataChangeEvent event) throws JsonProce
throw new UnsupportedOperationException("Unsupport Operation " + op);
}

// get partition info from config
Tuple2<String, String> partitionInfo =
DorisSchemaUtils.getPartitionInfo(dorisConfig, schema, tableId);
if (!Objects.isNull(partitionInfo)) {
String partitionKey = partitionInfo.f0;
Object partitionValue = valueMap.get(partitionKey);
// fill partition column by default value if null
if (Objects.isNull(partitionValue)) {
schema.getColumn(partitionKey)
.ifPresent(
column -> {
DataType dataType = column.getType();
if (dataType instanceof DateType) {
valueMap.put(partitionKey, DorisSchemaUtils.DEFAULT_DATE);
} else if (dataType instanceof LocalZonedTimestampType
|| dataType instanceof TimestampType
|| dataType instanceof ZonedTimestampType) {
valueMap.put(
partitionKey, DorisSchemaUtils.DEFAULT_DATETIME);
}
});
}
}

return DorisRecord.of(
tableId.getSchemaName(),
tableId.getTableName(),
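Reviewer note: the hunk above back-fills a NULL partition value so that the row can still be routed to a Doris range partition. A stripped-down sketch of that rule follows, using a plain enum in place of the Flink CDC `DataType` hierarchy; `PartitionDefaultFillSketch`, `ColumnKind`, and `fillIfNull` are hypothetical names introduced here.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: fills a missing partition value with the epoch default for its type. */
final class PartitionDefaultFillSketch {

    static final String DEFAULT_DATE = "1970-01-01";
    static final String DEFAULT_DATETIME = "1970-01-01 00:00:00";

    // Hypothetical simplification: DATETIME stands for all timestamp-like types.
    enum ColumnKind { DATE, DATETIME, OTHER }

    static void fillIfNull(Map<String, Object> row, String partitionKey, ColumnKind kind) {
        if (row.get(partitionKey) != null) {
            return; // a real value is never overwritten
        }
        switch (kind) {
            case DATE:
                row.put(partitionKey, DEFAULT_DATE);
                break;
            case DATETIME:
                row.put(partitionKey, DEFAULT_DATETIME);
                break;
            default:
                break; // other types are left untouched
        }
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("id", 1);
        row.put("create_time", null);
        fillIfNull(row, "create_time", ColumnKind.DATETIME);
        System.out.println(row.get("create_time"));
    }
}
```

One consequence worth stating in review: after the fill, a genuinely missing value can no longer be told apart from a real `1970-01-01` value in Doris.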
@@ -121,7 +157,6 @@ public Map<String, Object> serializerRecord(RecordData recordData, Schema schema
Preconditions.checkState(
columns.size() == recordData.getArity(),
"Column size does not match the data size");

for (int i = 0; i < recordData.getArity(); i++) {
DorisRowConverter.SerializationConverter converter =
DorisRowConverter.createNullableExternalConverter(
@@ -17,6 +17,7 @@

package org.apache.flink.cdc.connectors.doris.sink;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
@@ -39,6 +40,7 @@
import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.cdc.common.types.ZonedTimestampType;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
import org.apache.flink.cdc.connectors.doris.utils.DorisSchemaUtils;
import org.apache.flink.util.CollectionUtil;

import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
@@ -162,6 +164,10 @@ private void applyCreateTableEvent(CreateTableEvent event) throws SchemaEvolveEx
DorisDataSinkOptions.getPropertiesByPrefix(
config, TABLE_CREATE_PROPERTIES_PREFIX);
tableSchema.setProperties(tableProperties);

Tuple2<String, String> partitionInfo =
DorisSchemaUtils.getPartitionInfo(config, schema, tableId);
tableSchema.setPartitionInfo(partitionInfo);
schemaChangeManager.createTable(tableSchema);
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.doris.utils;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DateType;
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.cdc.common.types.ZonedTimestampType;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions;

import java.util.Map;

import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_DEFAULT_PARTITION_KEY;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_DEFAULT_PARTITION_UNIT;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PARTITION_EXCLUDE;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PARTITION_INCLUDE;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PARTITION_KEY;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PARTITION_UNIT;

/** Utilities for doris schema. */
public class DorisSchemaUtils {

public static final String DEFAULT_DATE = "1970-01-01";
public static final String DEFAULT_DATETIME = "1970-01-01 00:00:00";

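/**
* Resolves the auto-partition key and unit for a table from the sink configuration. Currently
* only AUTO RANGE PARTITION on DATE/TIMESTAMP columns is supported, and the Doris version must
* be greater than 2.1.6.
*
* @param config the Doris sink configuration
* @param schema the schema of the table being created or written to
* @param tableId the identifier of the table after routing
* @return a (partition key, partition unit) tuple, or null if the table is excluded, is not
* included, or has no DATE/TIMESTAMP column configured as its partition key
*/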
public static Tuple2<String, String> getPartitionInfo(
Configuration config, Schema schema, TableId tableId) {
Map<String, String> autoPartitionProperties =
DorisDataSinkOptions.getPropertiesByPrefix(
config, TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX);
String excludes = autoPartitionProperties.get(TABLE_CREATE_PARTITION_EXCLUDE);
if (!StringUtils.isNullOrWhitespaceOnly(excludes)) {
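// The exclude patterns are loaded as the selector's table list, so a match
// below means the table is excluded from auto partitioning.
Selectors selectExclude =
new Selectors.SelectorsBuilder().includeTables(excludes).build();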
if (selectExclude.isMatch(tableId)) {
return null;
}
}

String includes = autoPartitionProperties.get(TABLE_CREATE_PARTITION_INCLUDE);
if (!StringUtils.isNullOrWhitespaceOnly(includes)) {
Selectors selectInclude =
new Selectors.SelectorsBuilder().includeTables(includes).build();
if (!selectInclude.isMatch(tableId)) {
return null;
}
}

String partitionKey =
autoPartitionProperties.get(
tableId.identifier() + "." + TABLE_CREATE_PARTITION_KEY);
String partitionUnit =
autoPartitionProperties.get(
tableId.identifier() + "." + TABLE_CREATE_PARTITION_UNIT);
if (StringUtils.isNullOrWhitespaceOnly(partitionKey)) {
partitionKey = autoPartitionProperties.get(TABLE_CREATE_DEFAULT_PARTITION_KEY);
}
if (StringUtils.isNullOrWhitespaceOnly(partitionUnit)) {
partitionUnit = autoPartitionProperties.get(TABLE_CREATE_DEFAULT_PARTITION_UNIT);
}

if (!StringUtils.isNullOrWhitespaceOnly(partitionKey)
&& schema.getColumn(partitionKey).isPresent()) {

DataType dataType = schema.getColumn(partitionKey).get().getType();
if (dataType instanceof LocalZonedTimestampType
|| dataType instanceof TimestampType
|| dataType instanceof ZonedTimestampType
|| dataType instanceof DateType) {
return new Tuple2<>(partitionKey, partitionUnit);
}
}
return null;
}
}
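Reviewer note: the resolution order in `getPartitionInfo` is exclude, then include, then the table-specific key and unit with fallback to the defaults, then the column type check. The self-contained sketch below walks through the same order with plain collections. It is a simplification under stated assumptions: `PartitionInfoSketch` and its methods are hypothetical, table matching uses `java.util.regex` on the full identifier rather than the connector's `Selectors`, and the set of date-like columns replaces the `Schema` type check.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;

/** Illustrative only: mirrors the lookup order of the partition-info resolution. */
final class PartitionInfoSketch {

    // Simplified matcher: true if any comma-separated regex matches the identifier.
    static boolean matchesAny(String patterns, String tableIdentifier) {
        for (String pattern : patterns.split(",")) {
            if (Pattern.matches(pattern.trim(), tableIdentifier)) {
                return true;
            }
        }
        return false;
    }

    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    static Optional<Map.Entry<String, String>> resolve(
            Map<String, String> props, String tableIdentifier, Set<String> dateLikeColumns) {
        String excludes = props.get("exclude");
        if (!isBlank(excludes) && matchesAny(excludes, tableIdentifier)) {
            return Optional.empty();
        }
        String includes = props.get("include");
        if (!isBlank(includes) && !matchesAny(includes, tableIdentifier)) {
            return Optional.empty();
        }
        // Table-specific settings win; otherwise fall back to the defaults.
        String key = props.get(tableIdentifier + ".partition-key");
        if (isBlank(key)) {
            key = props.get("default-partition-key");
        }
        String unit = props.get(tableIdentifier + ".partition-unit");
        if (isBlank(unit)) {
            unit = props.get("default-partition-unit");
        }
        // Only an existing DATE/TIMESTAMP-like column can serve as the partition key.
        if (isBlank(key) || !dateLikeColumns.contains(key)) {
            return Optional.empty();
        }
        return Optional.of(new SimpleImmutableEntry<>(key, unit));
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("exclude", "app_db.tmp_.*");
        props.put("default-partition-key", "create_time");
        props.put("default-partition-unit", "month");
        props.put("app_db.orders.partition-key", "order_date");

        Set<String> columns = new HashSet<>(Arrays.asList("order_date", "create_time"));
        System.out.println(resolve(props, "app_db.orders", columns));
        System.out.println(resolve(props, "app_db.tmp_stage", columns));
    }
}
```

As in the original method, the unit is not validated here, so a table with a key but no unit (specific or default) yields a pair whose unit is null.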