Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions dags/example_dag_basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from pathlib import Path

from airflow.decorators import dag
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from dbt_airflow.core.task_group import DbtTaskGroup


@dag(schedule=None, tags=['example'], start_date=days_ago(1))
def tutorial_dbt_airflow_basic():
    """
    Example ELT pipeline: dummy `extract` and `load` steps followed by a dbt
    transformation rendered by `dbt-airflow`.

    Sub task-group creation is disabled, so every dbt resource (each of which
    becomes a single Airflow task) sits directly under the one DbtTaskGroup
    named `my-dbt-project`.
    """
    # All dbt artefacts live under the same project directory.
    dbt_project_dir = Path('/path/to/dbt/project')

    extract_task = DummyOperator(task_id='extract')
    load_task = DummyOperator(task_id='load')
    dbt_group = DbtTaskGroup(
        group_id='my-dbt-project',
        dbt_manifest_path=dbt_project_dir / 'target' / 'manifest.json',
        dbt_target='dev',
        dbt_project_path=dbt_project_dir,
        dbt_profile_path=dbt_project_dir / 'profiles' / 'dir',
        create_sub_task_groups=False,
    )

    extract_task >> load_task >> dbt_group


dag = tutorial_dbt_airflow_basic()
78 changes: 78 additions & 0 deletions dags/example_dag_extra_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from pathlib import Path

from airflow.decorators import dag
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from dbt_airflow.core.task_group import DbtTaskGroup
from dbt_airflow.core.task import ExtraTask


@dag(schedule=None, tags=['example'], start_date=days_ago(1))
def tutorial_dbt_airflow_extra_tasks():
    """
    Example ELT pipeline whose transformation step is rendered by
    `dbt-airflow`, with additional plain Airflow tasks wired in between
    specific dbt tasks.

    Sub task-group creation is disabled, so every dbt resource (each of which
    becomes a single Airflow task) sits directly under the one DbtTaskGroup
    named `my-dbt-project`.
    """
    extract_task = DummyOperator(task_id='extract')
    load_task = DummyOperator(task_id='load')

    # Non-dbt tasks injected into the dbt dependency graph. Each one declares
    # the dbt node ids it must run after (upstream) and/or before (downstream).
    first_extra = ExtraTask(
        task_id='test_task',
        operator=PythonOperator,
        operator_args={'python_callable': lambda: print('Hello world')},
        upstream_task_ids={
            'model.example_dbt_project.int_customers_per_store',
            'model.example_dbt_project.int_revenue_by_date',
        },
    )
    second_extra = ExtraTask(
        task_id='another_test_task',
        operator=PythonOperator,
        operator_args={'python_callable': lambda: print('Hello world 2!')},
        upstream_task_ids={
            'test.example_dbt_project.int_customers_per_store',
        },
        downstream_task_ids={
            'snapshot.example_dbt_project.int_customers_per_store_snapshot',
        },
    )
    third_extra = ExtraTask(
        task_id='test_task_3',
        operator=PythonOperator,
        operator_args={'python_callable': lambda: print('Hello world 3!')},
        downstream_task_ids={
            'snapshot.example_dbt_project.int_customers_per_store_snapshot',
        },
        upstream_task_ids={
            'model.example_dbt_project.int_revenue_by_date',
        },
    )
    extra_tasks = [first_extra, second_extra, third_extra]

    transform = DbtTaskGroup(
        group_id='my-dbt-project',
        dbt_manifest_path=Path('/path/to/dbt/project/target/manifest.json'),
        dbt_target='dev',
        dbt_project_path=Path('/path/to/dbt/project/'),
        dbt_profile_path=Path('/path/to/dbt/project/profiles/dir'),
        create_sub_task_groups=False,
        extra_tasks=extra_tasks,
    )

    extract_task >> load_task >> transform


dag = tutorial_dbt_airflow_extra_tasks()
28 changes: 28 additions & 0 deletions dags/example_dag_sub_task_groups.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from pathlib import Path

from airflow.decorators import dag
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from dbt_airflow.core.task_group import DbtTaskGroup


@dag(schedule=None, tags=['example'], start_date=days_ago(1))
def tutorial_dbt_airflow_sub_task_groups():
    """
    Example ELT pipeline where the transformation step is rendered by
    `dbt-airflow` with sub task-group creation ENABLED, so the dbt resources
    are grouped into nested task groups inside `my-dbt-project`.

    Renamed from `tutorial_dbt_airflow_basic` — the original name collided
    with the DAG id defined in example_dag_basic.py, and duplicate DAG ids
    shadow each other in Airflow. A `start_date` is also added for
    consistency with the other example DAGs.
    """
    extract = DummyOperator(task_id='extract')
    load = DummyOperator(task_id='load')
    transform = DbtTaskGroup(
        group_id='my-dbt-project',
        dbt_manifest_path=Path('example_dbt_project/target/manifest.json'),
        dbt_target='dev',
        dbt_project_path=Path('example_dbt_project/'),
        dbt_profile_path=Path('example_dbt_project/profiles'),
        create_sub_task_groups=True,
    )

    extract >> load >> transform


