diff --git a/dbt/adapters/clickhouse/connections.py b/dbt/adapters/clickhouse/connections.py index 2d98e75c..5d10b21b 100644 --- a/dbt/adapters/clickhouse/connections.py +++ b/dbt/adapters/clickhouse/connections.py @@ -1,4 +1,5 @@ import time +import uuid from contextlib import contextmanager from dataclasses import dataclass from typing import Any, Optional, Tuple @@ -6,7 +7,6 @@ import agate import clickhouse_connect import dbt.exceptions -from clickhouse_connect.driver.client import Client as ChClient from clickhouse_connect.driver.exceptions import DatabaseError, Error from dbt.adapters.base import Credentials from dbt.adapters.sql import SQLConnectionManager @@ -98,7 +98,6 @@ def open(cls, connection): return connection credentials = cls.get_credentials(connection.credentials) - kwargs = {} try: handle = clickhouse_connect.get_client( @@ -113,7 +112,7 @@ def open(cls, connection): send_receive_timeout=credentials.send_receive_timeout, verify=credentials.verify, query_limit=0, - **kwargs, + session_id='dbt::' + str(uuid.uuid4()), ) connection.handle = handle connection.state = 'open' @@ -181,16 +180,6 @@ def execute( table = dbt.clients.agate_helper.empty_table() return status, table - def insert_table_data(self, table_name, table: agate.Table): - """ - Insert data into ClickHouse table - :param table_name: Target table name - :param table: Data to be inserted - """ - client: ChClient = self.get_thread_connection().handle - with self.exception_handler(f'INSERT INTO {table_name}'): - client.insert(table_name, table.rows, table.column_names) - def add_query( self, sql: str, @@ -206,7 +195,7 @@ def add_query( logger.debug(f'On {conn.name}: {sql}...') pre = time.time() - client.query(sql) + client.command(sql) status = self.get_status(client) diff --git a/dbt/adapters/clickhouse/impl.py b/dbt/adapters/clickhouse/impl.py index 7b1d45a4..d90e7763 100644 --- a/dbt/adapters/clickhouse/impl.py +++ b/dbt/adapters/clickhouse/impl.py @@ -235,10 +235,6 @@ def get_csv_data(self, table): return buf.getvalue() - @available - def insert_table_data(self, table_name, table): - self.connections.insert_table_data(table_name, table) - def run_sql_for_tests(self, sql, fetch, conn): client = conn.handle try: diff --git a/dbt/include/clickhouse/macros/materializations/seed.sql b/dbt/include/clickhouse/macros/materializations/seed.sql index 0d25ddc2..2d692fe4 100644 --- a/dbt/include/clickhouse/macros/materializations/seed.sql +++ b/dbt/include/clickhouse/macros/materializations/seed.sql @@ -1,5 +1,13 @@ {% macro clickhouse__load_csv_rows(model, agate_table) %} - {% do adapter.insert_table_data(this.render(), agate_table) %} + {% set cols_sql = get_seed_column_quoted_csv(model, agate_table.column_names) %} + {% set data_sql = adapter.get_csv_data(agate_table) %} + + {% set sql -%} + insert into {{ this.render() }} ({{ cols_sql }}) format CSV + {{ data_sql }} + {%- endset %} + + {% do adapter.add_query(sql, bindings=agate_table, abridge_sql_log=True) %} {% endmacro %} {% macro clickhouse__create_csv_table(model, agate_table) %} diff --git a/dev_requirements.txt b/dev_requirements.txt index 01dcf84d..b40ab240 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -1,5 +1,5 @@ dbt-core==1.1.0 -clickhouse-connect>=0.0.15 +clickhouse-connect>=0.1.0 pytest==7.0.0 pytest-dotenv==0.5.2 dbt-tests-adapter==1.1 diff --git a/setup.py b/setup.py index ae8e0778..8bddc077 100644 --- a/setup.py +++ b/setup.py @@ -54,7 +54,7 @@ def _dbt_clickhouse_version(): }, install_requires=[ f'dbt-core=={dbt_version}', - 'clickhouse-connect>=0.0.15', + 'clickhouse-connect>=0.1.0', ], python_requires=">=3.7", platforms='any',