Add sample dags (#1)
Adds a few sample DAGs showcasing various Airflow features

roll_d20::
* generates a random number; can read the `RND_SEED_OVERRIDE` env var to override the random seed
* DAG structure

![image](https://github.com/user-attachments/assets/8d43cde6-4c89-453e-a626-b24798f2fb5f)

sample_ch_ddl::
* checks whether the table exists and creates it if it does not
* DAG structure

![image](https://github.com/user-attachments/assets/ba5e8251-8e87-4d36-afcc-16825e324856)

sample_ch_insert::
* has a DAG param for the number of random rows to insert
* updates dataset
* DAG structure 

![image](https://github.com/user-attachments/assets/71818527-988f-44d5-9319-1c6d06e49af5)

sample_ch_stats::
* triggered by dataset update from insert dag 
* DAG structure

![image](https://github.com/user-attachments/assets/fa0e9496-fa86-4dfe-95e8-325eb7bd3479)
* display stats task

![image](https://github.com/user-attachments/assets/fc38b389-87d1-4875-bde8-d9f8152cbca1)
cra authored Sep 17, 2024
1 parent 30d6524 commit d7a52c1
Showing 11 changed files with 264 additions and 1 deletion.
35 changes: 34 additions & 1 deletion README.md
@@ -1,2 +1,35 @@
# sample-airflow-dags
Example dags for DC Managed Airflow, showcasing interaction with other DC services
Example DAGs for DoubleCloud Managed Airflow, showcasing interaction with other DC services

Refer to the documentation for the Getting Started guide: https://double.cloud/docs/en/managed-airflow/get-started

## ClickHouse connection

You need a ClickHouse connection named `ch_default` so that DAGs tagged `clickhouse` can connect to your ClickHouse instance.
If using DoubleCloud ClickHouse, create a generic connection with extra setting `{"secure": true}`.
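
As one possible way to supply this connection, Airflow can read connections from environment variables named `AIRFLOW_CONN_<CONN_ID>`, and Airflow 2 accepts a JSON value there. The sketch below only builds such a value. The host, login, password, and the port `9440` (the usual ClickHouse secure native port) are placeholder assumptions, not values from this repository, and `build_ch_connection_json` is a hypothetical helper name.

```python
import json


def build_ch_connection_json(host, login, password, port=9440, secure=True):
    """Return a JSON string for a generic connection with a `secure` extra.

    All argument values are placeholders supplied by the caller.
    """
    conn = {
        "conn_type": "generic",
        "host": host,
        "port": port,
        "login": login,
        "password": password,
        "extra": {"secure": secure},
    }
    return json.dumps(conn)


if __name__ == "__main__":
    value = build_ch_connection_json("clickhouse.example.com", "admin", "change-me")
    # The variable name follows the AIRFLOW_CONN_<CONN_ID> convention for `ch_default`.
    print(f"AIRFLOW_CONN_CH_DEFAULT='{value}'")
```

Creating the connection in the Airflow UI, as described above, is equivalent; the environment-variable form is mainly convenient for local testing.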

## DAGs

[roll_d20](./dags/roll_d20.py) has no external dependencies and can be used to check a new setup. Once enabled, it runs on a cron schedule every 5 minutes. You can set the environment variable `RND_SEED_OVERRIDE` in the DC Airflow cluster settings to specify a custom random seed for reproducibility.

![roll_d20_graph](./img/roll_d20_graph.png)
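
The effect of `RND_SEED_OVERRIDE` can be checked outside Airflow. The following is a minimal sketch of the same seeding logic as a plain function; passing the environment as an argument is an assumption made here so the behaviour is easy to test, and is not how the DAG itself is written.

```python
import os
import random


def dice_roll(env=os.environ):
    """Roll a d20, seeding the generator first when RND_SEED_OVERRIDE is set."""
    seed = env.get("RND_SEED_OVERRIDE")
    if seed:
        random.seed(int(seed))
    return random.randint(1, 20)


if __name__ == "__main__":
    fixed = {"RND_SEED_OVERRIDE": "42"}
    # With the same seed, two rolls are identical.
    print(dice_roll(fixed) == dice_roll(fixed))
    # Without the override, the roll is any value from 1 to 20.
    print(dice_roll({}))
```

Because the seed is applied immediately before each roll, setting the override makes every run produce the same number.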

---

[sample_ch_ddl](./dags/sample_ch_ddl.py) checks whether `sample_table` exists using the ClickHouse connection with connection ID `ch_default`, and creates the table if it does not exist.

![sample_ch_ddl_graph](./img/sample_ch_ddl_graph.png)
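
The branching in this DAG can be pictured with a small stand-alone function. This is a simplified model, not the operator's actual implementation: it assumes the `EXISTS sample_table` query yields `1` or `0` in its first cell, and `choose_branch` is a hypothetical name. The task IDs are the ones used in the DAG.

```python
def choose_branch(first_cell, if_true="do_nothing", if_false="create_sample_table"):
    """Pick the downstream task ID from the first cell of the query result."""
    return if_true if bool(first_cell) else if_false


if __name__ == "__main__":
    print(choose_branch(1))  # table exists, nothing to create
    print(choose_branch(0))  # table missing, run the CREATE TABLE task
```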

---

[sample_ch_insert](./dags/sample_ch_insert.py) inserts specified number of rows in `sample_table` and updates dataset `clickhouse://sample_table`.

![sample_ch_insert_graph](./img/sample_ch_insert_graph.png)

---

[sample_ch_stats](./dags/sample_ch_stats.py) computes stats on `sample_table` and outputs them in the task logs. It is triggered by updates to the dataset `clickhouse://sample_table`.

![sample_ch_stats_graph](./img/sample_ch_stats_graph.png)

![sample_ch_stats_log](./img/sample_ch_stats_log.png)
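
To make the per-category statistics concrete, here is a rough pure-Python equivalent of the aggregation, using only the standard library. It is an illustrative sketch: `compute_stats` is a hypothetical helper, the input rows are made up, and ClickHouse may compute `median` approximately, so its values can differ slightly from the exact median used here.

```python
import statistics
from collections import defaultdict


def compute_stats(rows):
    """Aggregate (category, value) pairs into one stats tuple per category.

    Each tuple is (category, count, mean, median, population std dev, min, max),
    ordered by category.
    """
    groups = defaultdict(list)
    for category, value in rows:
        groups[category].append(value)

    result = []
    for category in sorted(groups):
        values = groups[category]
        result.append((
            category,
            len(values),
            statistics.fmean(values),
            statistics.median(values),
            statistics.pstdev(values),
            min(values),
            max(values),
        ))
    return result


if __name__ == "__main__":
    sample_rows = [("A", 1.0), ("A", 3.0), ("B", 2.0)]
    for row in compute_stats(sample_rows):
        print(row)
```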
40 changes: 40 additions & 0 deletions dags/roll_d20.py
@@ -0,0 +1,40 @@
import os
import random
import pathlib
from datetime import datetime as dt

from airflow.decorators import dag, task


@dag(
    dag_id=pathlib.Path(__file__).stem,
    description="Example DAG with two chained tasks that outputs a random number between 1 and 20",
    schedule='*/5 * * * *',
    start_date=dt(2024, 9, 1, 0, 0, 0),
    catchup=False,
    tags=["sample", "random"],
)
def roll_d20():
    @task
    def dice_roll():
        seed = os.environ.get('RND_SEED_OVERRIDE')
        if seed:
            seed = int(seed)
            print("Random seed override:", seed)
            random.seed(seed)

        return random.randint(1, 20)

    @task
    def roll_result(roll_value):
        print("Hello from DoubleCloud")
        print("You rolled", roll_value)

    roll_result(roll_value=dice_roll())


my_dag = roll_d20()


if __name__ == '__main__':
    my_dag.test()
66 changes: 66 additions & 0 deletions dags/sample_ch_ddl.py
@@ -0,0 +1,66 @@
import datetime
import pathlib

from airflow.utils.trigger_rule import TriggerRule
from airflow.decorators import dag

from airflow_clickhouse_plugin.operators.clickhouse import ClickHouseOperator
from airflow.operators.empty import EmptyOperator
from airflow.providers.common.sql.operators import sql
from airflow_clickhouse_plugin.operators.clickhouse_dbapi import (
    ClickHouseBaseDbApiOperator,
)


class ClickHouseBranchSQLOperator(
    sql.BranchSQLOperator,
    ClickHouseBaseDbApiOperator,
):
    """
    temporary workaround for Airflow < 2.9.4
    see https://github.com/bryzgaloff/airflow-clickhouse-plugin/issues/87
    """

    pass


@dag(
    dag_id=pathlib.Path(__file__).stem,
    schedule=None,
    start_date=datetime.datetime(2024, 9, 1, 0, 0, 0),
    catchup=False,
    dag_display_name="Create sample_table",
    tags=["sample", "clickhouse", "ddl"],
    max_active_runs=1,
)
def sample_ddl_stats():
    check_tbl_exists = ClickHouseBranchSQLOperator(
        task_id='check_if_sample_table_exists',
        sql='EXISTS sample_table',
        conn_id='ch_default',
        follow_task_ids_if_true='do_nothing',
        follow_task_ids_if_false='create_sample_table',
    )

    do_nothing = EmptyOperator(task_id="do_nothing")
    create_tbl = ClickHouseOperator(
        task_id='create_sample_table',
        sql="""
            CREATE TABLE IF NOT EXISTS sample_table
            (
                id UInt32,
                value Float64,
                category Enum8('A' = 1, 'B' = 2, 'C' = 3)
            ) ENGINE = MergeTree() ORDER BY id;
        """,
        clickhouse_conn_id='ch_default',
    )

    check_tbl_exists >> [create_tbl, do_nothing]


my_dag = sample_ddl_stats()


if __name__ == '__main__':
    my_dag.test()
48 changes: 48 additions & 0 deletions dags/sample_ch_insert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import datetime
import pathlib

from airflow.datasets import Dataset
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

from airflow_clickhouse_plugin.operators.clickhouse import ClickHouseOperator


sql_dir = pathlib.Path(__file__).absolute().parent / "sql"
sample_table_dataset = Dataset('clickhouse://sample_table')


@dag(
    dag_id=pathlib.Path(__file__).stem,
    schedule=None,
    start_date=datetime.datetime(2024, 9, 1, 0, 0, 0),
    catchup=False,
    template_searchpath=[sql_dir],
    dag_display_name="Insert data to sample_table",
    tags=["sample", "clickhouse", "random"],
    max_active_runs=1,
    params={
        'num_rows': 1_000_000,
    },
)
def sample_ch_insert():
    start = EmptyOperator(task_id="start")
    insert_data = ClickHouseOperator(
        task_id='insert_into_sample_table',
        sql='insert_into_sample_table.sql',
        clickhouse_conn_id='ch_default',
        params={
            "num_rows": "{{ params.num_rows }}",
        },
        outlets=[sample_table_dataset],
    )

    start >> insert_data


my_dag = sample_ch_insert()


if __name__ == '__main__':
    my_dag.test()
65 changes: 65 additions & 0 deletions dags/sample_ch_stats.py
@@ -0,0 +1,65 @@
import datetime
import pathlib

from airflow.datasets import Dataset
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

from airflow_clickhouse_plugin.operators.clickhouse import ClickHouseOperator


sql_dir = pathlib.Path(__file__).absolute().parent / "sql"
sample_table_dataset = Dataset('clickhouse://sample_table')


@dag(
    dag_id=pathlib.Path(__file__).stem,
    schedule=sample_table_dataset,
    start_date=datetime.datetime(2024, 9, 1, 0, 0, 0),
    catchup=False,
    template_searchpath=[sql_dir],
    dag_display_name="Compute stats on sample_table",
    tags=["sample", "clickhouse", "stats"],
    max_active_runs=1,
)
def sample_ch_stats():
    start = EmptyOperator(task_id="start")
    compute_stats = ClickHouseOperator(
        task_id='compute_stats_sample_table',
        sql="""
            SELECT
                category,
                count() AS total_count,
                avg(value) AS mean_value,
                median(value) AS median_value,
                stddevPop(value) AS std_dev,
                min(value) AS min_value,
                max(value) AS max_value
            FROM sample_table
            GROUP BY category
            ORDER BY category;
        """,
        with_column_types=True,
        clickhouse_conn_id='ch_default',
    )

    @task
    def display_stats(upstream_xcom):
        # with_column_types=True yields (rows, [(column_name, column_type), ...])
        stats, names = upstream_xcom
        print("Stats on sample_table:")
        print(",".join(t for t, _ in names))
        for row in stats:
            print(row)

    end = EmptyOperator(task_id="end")

    start >> compute_stats
    display_stats(compute_stats.output) >> end


my_dag = sample_ch_stats()


if __name__ == '__main__':
    my_dag.test()
11 changes: 11 additions & 0 deletions dags/sql/insert_into_sample_table.sql
@@ -0,0 +1,11 @@
INSERT INTO sample_table (id, value, category)
SELECT
    number AS id,
    randNormal(50, 10) AS value,
    multiIf(
        rand() % 3 = 0, 'A',
        rand() % 3 = 1, 'B',
        'C'
    ) AS category
FROM numbers({{ params.num_rows }})
SETTINGS max_insert_block_size = 100000;
Binary file added img/roll_d20_graph.png
Binary file added img/sample_ch_ddl_graph.png
Binary file added img/sample_ch_insert_graph.png
Binary file added img/sample_ch_stats_graph.png
Binary file added img/sample_ch_stats_log.png