diff --git a/dags/example_dag_basic.py b/dags/example_dag_basic.py new file mode 100644 index 0000000..04812ad --- /dev/null +++ b/dags/example_dag_basic.py @@ -0,0 +1,33 @@ +from pathlib import Path + +from airflow.decorators import dag +from airflow.operators.dummy import DummyOperator +from airflow.utils.dates import days_ago +from dbt_airflow.core.task_group import DbtTaskGroup + + +@dag(schedule=None, tags=['example'], start_date=days_ago(1)) +def tutorial_dbt_airflow_basic(): + """ + This is an example Airflow DAG that represents a fundamental ELT pipeline where the + transformation part is performed with `dbt-airflow`. + + The creation of sub task-groups is disabled such that all Airflow Tasks (each of which + corresponds to a single dbt resource type), will be under the main DbtTaskGroup named + `my-dbt-project`. + """ + extract = DummyOperator(task_id='extract') + load = DummyOperator(task_id='load') + transform = DbtTaskGroup( + group_id='my-dbt-project', + dbt_manifest_path=Path('/path/to/dbt/project/target/manifest.json'), + dbt_target='dev', + dbt_project_path=Path('/path/to/dbt/project/'), + dbt_profile_path=Path('/path/to/dbt/project/profiles/dir'), + create_sub_task_groups=False, + ) + + extract >> load >> transform + + +dag = tutorial_dbt_airflow_basic() diff --git a/dags/example_dag_extra_tasks.py b/dags/example_dag_extra_tasks.py new file mode 100644 index 0000000..892ee37 --- /dev/null +++ b/dags/example_dag_extra_tasks.py @@ -0,0 +1,78 @@ +from pathlib import Path + +from airflow.decorators import dag +from airflow.operators.dummy import DummyOperator +from airflow.operators.python import PythonOperator +from airflow.utils.dates import days_ago +from dbt_airflow.core.task_group import DbtTaskGroup +from dbt_airflow.core.task import ExtraTask + + +@dag(schedule=None, tags=['example'], start_date=days_ago(1)) +def tutorial_dbt_airflow_extra_tasks(): + """ + This is an example Airflow DAG that represents a fundamental ELT pipeline where the + 
transformation part is performed with `dbt-airflow`. Furthermore, additional Airflow tasks are + also added in between specific dbt tasks. + + The creation of sub task-groups is disabled such that all Airflow Tasks (each of which + corresponds to a single dbt resource type), will be under the main DbtTaskGroup named + `my-dbt-project`. + """ + extract = DummyOperator(task_id='extract') + load = DummyOperator(task_id='load') + + extra_tasks = [ + ExtraTask( + task_id='test_task', + operator=PythonOperator, + operator_args={ + 'python_callable': lambda: print('Hello world'), + }, + upstream_task_ids={ + 'model.example_dbt_project.int_customers_per_store', + 'model.example_dbt_project.int_revenue_by_date' + } + ), + ExtraTask( + task_id='another_test_task', + operator=PythonOperator, + operator_args={ + 'python_callable': lambda: print('Hello world 2!'), + }, + upstream_task_ids={ + 'test.example_dbt_project.int_customers_per_store', + }, + downstream_task_ids={ + 'snapshot.example_dbt_project.int_customers_per_store_snapshot', + } + ), + ExtraTask( + task_id='test_task_3', + operator=PythonOperator, + operator_args={ + 'python_callable': lambda: print('Hello world 3!'), + }, + downstream_task_ids={ + 'snapshot.example_dbt_project.int_customers_per_store_snapshot', + }, + upstream_task_ids={ + 'model.example_dbt_project.int_revenue_by_date', + }, + ) + ] + + transform = DbtTaskGroup( + group_id='my-dbt-project', + dbt_manifest_path=Path('/path/to/dbt/project/target/manifest.json'), + dbt_target='dev', + dbt_project_path=Path('/path/to/dbt/project/'), + dbt_profile_path=Path('/path/to/dbt/project/profiles/dir'), + create_sub_task_groups=False, + extra_tasks=extra_tasks, + ) + + extract >> load >> transform + + +dag = tutorial_dbt_airflow_extra_tasks() diff --git a/dags/example_dag_sub_task_groups.py b/dags/example_dag_sub_task_groups.py new file mode 100644 index 0000000..0d5d45a --- /dev/null +++ b/dags/example_dag_sub_task_groups.py @@ -0,0 +1,28 @@ +from pathlib 
import Path + +from airflow.decorators import dag +from airflow.operators.dummy import DummyOperator +from dbt_airflow.core.task_group import DbtTaskGroup + + +@dag(schedule=None, tags=['example']) +def tutorial_dbt_airflow_basic(): + """ + This is an example Airflow DAG that represents a fundamental ELT pipeline where the + transformation part is performed with `dbt-airflow` + """ + extract = DummyOperator(task_id='extract') + load = DummyOperator(task_id='load') + transform = DbtTaskGroup( + group_id='my-dbt-project', + dbt_manifest_path=Path('example_dbt_project/target/manifest.json'), + dbt_target='dev', + dbt_project_path=Path('example_dbt_project/'), + dbt_profile_path=Path('example_dbt_project/profiles'), + create_sub_task_groups=True, + ) + + extract >> load >> transform + + +dag = tutorial_dbt_airflow_basic() \ No newline at end of file diff --git a/dags/test_dag.py b/dags/test_dag.py index 172d424..105f40d 100644 --- a/dags/test_dag.py +++ b/dags/test_dag.py @@ -11,62 +11,64 @@ with DAG( dag_id='test_dag', - start_date=datetime(2021, 1, 1), + start_date=datetime(2023, 5, 16), catchup=False, tags=['example'], + schedule_interval='0 6 * * *', ) as dag: - - extra_tasks = [ - ExtraTask( - task_id='test_task', - operator=PythonOperator, - operator_args={ - 'python_callable': lambda: print('Hello world'), - }, - upstream_task_ids={ - 'model.example_dbt_project.int_customers_per_store', - 'model.example_dbt_project.int_revenue_by_date' - } - ), - ExtraTask( - task_id='another_test_task', - operator=PythonOperator, - operator_args={ - 'python_callable': lambda: print('Hello world 2!'), - }, - upstream_task_ids={ - 'test.example_dbt_project.int_customers_per_store', - }, - downstream_task_ids={ - 'snapshot.example_dbt_project.int_customers_per_store_snapshot', - } - ), - ExtraTask( - task_id='test_task_3', - operator=PythonOperator, - operator_args={ - 'python_callable': lambda: print('Hello world 3!'), - }, - downstream_task_ids={ - 
'snapshot.example_dbt_project.int_customers_per_store_snapshot', - }, - upstream_task_ids={ - 'model.example_dbt_project.int_revenue_by_date', - }, - ) - ] - t1 = DummyOperator(task_id='dummy_1') - t2 = DummyOperator(task_id='dummy_2') - - tg = DbtTaskGroup( - group_id='dbt-company', - dbt_manifest_path=Path('/opt/airflow/example_dbt_project/target/manifest.json'), - dbt_target='dev', - dbt_project_path=Path('/opt/airflow/example_dbt_project/'), - dbt_profile_path=Path('/opt/airflow/example_dbt_project/profiles'), - extra_tasks=extra_tasks, - create_sub_task_groups=True, - ) - t1 >> tg >> t2 + # extra_tasks = [ + # ExtraTask( + # task_id='test_task', + # operator=PythonOperator, + # operator_args={ + # 'python_callable': lambda: print('Hello world'), + # }, + # upstream_task_ids={ + # 'model.example_dbt_project.int_customers_per_store', + # 'model.example_dbt_project.int_revenue_by_date' + # } + # ), + # ExtraTask( + # task_id='another_test_task', + # operator=PythonOperator, + # operator_args={ + # 'python_callable': lambda: print('Hello world 2!'), + # }, + # upstream_task_ids={ + # 'test.example_dbt_project.int_customers_per_store', + # }, + # downstream_task_ids={ + # 'snapshot.example_dbt_project.int_customers_per_store_snapshot', + # } + # ), + # ExtraTask( + # task_id='test_task_3', + # operator=PythonOperator, + # operator_args={ + # 'python_callable': lambda: print('Hello world 3!'), + # }, + # downstream_task_ids={ + # 'snapshot.example_dbt_project.int_customers_per_store_snapshot', + # }, + # upstream_task_ids={ + # 'model.example_dbt_project.int_revenue_by_date', + # }, + # ) + # ] + # + # t1 = DummyOperator(task_id='dummy_1') + # t2 = DummyOperator(task_id='dummy_2') + # + # tg = DbtTaskGroup( + # group_id='dbt-company', + # dbt_manifest_path=Path('/opt/airflow/example_dbt_project/target/manifest.json'), + # dbt_target='dev', + # dbt_project_path=Path('/opt/airflow/example_dbt_project/'), + # 
dbt_profile_path=Path('/opt/airflow/example_dbt_project/profiles'), + # extra_tasks=extra_tasks, + # create_sub_task_groups=True, + # ) + # + # t1 >> tg >> t2 diff --git a/docs/blob/examples/example_dag_basic/detailed_view.png b/docs/blob/examples/example_dag_basic/detailed_view.png new file mode 100644 index 0000000..ce0c320 Binary files /dev/null and b/docs/blob/examples/example_dag_basic/detailed_view.png differ diff --git a/docs/blob/examples/example_dag_basic/generic_view.png b/docs/blob/examples/example_dag_basic/generic_view.png new file mode 100644 index 0000000..66cdc86 Binary files /dev/null and b/docs/blob/examples/example_dag_basic/generic_view.png differ diff --git a/docs/blob/examples/example_dag_extra_tasks/detailed_view.png b/docs/blob/examples/example_dag_extra_tasks/detailed_view.png new file mode 100644 index 0000000..f717499 Binary files /dev/null and b/docs/blob/examples/example_dag_extra_tasks/detailed_view.png differ diff --git a/docs/blob/examples/example_dag_extra_tasks/generic_view.png b/docs/blob/examples/example_dag_extra_tasks/generic_view.png new file mode 100644 index 0000000..66cdc86 Binary files /dev/null and b/docs/blob/examples/example_dag_extra_tasks/generic_view.png differ diff --git a/docs/examples.md b/docs/examples.md deleted file mode 100644 index bf8247e..0000000 --- a/docs/examples.md +++ /dev/null @@ -1,36 +0,0 @@ -# Examples - -## Populating a dbt project on Airflow, as a DAG -TODO: Write description - -```python -from datetime import datetime -from pathlib import Path - -from airflow import DAG -from airflow.operators.dummy import DummyOperator -from dbt_airflow.core.task_group import DbtTaskGroup - - -with DAG( - dag_id='test_dag', - start_date=datetime(2021, 1, 1), - catchup=False, - tags=['example'], -) as dag: - - t1 = DummyOperator(task_id='dummy_1') - t2 = DummyOperator(task_id='dummy_2') - tg = DbtTaskGroup( - group_id='dbt-company', - dbt_manifest_path=Path('/path/to/target/manifest.json'), - dbt_target='dev', - 
dbt_project_path=Path('/path/to/dbt/project/dir'), - dbt_profile_path=Path('/path/to/dbt/project/profiles/dir'), - ) - - t1 >> tg >> t2 -``` - -## Airflow DAG with dbt project and additional dependencies -TODO diff --git a/docs/examples/example_dag_basic.md b/docs/examples/example_dag_basic.md new file mode 100644 index 0000000..6bfd0a0 --- /dev/null +++ b/docs/examples/example_dag_basic.md @@ -0,0 +1,59 @@ +# Example DAGs: Basic +This is an example DAG that will create two (dummy) tasks namely `extract` and `load`, +followed by a `DbtTaskGroup` that renders a dbt project that: +- Is located in `/path/to/dbt/project/` +- Whose compiled manifest JSON file is found under `/path/to/dbt/project/target/manifest.json` +- Uses the profile specified (in `dbt_project.yml`) in `profiles.yml` file, that is located in `/path/to/dbt/project/profiles/dir`. +- With `dev` target (that must be present in profile definition) + +The `DbtTaskGroup` has disabled `create_sub_task_groups`, meaning that all individual dbt resources will be rendered +at the same level, under the `DbtTaskGroup` Airflow Task Group. + + +## Code +```python +from pathlib import Path + +from airflow.decorators import dag +from airflow.operators.dummy import DummyOperator +from airflow.utils.dates import days_ago +from dbt_airflow.core.task_group import DbtTaskGroup + + +@dag(schedule=None, tags=['example'], start_date=days_ago(1)) +def tutorial_dbt_airflow_basic(): + """ + This is an example Airflow DAG that represents a fundamental ELT pipeline where the + transformation part is performed with `dbt-airflow`. + + The creation of sub task-groups is disabled such that all Airflow Tasks (each of which + corresponds to a single dbt resource type), will be under the main DbtTaskGroup named + `my-dbt-project`. 
+ """ + extract = DummyOperator(task_id='extract') + load = DummyOperator(task_id='load') + transform = DbtTaskGroup( + group_id='my-dbt-project', + dbt_manifest_path=Path('/path/to/dbt/project/target/manifest.json'), + dbt_target='dev', + dbt_project_path=Path('/path/to/dbt/project/'), + dbt_profile_path=Path('/path/to/dbt/project/profiles/dir'), + create_sub_task_groups=False, + ) + + extract >> load >> transform + + +dag = tutorial_dbt_airflow_basic() +``` + +## How the DAG is rendered on Airflow + +This is how the DAG will initially look like on Apache Airflow: + +test + +When you click on the `my-dbt-project` TaskGroup, you will then be able to see all the dbt resources rendered as +individual Airflow Tasks. + +test diff --git a/docs/examples/example_dag_extra_tasks.md b/docs/examples/example_dag_extra_tasks.md new file mode 100644 index 0000000..4c7c981 --- /dev/null +++ b/docs/examples/example_dag_extra_tasks.md @@ -0,0 +1,119 @@ +# Example DAGs: Extra Tasks + +This is an example DAG that will create two (dummy) tasks namely `extract` and `load`, +followed by a `DbtTaskGroup` that renders a dbt project that: +- Is located in `/path/to/dbt/project/` +- Whose compiled manifest JSON file is found under `/path/to/dbt/project/target/manifest.json` +- Uses the profile specified (in `dbt_project.yml`) in `profiles.yml` file, that is located in `/path/to/dbt/project/profiles/dir`. +- With `dev` target (that must be present in profile definition) + +The `DbtTaskGroup` has disabled `create_sub_task_groups`, meaning that all individual dbt resources will be rendered +at the same level, under the `DbtTaskGroup` Airflow Task Group. + +## Extra Tasks +Additionally, we specify extra tasks by taking advantage of the `ExtraTask` class that can be used to insert +tasks in the dbt task dependency graph. 
+ +More specifically, we specify three extra tasks: +- `test_task` will run a `PythonOperator` and the task itself will have two upstream dbt tasks, +namely `model.example_dbt_project.int_customers_per_store` and `model.example_dbt_project.int_revenue_by_date`. This +the extra task will run once these two upstream dependencies are completed +- `another_test_task` is another `PythonOperator` that will be placed in between tasks +`test.example_dbt_project.int_customers_per_store` and `snapshot.example_dbt_project.int_customers_per_store_snapshot` +- `test_task_3` is also placed in between two tasks, namely `model.example_dbt_project.int_revenue_by_date` +and `snapshot.example_dbt_project.int_customers_per_store_snapshot`. + + +## Code +```python +from pathlib import Path + +from airflow.decorators import dag +from airflow.operators.dummy import DummyOperator +from airflow.operators.python import PythonOperator +from airflow.utils.dates import days_ago +from dbt_airflow.core.task_group import DbtTaskGroup +from dbt_airflow.core.task import ExtraTask + + +@dag(schedule=None, tags=['example'], start_date=days_ago(1)) +def tutorial_dbt_airflow_extra_tasks(): + """ + This is an example Airflow DAG that represents a fundamental ELT pipeline where the + transformation part is performed with `dbt-airflow`. Furthermore, additional Airflow tasks are + also added in between specific dbt tasks. + + The creation of sub task-groups is disabled such that all Airflow Tasks (each of which + corresponds to a single dbt resource type), will be under the main DbtTaskGroup named + `my-dbt-project`. 
+ """ + extract = DummyOperator(task_id='extract') + load = DummyOperator(task_id='load') + + extra_tasks = [ + ExtraTask( + task_id='test_task', + operator=PythonOperator, + operator_args={ + 'python_callable': lambda: print('Hello world'), + }, + upstream_task_ids={ + 'model.example_dbt_project.int_customers_per_store', + 'model.example_dbt_project.int_revenue_by_date' + } + ), + ExtraTask( + task_id='another_test_task', + operator=PythonOperator, + operator_args={ + 'python_callable': lambda: print('Hello world 2!'), + }, + upstream_task_ids={ + 'test.example_dbt_project.int_customers_per_store', + }, + downstream_task_ids={ + 'snapshot.example_dbt_project.int_customers_per_store_snapshot', + } + ), + ExtraTask( + task_id='test_task_3', + operator=PythonOperator, + operator_args={ + 'python_callable': lambda: print('Hello world 3!'), + }, + downstream_task_ids={ + 'snapshot.example_dbt_project.int_customers_per_store_snapshot', + }, + upstream_task_ids={ + 'model.example_dbt_project.int_revenue_by_date', + }, + ) + ] + + transform = DbtTaskGroup( + group_id='my-dbt-project', + dbt_manifest_path=Path('/path/to/dbt/project/target/manifest.json'), + dbt_target='dev', + dbt_project_path=Path('/path/to/dbt/project/'), + dbt_profile_path=Path('/path/to/dbt/project/profiles/dir'), + create_sub_task_groups=False, + extra_tasks=extra_tasks, + ) + + extract >> load >> transform + + +dag = tutorial_dbt_airflow_extra_tasks() +``` + +## How the DAG is rendered on Airflow + +This is how the DAG will initially look like on Apache Airflow: + +test + +When you click on the `my-dbt-project` TaskGroup, you will then be able to see all the dbt resources rendered as +individual Airflow Tasks. 
+ + test + diff --git a/docs/examples/example_dag_sub_task_groups.md b/docs/examples/example_dag_sub_task_groups.md new file mode 100644 index 0000000..fff4c54 --- /dev/null +++ b/docs/examples/example_dag_sub_task_groups.md @@ -0,0 +1,8 @@ +# Example DAGs: Task Groups + + + +## Code +```python + +``` diff --git a/mkdocs.yml b/mkdocs.yml index 07e4a52..24c5558 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -11,6 +11,10 @@ repo_url: https://github.com/gmyrianthous/dbt-airflow nav: - Overview: index.md + - Examples: + - Basic DAG: examples/example_dag_basic.md + - DAG with Extra Tasks: examples/example_dag_extra_tasks.md + - DAG with sub task groups: examples/example_dag_sub_task_groups.md - Contributing to dbt-airflow: contributing.md plugins: