You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
reacted with thumbs up emoji reacted with thumbs down emoji reacted with laugh emoji reacted with hooray emoji reacted with confused emoji reacted with heart emoji reacted with rocket emoji reacted with eyes emoji
-
package com.zzvcom.bigdata;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
/**
时间格式转换类
*/
public class DebeziumConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
private static final Logger log = LoggerFactory.getLogger(DebeziumConverter.class);
private static final String DATE_FORMAT = "yyyy-MM-dd";
private static final String TIME_FORMAT = "HH:mm:ss";
private static final String DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
private DateTimeFormatter dateFormatter;
private DateTimeFormatter timeFormatter;
private DateTimeFormatter datetimeFormatter;
private SchemaBuilder schemaBuilder;
private String databaseType;
private String schemaNamePrefix;
@OverRide
public void configure(Properties properties) {
// 必填参数:database.type。获取数据库的类型,暂时支持mysql、sqlserver
this.databaseType = properties.getProperty("database.type");
// 如果未设置,或者设置的不是mysql、sqlserver,则抛出异常。
if (this.databaseType == null || (!this.databaseType.equals("mysql"))) {
throw new IllegalArgumentException("database.type 必须设置为 'mysql' or 'sqlserver'");
}
// 选填参数:format.date、format.time、format.datetime。获取时间格式化的格式
String dateFormat = properties.getProperty("format.date", DATE_FORMAT);
String timeFormat = properties.getProperty("format.time", TIME_FORMAT);
String datetimeFormat = properties.getProperty("format.datetime", DATETIME_FORMAT);
// 获取自身类的包名+数据库类型为默认schema.name
String className = this.getClass().getName();
// 查看是否设置schema.name.prefix
this.schemaNamePrefix = properties.getProperty("schema.name.prefix", className + "." + this.databaseType);
// 初始化时间格式化器
dateFormatter = DateTimeFormatter.ofPattern(dateFormat);
timeFormatter = DateTimeFormatter.ofPattern(timeFormat);
datetimeFormatter = DateTimeFormatter.ofPattern(datetimeFormat);
}
// mysql的转换器
public void registerMysqlConverter(String columnType, ConverterRegistration converterRegistration) {
String schemaName = this.schemaNamePrefix + "." + columnType.toLowerCase();
schemaBuilder = SchemaBuilder.string().name(schemaName);
switch (columnType) {
case "DATE":
converterRegistration.register(schemaBuilder, value -> {
if (value == null) {
return null;
} else if (value instanceof java.time.LocalDate) {
return dateFormatter.format((java.time.LocalDate) value);
} else {
return this.failConvert(value, schemaName);
}
});
break;
case "TIME":
converterRegistration.register(schemaBuilder, value -> {
if (value == null) {
return null;
} else if (value instanceof java.time.Duration) {
return timeFormatter.format(
java.time.LocalTime.
ofNanoOfDay(((java.time.Duration) value)
.toNanos()));
} else {
return this.failConvert(value, schemaName);
}
});
break;
case "DATETIME":
case "TIMESTAMP":
converterRegistration.register(schemaBuilder, value -> {
if (value == null) {
return null;
} else if (value instanceof java.time.LocalDateTime) {
return datetimeFormatter.format((java.time.LocalDateTime) value);
} else if (value instanceof java.time.ZonedDateTime) {
return datetimeFormatter.format(((java.time.ZonedDateTime) value).toLocalDateTime());
} else if (value instanceof java.sql.Timestamp) {
return datetimeFormatter.format(((java.sql.Timestamp) value).toLocalDateTime());
} else {
return this.failConvert(value, schemaName);
}
});
break;
default:
schemaBuilder = null;
break;
}
}
@OverRide
public void converterFor(RelationalColumn relationalColumn, ConverterRegistration converterRegistration) {
// 获取字段类型
String columnType = relationalColumn.typeName().toUpperCase();
log.info("数据库:{},字段名称:{},字段类型:{},jdbc type :{}", this.databaseType, relationalColumn.name(), columnType, relationalColumn.jdbcType());
// 根据数据库类型调用不同的转换器
if (this.databaseType.equals("mysql")) {
this.registerMysqlConverter(columnType, converterRegistration);
} else {
log.warn("不支持的数据库类型: {}", this.databaseType);
schemaBuilder = null;
}
}
private String getClassName(Object value) {
if (value == null) {
return null;
}
return value.getClass().getName();
}
// 类型转换失败时的日志打印
private String failConvert(Object value, String type) {
String valueClass = this.getClassName(value);
String valueString = valueClass == null ? null : value.toString();
log.warn("{}类型转换失败,类型:{},值:{}", type, valueClass, valueString);
return valueString;
}
}
/**
 * Builds the Debezium properties that register the converter named
 * {@code mydebeziumconverter} together with its database type and format patterns.
 *
 * @return a new {@link Properties} instance holding the converter settings
 */
public static Properties getDebeziumProperties() {
    // Key/value pairs, inserted in the order listed.
    final String[][] settings = {
        {"converters", "mydebeziumconverter"},
        {"mydebeziumconverter.type", "com.zzvcom.bigdata.DebeziumConverter"},
        {"mydebeziumconverter.database.type", "mysql"},
        {"mydebeziumconverter.format.datetime", "yyyy-MM-dd HH:mm:ss"},
        {"mydebeziumconverter.format.timestamp", "yyyy-MM-dd HH:mm:ss"},
        {"mydebeziumconverter.format.date", "yyyy-MM-dd"},
        {"mydebeziumconverter.format.time", "HH:mm:ss"},
    };
    Properties result = new Properties();
    for (String[] setting : settings) {
        result.put(setting[0], setting[1]);
    }
    return result;
}
// Builds the MySQL CDC source from the connection settings held in ApplicationConstant.
MySqlSource mySqlSource = MySqlSource.builder()
.hostname(ApplicationConstant.HOSTNAME)
.port(ApplicationConstant.PORT)
.username(ApplicationConstant.USERNAME)
.password(ApplicationConstant.PASSWORD)
.databaseList(ApplicationConstant.DATABASE)
.tableList(ApplicationConstant.TABLES)
// Use the custom deserialization schema.
.deserializer(new MyDeserializationSchemaFunction())
// Debezium properties; presumably the ones that register the custom converter - confirm against ApplicationConstant.
.debeziumProperties(ApplicationConstant.getDebeziumProperties())
.serverTimeZone("Asia/Shanghai")
.startupOptions(StartupOptions.initial())
.build();
我自定义的转换类,在提交任务的时候提示类没有找到。
另外,我把所有的 jar 都放到了 Flink 安装目录的 lib 下,提交的时候只提供自身的 jar 包。
提示没有找到的类是:“com.zzvcom.bigdata.DebeziumConverter”
Beta Was this translation helpful? Give feedback.
All reactions