Skip to content

Commit

Permalink
[FLINK-32068] jdbc support clickhouse
Browse files Browse the repository at this point in the history
  • Loading branch information
leishuiyu authored and WenDing-Y committed Jun 6, 2023
1 parent 5ce9391 commit da070b1
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package org.apache.flink.connector.jdbc.dialect.clickhouse;

import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
import org.apache.flink.connector.jdbc.internal.converter.ClickhouseRowConvert;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;

import java.util.EnumSet;
import java.util.Optional;
import java.util.Set;

/** */
public class ClickhouseDialect extends AbstractDialect {

private static final long serialVersionUID = 1L;

// Define MAX/MIN precision of TIMESTAMP type according to Mysql docs:
// https://dev.mysql.com/doc/refman/8.0/en/fractional-seconds.html
private static final int MAX_TIMESTAMP_PRECISION = 6;
private static final int MIN_TIMESTAMP_PRECISION = 1;

// Define MAX/MIN precision of DECIMAL type according to Mysql docs:
// https://dev.mysql.com/doc/refman/8.0/en/fixed-point-types.html
private static final int MAX_DECIMAL_PRECISION = 65;
private static final int MIN_DECIMAL_PRECISION = 1;

@Override
public AbstractJdbcRowConverter getRowConverter(RowType rowType) {
return new ClickhouseRowConvert(rowType);
}

@Override
public String getLimitClause(long limit) {
return "LIMIT " + limit;
}

@Override
public Optional<String> defaultDriverName() {
return Optional.of("com.clickhouse.jdbc.ClickHouseDriver");
}

@Override
public String quoteIdentifier(String identifier) {
return "`" + identifier + "`";
}

@Override
public Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
return Optional.empty();
}

@Override
public String dialectName() {
return "Clickhouse";
}

@Override
public Set<LogicalTypeRoot> supportedTypes() {
// LegacyTypeInfoDataTypeConverter.
return EnumSet.of(
LogicalTypeRoot.CHAR,
LogicalTypeRoot.VARCHAR,
LogicalTypeRoot.BOOLEAN,
LogicalTypeRoot.VARBINARY,
LogicalTypeRoot.DECIMAL,
LogicalTypeRoot.TINYINT,
LogicalTypeRoot.SMALLINT,
LogicalTypeRoot.INTEGER,
LogicalTypeRoot.BIGINT,
LogicalTypeRoot.FLOAT,
LogicalTypeRoot.DOUBLE,
LogicalTypeRoot.DATE,
LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE,
LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.apache.flink.connector.jdbc.dialect.clickhouse;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;

/** */
@Internal
public class ClickhouseDialectFactory implements JdbcDialectFactory {
@Override
public boolean acceptsURL(String url) {
return url.startsWith("jdbc:clickhouse:");
}

@Override
public JdbcDialect create() {
return new ClickhouseDialect();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.apache.flink.connector.jdbc.internal.converter;

import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
import org.apache.flink.table.types.logical.RowType;

/** */
public class ClickhouseRowConvert extends AbstractJdbcRowConverter {

private static final long serialVersionUID = 1L;

@Override
public String converterName() {
return "Clickhouse";
}

public ClickhouseRowConvert(RowType rowType) {
super(rowType);
}
}

0 comments on commit da070b1

Please sign in to comment.