Skip to content

Commit

Permalink
Merge pull request #399 from ClickHouse/make-mv-data-catchup-configur…
Browse files Browse the repository at this point in the history
…able

Make mv data catchup configurable
  • Loading branch information
BentsiLeviav authored Dec 9, 2024
2 parents edad093 + e2ab918 commit 73fe825
Show file tree
Hide file tree
Showing 6 changed files with 301 additions and 195 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
### Release [1.8.6], 2024-12-05

### Improvement
* Today, on mv model creation, the target table is being populated with the historical data based on the query provided in the mv creation. This catchup mechanism is now behind a config flag and enabled by default (as is today). ([#399](https://github.com/ClickHouse/dbt-clickhouse/pull/399))

### Release [1.8.5], 2024-11-19

### New Features
Expand Down
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,22 @@ select a,b,c from {{ source('raw', 'table_2') }}
>
> When updating a model with multiple materialized views (MVs), especially when renaming one of the MV names, dbt-clickhouse does not automatically drop the old MV. Instead,
> you will encounter the following warning: `Warning - Table <previous table name> was detected with the same pattern as model name <your model name> but was not found in this run. In case it is a renamed mv that was previously part of this model, drop it manually (!!!) `
## Data catchup
Currently, when creating a materialized view (MV), the target table is first populated with historical data before the MV itself is created.

In other words, dbt-clickhouse initially creates the target table and preloads it with historical data based on the query defined for the MV. Only after this step is the MV created.

If you prefer not to preload historical data during MV creation, you can disable this behavior by setting the catchup config to False:

```python
{{config(
materialized='materialized_view',
engine='MergeTree()',
order_by='(id)',
catchup=False
)}}
```


# Dictionary materializations (experimental)
Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/clickhouse/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = '1.8.5'
version = '1.8.6'
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@

{% if backup_relation is none %}
{{ log('Creating new materialized view ' + target_relation.name )}}
{{ clickhouse__get_create_materialized_view_as_sql(target_relation, sql, views) }}
{% set catchup_data = config.get("catchup", True) %}
{{ clickhouse__get_create_materialized_view_as_sql(target_relation, sql, views, catchup_data) }}
{% elif existing_relation.can_exchange %}
{{ log('Replacing existing materialized view ' + target_relation.name) }}
-- in this section, we look for mvs that has the same pattern as this model, but for some reason,
Expand Down Expand Up @@ -132,9 +133,15 @@
2. Create a materialized view using the SQL in the model that inserts
data into the table creating during step 1
#}
{% macro clickhouse__get_create_materialized_view_as_sql(relation, sql, views) -%}
{% macro clickhouse__get_create_materialized_view_as_sql(relation, sql, views, catchup=True ) -%}
{% call statement('main') %}
{% if catchup == True %}
{{ get_create_table_as_sql(False, relation, sql) }}
{% else %}
{{ log('Catchup data config was set to false, skipping mv-target-table initial insertion ')}}
{% set has_contract = config.get('contract').enforced %}
{{ create_table_or_empty(False, relation, sql, has_contract) }}
{% endif %}
{% endcall %}
{%- set cluster_clause = on_cluster_clause(relation) -%}
{%- set mv_relation = relation.derivative('_mv', 'materialized_view') -%}
Expand Down
213 changes: 22 additions & 191 deletions tests/integration/adapter/materialized_view/test_materialized_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@
materialized='materialized_view',
engine='MergeTree()',
order_by='(id)',
schema='custom_schema',
schema='catchup' if var('run_type', '') == 'catchup' else 'custom_schema',
**({'catchup': False} if var('run_type', '') == 'catchup' else {})
) }}
{% if var('run_type', '') == '' %}
{% if var('run_type', '') in ['', 'catchup'] %}
select
id,
name,
Expand Down Expand Up @@ -60,74 +61,6 @@
{% endif %}
"""

MULTIPLE_MV_MODEL = """
{{ config(
materialized='materialized_view',
engine='MergeTree()',
order_by='(id)',
schema='custom_schema_for_multiple_mv',
) }}
{% if var('run_type', '') == '' %}
--mv1:begin
select
id,
name,
case
when name like 'Dade' then 'crash_override'
when name like 'Kate' then 'acid burn'
else 'N/A'
end as hacker_alias
from {{ source('raw', 'people') }}
where department = 'engineering'
--mv1:end
union all
--mv2:begin
select
id,
name,
-- sales people are not cool enough to have a hacker alias
'N/A' as hacker_alias
from {{ source('raw', 'people') }}
where department = 'sales'
--mv2:end
{% elif var('run_type', '') == 'extended_schema' %}
--mv1:begin
select
id,
name,
case
-- Dade wasn't always known as 'crash override'!
when name like 'Dade' and age = 11 then 'zero cool'
when name like 'Dade' and age != 11 then 'crash override'
when name like 'Kate' then 'acid burn'
else 'N/A'
end as hacker_alias
from {{ source('raw', 'people') }}
where department = 'engineering'
--mv1:end
union all
--mv2:begin
select
id,
name,
-- sales people are not cool enough to have a hacker alias
'N/A' as hacker_alias
from {{ source('raw', 'people') }}
where department = 'sales'
--mv2:end
{% endif %}
"""


SEED_SCHEMA_YML = """
version: 2
Expand Down Expand Up @@ -197,116 +130,30 @@ def test_create(self, project):
result = project.run_sql(f"select count(*) from {schema}.hackers", fetch="all")
assert result[0][0] == 4