dag = tutorial_dbt_airflow_sub_task_groups()
110 changes: 56 additions & 54 deletions dags/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,62 +11,64 @@

with DAG(
    dag_id='test_dag',
    start_date=datetime(2023, 5, 16),  # single start_date — the diff residue had two, a duplicate-keyword SyntaxError
    catchup=False,
    tags=['example'],
    schedule_interval='0 6 * * *',  # daily at 06:00
) as dag:

    # Plain Airflow tasks injected into the dbt dependency graph. Each one
    # declares the dbt node ids it must run after (upstream) and/or before
    # (downstream). The commented-out duplicate of this body has been removed.
    extra_tasks = [
        ExtraTask(
            task_id='test_task',
            operator=PythonOperator,
            operator_args={
                'python_callable': lambda: print('Hello world'),
            },
            upstream_task_ids={
                'model.example_dbt_project.int_customers_per_store',
                'model.example_dbt_project.int_revenue_by_date',
            },
        ),
        ExtraTask(
            task_id='another_test_task',
            operator=PythonOperator,
            operator_args={
                'python_callable': lambda: print('Hello world 2!'),
            },
            upstream_task_ids={
                'test.example_dbt_project.int_customers_per_store',
            },
            downstream_task_ids={
                'snapshot.example_dbt_project.int_customers_per_store_snapshot',
            },
        ),
        ExtraTask(
            task_id='test_task_3',
            operator=PythonOperator,
            operator_args={
                'python_callable': lambda: print('Hello world 3!'),
            },
            downstream_task_ids={
                'snapshot.example_dbt_project.int_customers_per_store_snapshot',
            },
            upstream_task_ids={
                'model.example_dbt_project.int_revenue_by_date',
            },
        ),
    ]

    t1 = DummyOperator(task_id='dummy_1')
    t2 = DummyOperator(task_id='dummy_2')

    # Renders the dbt project as a task group, with one Airflow task per dbt
    # resource and per-model sub task groups enabled.
    tg = DbtTaskGroup(
        group_id='dbt-company',
        dbt_manifest_path=Path('/opt/airflow/example_dbt_project/target/manifest.json'),
        dbt_target='dev',
        dbt_project_path=Path('/opt/airflow/example_dbt_project/'),
        dbt_profile_path=Path('/opt/airflow/example_dbt_project/profiles'),
        extra_tasks=extra_tasks,
        create_sub_task_groups=True,
    )

    t1 >> tg >> t2
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
36 changes: 0 additions & 36 deletions docs/examples.md

This file was deleted.

59 changes: 59 additions & 0 deletions docs/examples/example_dag_basic.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Example DAGs: Basic
This is an example DAG that will create two (dummy) tasks namely `extract` and `load`,
followed by a `DbtTaskGroup` that renders a dbt project that:
- Is located in `/path/to/dbt/project/`
- Whose compiled manifest JSON file is found under `/path/to/dbt/project/target/manifest.json`
- Uses the profile named in `dbt_project.yml`, as defined in the `profiles.yml` file located in `/path/to/dbt/project/profiles/dir`.
- With `dev` target (that must be present in profile definition)

The `DbtTaskGroup` has disabled `create_sub_task_groups`, meaning that all individual dbt resources will be rendered
at the same level, under the `DbtTaskGroup` Airflow Task Group.


## Code
```python
from pathlib import Path

from airflow.decorators import dag
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from dbt_airflow.core.task_group import DbtTaskGroup


@dag(schedule=None, tags=['example'], start_date=days_ago(1))
def tutorial_dbt_airflow_basic():
"""
This is an example Airflow DAG that represents a fundamental ELT pipeline where the
transformation part is performed with `dbt-airflow`.

The creation of sub task-groups is disabled such that all Airflow Tasks (each of which
corresponds to a single dbt resource type), will be under the main DbtTaskGroup named
`my-dbt-project`.
"""
extract = DummyOperator(task_id='extract')
load = DummyOperator(task_id='load')
transform = DbtTaskGroup(
group_id='my-dbt-project',
dbt_manifest_path=Path('/path/to/dbt/project/target/manifest.json'),
dbt_target='dev',
dbt_project_path=Path('/path/to/dbt/project/'),
dbt_profile_path=Path('/path/to/dbt/project/profiles/dir'),
create_sub_task_groups=False,
)

extract >> load >> transform


dag = tutorial_dbt_airflow_basic()
```

## How the DAG is rendered on Airflow

This is how the DAG will initially look on Apache Airflow:

<img style="display: block; margin: 0 auto" src="../blob/examples/example_dag_basic/generic_view.png" alt="test">

When you click on the `my-dbt-project` TaskGroup, you will then be able to see all the dbt resources rendered as
individual Airflow Tasks.

<img style="display: block; margin: 0 auto" src="../blob/examples/example_dag_basic/detailed_view.png" alt="test">
Loading