diff --git a/README.md b/README.md
index b023f5750..64ff564e1 100644
--- a/README.md
+++ b/README.md
@@ -587,6 +587,7 @@ data_sources:
 - [Apache Ignite](#apache-ignite)
 - [Apache Spark](#apache-spark)
 - [Cassandra](#cassandra)
+- [ClickHouse](#clickhouse)
 - [Druid](#druid)
 - [Elasticsearch](#elasticsearch)
 - [Google BigQuery](#google-bigquery)
@@ -742,6 +743,19 @@ data_sources:
 
 Use a [read-only role](https://docs.datastax.com/en/cql-oss/3.3/cql/cql_using/useSecurePermission.html).
 
+### ClickHouse
+
+Add [ClickHouse Ruby driver](https://github.com/shlima/click_house) to your Gemfile and set:
+```yml
+data_sources:
+  my_source:
+    adapter: clickhouse
+    url: http://user:password@hostname:8123/database
+
+    # optional settings
+    ssl_verify: true # false by default
+```
+
 ### Druid
 
 Enable [SQL support](http://druid.io/docs/latest/querying/sql.html#configuration) on the broker and set:
diff --git a/lib/blazer.rb b/lib/blazer.rb
index ceb89c8b9..882268f14 100644
--- a/lib/blazer.rb
+++ b/lib/blazer.rb
@@ -22,6 +22,7 @@
 require_relative "blazer/adapters/athena_adapter"
 require_relative "blazer/adapters/bigquery_adapter"
 require_relative "blazer/adapters/cassandra_adapter"
+require_relative "blazer/adapters/clickhouse_adapter"
 require_relative "blazer/adapters/drill_adapter"
 require_relative "blazer/adapters/druid_adapter"
 require_relative "blazer/adapters/elasticsearch_adapter"
diff --git a/lib/blazer/adapters.rb b/lib/blazer/adapters.rb
index eed81a49e..869d45c31 100644
--- a/lib/blazer/adapters.rb
+++ b/lib/blazer/adapters.rb
@@ -1,6 +1,7 @@
 Blazer.register_adapter "athena", Blazer::Adapters::AthenaAdapter
 Blazer.register_adapter "bigquery", Blazer::Adapters::BigQueryAdapter
 Blazer.register_adapter "cassandra", Blazer::Adapters::CassandraAdapter
+Blazer.register_adapter "clickhouse", Blazer::Adapters::ClickhouseAdapter
 Blazer.register_adapter "drill", Blazer::Adapters::DrillAdapter
 Blazer.register_adapter "druid",
Blazer::Adapters::DruidAdapter
 Blazer.register_adapter "elasticsearch", Blazer::Adapters::ElasticsearchAdapter
diff --git a/lib/blazer/adapters/clickhouse_adapter.rb b/lib/blazer/adapters/clickhouse_adapter.rb
new file mode 100644
index 000000000..76698f2f7
--- /dev/null
+++ b/lib/blazer/adapters/clickhouse_adapter.rb
@@ -0,0 +1,94 @@
+module Blazer
+  module Adapters
+    # Data source adapter for ClickHouse, backed by the click_house gem.
+    class ClickhouseAdapter < BaseAdapter
+      # Matches ClickHouse date-time column types, including parameterized
+      # forms like DateTime('UTC'), DateTime64(3), and DateTime64(3, 'UTC').
+      # NOTE: an exact-string list cannot work here because the type string
+      # carries per-column precision/timezone arguments.
+      DATE_TIME_TYPE = /\ADateTime(?:64)?(?:\(.*\))?\z/.freeze
+
+      # Runs a SQL statement and returns [columns, rows, error].
+      # Values of DateTime columns are converted to Time so Blazer can chart them.
+      def run_statement(statement, _comment)
+        columns = []
+        rows = []
+        error = nil
+
+        begin
+          result = connection.select_all(statement)
+          unless result.data.blank?
+            date_time_columns = result.meta
+              .select { |column| column["type"].match?(DATE_TIME_TYPE) }
+              .map { |column| column["name"] }
+            columns = result.first.keys
+            rows = result.map { |row| convert_time_columns(row, date_time_columns).values }
+          end
+        rescue => e
+          error = e.message
+        end
+
+        [columns, rows, error]
+      end
+
+      def tables
+        connection.tables
+      end
+
+      # Returns [{schema:, table:, columns: [{name:, data_type:}]}] for the
+      # current database, ordered by table and column position.
+      def schema
+        statement = <<-SQL
+          SELECT table, name, type
+          FROM system.columns
+          WHERE database = currentDatabase()
+          ORDER BY table, position
+        SQL
+
+        response = connection.post(query: { query: statement, default_format: "CSV" })
+        response.body
+          .group_by { |row| row[0] }
+          .transform_values { |columns| columns.map { |c| { name: c[1], data_type: c[2] } } }
+          .map { |table, columns| { schema: "public", table: table, columns: columns } }
+      end
+
+      def preview_statement
+        "SELECT * FROM {table} LIMIT 10"
+      end
+
+      def explain(statement)
+        connection.explain(statement)
+      end
+
+      protected
+
+      def connection
+        @connection ||= ClickHouse::Connection.new(config)
+      end
+
+      # Builds ClickHouse::Config from the data source URL, e.g.
+      # http://user:password@hostname:8123/database
+      def config
+        @config ||= begin
+          uri = URI.parse(settings["url"])
+          options = {
+            scheme: uri.scheme,
+            host: uri.host,
+            port: uri.port,
+            username: uri.user,
+            password: uri.password,
+            database: uri.path.sub(/\A\//, ""),
+            ssl_verify: settings.fetch("ssl_verify", false)
+          }.compact
+          ClickHouse::Config.new(**options)
+        end
+      end
+
+      # Converts the values of DateTime columns to Time, leaving values that
+      # cannot be converted (e.g. NULLs from Nullable columns) untouched,
+      # instead of abandoning conversion for the whole row on first failure.
+      def convert_time_columns(row, date_time_columns)
+        time_values = row.slice(*date_time_columns)
+          .transform_values { |value| value.respond_to?(:to_time) ? value.to_time : value }
+        row.merge(time_values)
+      end
+    end
+  end
+end