class TestUpdateMV:
@pytest.fixture(scope="class")
def seeds(self):
"""
we need a base table to pull from
"""
return {
"people.csv": PEOPLE_SEED_CSV,
"schema.yml": SEED_SCHEMA_YML,
}

@pytest.fixture(scope="class")
def models(self):
return {
"hackers.sql": MV_MODEL,
}

def test_update_incremental(self, project):
schema = quote_identifier(project.test_schema + "_custom_schema")
# create our initial materialized view
run_dbt(["seed"])
run_dbt()

# re-run dbt but this time with the new MV SQL
run_vars = {"run_type": "extended_schema"}
run_dbt(["run", "--vars", json.dumps(run_vars)])

project.run_sql(
f"""
insert into {quote_identifier(project.test_schema)}.people ("id", "name", "age", "department")
values (1232,'Dade',11,'engineering'), (9999,'eugene',40,'malware');
"""
)

# assert that we now have both of Dade's aliases in our hackers table
result = project.run_sql(
f"select distinct hacker_alias from {schema}.hackers where name = 'Dade'", fetch="all"
)
assert len(result) == 2

def test_update_full_refresh(self, project):
schema = quote_identifier(project.test_schema + "_custom_schema")
# create our initial materialized view
run_dbt(["seed"])
run_dbt()

# re-run dbt but this time with the new MV SQL
run_vars = {"run_type": "extended_schema"}
run_dbt(["run", "--full-refresh", "--vars", json.dumps(run_vars)])

project.run_sql(
f"""
insert into {quote_identifier(project.test_schema)}.people ("id", "name", "age", "department")
values (1232,'Dade',11,'engineering'), (9999,'eugene',40,'malware');
"""
)

# assert that we now have both of Dade's aliases in our hackers table
result = project.run_sql(
f"select distinct hacker_alias from {schema}.hackers where name = 'Dade'", fetch="all"
)
assert len(result) == 2


class TestMultipleMV:
@pytest.fixture(scope="class")
def seeds(self):
"""
we need a base table to pull from
"""
return {
"people.csv": PEOPLE_SEED_CSV,
"schema.yml": SEED_SCHEMA_YML,
}

@pytest.fixture(scope="class")
def models(self):
return {
"hackers.sql": MULTIPLE_MV_MODEL,
}

def test_create(self, project):
def test_disabled_catchup(self, project):
"""
1. create a base table via dbt seed
2. create a model as a materialized view, selecting from the table created in (1)
2. create a model with catchup disabled as a materialized view, selecting from the table created in (1)
3. insert data into the base table and make sure it's there in the target table created in (2)
"""
schema = quote_identifier(project.test_schema + "_custom_schema_for_multiple_mv")
schema = quote_identifier(project.test_schema + "_catchup")
results = run_dbt(["seed"])
assert len(results) == 1
columns = project.run_sql("DESCRIBE TABLE people", fetch="all")
assert columns[0][1] == "Int32"

