Skip to content

Commit

Permalink
Merge pull request #55 from ClickHouse/query_rework
Browse files Browse the repository at this point in the history
Revert to CSV insert
  • Loading branch information
guykoh authored Jun 9, 2022
2 parents 2139465 + bb16836 commit e7e283d
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 21 deletions.
17 changes: 3 additions & 14 deletions dbt/adapters/clickhouse/connections.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import time
import uuid
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Any, Optional, Tuple

import agate
import clickhouse_connect
import dbt.exceptions
from clickhouse_connect.driver.client import Client as ChClient
from clickhouse_connect.driver.exceptions import DatabaseError, Error
from dbt.adapters.base import Credentials
from dbt.adapters.sql import SQLConnectionManager
Expand Down Expand Up @@ -98,7 +98,6 @@ def open(cls, connection):
return connection

credentials = cls.get_credentials(connection.credentials)
kwargs = {}

try:
handle = clickhouse_connect.get_client(
Expand All @@ -113,7 +112,7 @@ def open(cls, connection):
send_receive_timeout=credentials.send_receive_timeout,
verify=credentials.verify,
query_limit=0,
**kwargs,
session_id='dbt::' + str(uuid.uuid4()),
)
connection.handle = handle
connection.state = 'open'
Expand Down Expand Up @@ -181,16 +180,6 @@ def execute(
table = dbt.clients.agate_helper.empty_table()
return status, table

def insert_table_data(self, table_name, table: agate.Table):
"""
Insert data into ClickHouse table
:param table_name: Target table name
:param table: Data to be inserted
"""
client: ChClient = self.get_thread_connection().handle
with self.exception_handler(f'INSERT INTO {table_name}'):
client.insert(table_name, table.rows, table.column_names)

def add_query(
self,
sql: str,
Expand All @@ -206,7 +195,7 @@ def add_query(
logger.debug(f'On {conn.name}: {sql}...')

pre = time.time()
client.query(sql)
client.command(sql)

status = self.get_status(client)

Expand Down
4 changes: 0 additions & 4 deletions dbt/adapters/clickhouse/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,6 @@ def get_csv_data(self, table):

return buf.getvalue()

@available
def insert_table_data(self, table_name, table):
self.connections.insert_table_data(table_name, table)

def run_sql_for_tests(self, sql, fetch, conn):
client = conn.handle
try:
Expand Down
10 changes: 9 additions & 1 deletion dbt/include/clickhouse/macros/materializations/seed.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
{% macro clickhouse__load_csv_rows(model, agate_table) %}
{% do adapter.insert_table_data(this.render(), agate_table) %}
{% set cols_sql = get_seed_column_quoted_csv(model, agate_table.column_names) %}
{% set data_sql = adapter.get_csv_data(agate_table) %}

{% set sql -%}
insert into {{ this.render() }} ({{ cols_sql }}) format CSV
{{ data_sql }}
{%- endset %}

{% do adapter.add_query(sql, bindings=agate_table, abridge_sql_log=True) %}
{% endmacro %}

{% macro clickhouse__create_csv_table(model, agate_table) %}
Expand Down
2 changes: 1 addition & 1 deletion dev_requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
dbt-core==1.1.0
clickhouse-connect>=0.0.15
clickhouse-connect>=0.1.0
pytest==7.0.0
pytest-dotenv==0.5.2
dbt-tests-adapter==1.1
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def _dbt_clickhouse_version():
},
install_requires=[
f'dbt-core=={dbt_version}',
'clickhouse-connect>=0.0.15',
'clickhouse-connect>=0.1.0',
],
python_requires=">=3.7",
platforms='any',
Expand Down

0 comments on commit e7e283d

Please sign in to comment.