# create the model
run_dbt(["run"])
# create the model with catchup disabled
run_vars = {"run_type": "catchup"}
run_dbt(["run", "--vars", json.dumps(run_vars)])
# check that we only have the new row, without the historical data
assert len(results) == 1

columns = project.run_sql(f"DESCRIBE TABLE {schema}.hackers", fetch="all")
assert columns[0][1] == "Int32"

columns = project.run_sql(f"DESCRIBE {schema}.hackers_mv1", fetch="all")
assert columns[0][1] == "Int32"

columns = project.run_sql(f"DESCRIBE {schema}.hackers_mv2", fetch="all")
columns = project.run_sql(f"DESCRIBE {schema}.hackers_mv", fetch="all")
assert columns[0][1] == "Int32"

with pytest.raises(Exception):
columns = project.run_sql(f"DESCRIBE {schema}.hackers_mv", fetch="all")

check_relation_types(
project.adapter,
{
Expand All @@ -318,25 +165,16 @@ def test_create(self, project):
# insert some data and make sure it reaches the target table
project.run_sql(
f"""
insert into {quote_identifier(project.test_schema)}.people ("id", "name", "age", "department")
values (4000,'Dave',40,'sales'), (9999,'Eugene',40,'engineering');
"""
insert into {quote_identifier(project.test_schema)}.people ("id", "name", "age", "department")
values (1232,'Dade',16,'engineering'), (9999,'eugene',40,'malware');
"""
)

result = project.run_sql(f"select * from {schema}.hackers order by id", fetch="all")
assert result == [
(1000, 'Alfie', 'N/A'),
(1231, 'Dade', 'crash_override'),
(2000, 'Bill', 'N/A'),
(3000, 'Charlie', 'N/A'),
(4000, 'Dave', 'N/A'),
(6666, 'Ksenia', 'N/A'),
(8888, 'Kate', 'acid burn'),
(9999, 'Eugene', 'N/A'),
]
result = project.run_sql(f"select count(*) from {schema}.hackers", fetch="all")
assert result[0][0] == 1


class TestUpdateMultipleMV:
class TestUpdateMV:
@pytest.fixture(scope="class")
def seeds(self):
"""
Expand All @@ -350,11 +188,11 @@ def seeds(self):
@pytest.fixture(scope="class")
def models(self):
return {
"hackers.sql": MULTIPLE_MV_MODEL,
"hackers.sql": MV_MODEL,
}

def test_update_incremental(self, project):
schema = quote_identifier(project.test_schema + "_custom_schema_for_multiple_mv")
schema = quote_identifier(project.test_schema + "_custom_schema")
# create our initial materialized view
run_dbt(["seed"])
run_dbt()
Expand All @@ -372,15 +210,12 @@ def test_update_incremental(self, project):

# assert that we now have both of Dade's aliases in our hackers table
result = project.run_sql(
f"select distinct hacker_alias from {schema}.hackers where name = 'Dade' order by hacker_alias",
fetch="all",
f"select distinct hacker_alias from {schema}.hackers where name = 'Dade'", fetch="all"
)
assert len(result) == 2
assert result[0][0] == "crash_override"
assert result[1][0] == "zero cool"

def test_update_full_refresh(self, project):
schema = quote_identifier(project.test_schema + "_custom_schema_for_multiple_mv")
schema = quote_identifier(project.test_schema + "_custom_schema")
# create our initial materialized view
run_dbt(["seed"])
run_dbt()
Expand All @@ -398,10 +233,6 @@ def test_update_full_refresh(self, project):

# assert that we now have both of Dade's aliases in our hackers table
result = project.run_sql(
f"select distinct hacker_alias from {schema}.hackers where name = 'Dade' order by hacker_alias",
fetch="all",
f"select distinct hacker_alias from {schema}.hackers where name = 'Dade'", fetch="all"
)
print(result)
assert len(result) == 2
assert result[0][0] == "crash override"
assert result[1][0] == "zero cool"
Loading

0 comments on commit 73fe825

Please sign in to comment.