Merge pull request #19 from kevinsunny1996/task/add_id_check_and_create_cosmos_profile

Categorize DAG steps using TaskGroups and add an empty-response check
kevinsunny1996 authored May 13, 2024
2 parents 656db19 + 7766330 commit db2438c
Showing 2 changed files with 29 additions and 9 deletions.
35 changes: 27 additions & 8 deletions dags/rawg_api_extractor_dag.py
@@ -5,6 +5,10 @@
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

# Cosmos, an open-source library from Astronomer, for running dbt jobs from Airflow
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig

# Airflow exception class used to skip a task when a condition is met
from airflow.exceptions import AirflowSkipException

from airflow.utils.dates import days_ago
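
For context, a minimal sketch of how these Cosmos imports are typically wired together (the dbt project path, profile name, and profiles.yml location are assumptions for illustration, not values from this PR):

# Hedged sketch: paths and profile names below are assumed, not taken from this commit
profile_config = ProfileConfig(
    profile_name='rawg_dbt',        # assumed dbt profile name
    target_name='dev',              # assumed target
    profiles_yml_filepath='/usr/local/airflow/dbt/profiles.yml',  # assumed path
)

transform_rawg_models = DbtTaskGroup(
    group_id='transform_rawg_models',
    project_config=ProjectConfig('/usr/local/airflow/dbt/rawg_project'),  # assumed project path
    profile_config=profile_config,
    execution_config=ExecutionConfig(),  # defaults to the dbt executable on PATH
)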
@@ -361,6 +365,8 @@
)
def rawg_api_extractor_dag():

@task_group(group_id='initialize_vars_and_check_hibernation')
def initialize_vars_and_check_hibernation():
# Fetch variable values for API Key and Page Number and Landing bucket URL
rawg_api_key = Variable.get('rawg_api_key')
rawg_page_number = int(Variable.get('api_page_number', default_var=1))
@@ -376,6 +382,11 @@ def check_hibernation(**context):
if now >= hibernation_start:
raise AirflowSkipException('Skipping DAG run close to hibernation time')

# Invoke the hibernation check within this task group
check_hibernation()
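
The body of check_hibernation is elided above; a plausible self-contained shape for such a task, assuming a fixed UTC cutoff hour purely for illustration:

@task
def check_hibernation(**context):
    from datetime import datetime, timezone
    now = datetime.now(timezone.utc)
    # Assumed cutoff for illustration; the real DAG may derive hibernation_start differently
    hibernation_start = now.replace(hour=22, minute=0, second=0, microsecond=0)
    if now >= hibernation_start:
        raise AirflowSkipException('Skipping DAG run close to hibernation time')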

@task_group(group_id='extract_rawg_api_data')
def extract_rawg_api_data():
# Task to get Game IDs to fetch data from in subsequent task
@task
def get_rawg_api_game_ids(api_key: str, page_number: int) -> list:
@@ -416,7 +427,12 @@ def get_game_id_related_data(api_key: str, game_ids_list: list, page_number: int
get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, genre_df, 'genres', rawg_page_number)
get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, publisher_df, 'publishers', rawg_page_number)


# Call the extraction tasks inside the group, feeding the game ID list into the detail fetch
game_ids_list = get_rawg_api_game_ids(rawg_api_key, rawg_page_number)
get_game_id_related_data(rawg_api_key, game_ids_list, rawg_page_number)

@task_group(group_id='load_extracted_data_to_bq')
def load_extracted_data_to_bq():
# Load contents from GCS onto BigQuery for that run
load_rawg_api_ratings_data_to_bq = GCSToBigQueryOperator(
task_id='load_ratings_to_bq',
@@ -499,6 +515,12 @@ def get_game_id_related_data(api_key: str, game_ids_list: list, page_number: int
max_bad_records=40
)

# DAG Flow between load steps
load_rawg_api_ratings_data_to_bq >> load_rawg_api_games_data_to_bq >> load_rawg_api_genres_data_to_bq >> load_rawg_api_platforms_data_to_bq >> load_rawg_api_publishers_data_to_bq
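
Each of the five load tasks follows the same GCSToBigQueryOperator shape; an illustrative sketch for the ratings load, where the object path, destination table, and write disposition are placeholders (only task_id and max_bad_records are visible in this diff):

load_rawg_api_ratings_data_to_bq = GCSToBigQueryOperator(
    task_id='load_ratings_to_bq',
    bucket=rawg_landing_gcs_bucket,                                # bucket variable from the DAG
    source_objects=[f'ratings_{rawg_page_number}.parquet'],        # placeholder object path
    destination_project_dataset_table='my-project.rawg.ratings',   # placeholder table
    source_format='PARQUET',
    write_disposition='WRITE_APPEND',                              # assumed disposition
    max_bad_records=40,
)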


@task_group(group_id='post_load_cleanup')
def post_load_cleanup():
@task
def remove_extracted_api_parquet_files(bucket_name: str) -> None:
rawg_api_gcs_hook = GCSHook(gcp_conn_id='gcp')
@@ -516,14 +538,11 @@ def update_page_number(rawg_page_number: int) -> int:
next_page_number = int(rawg_page_number) + 1
Variable.set("api_page_number", next_page_number)

-# DAG Flow
-hibernation_check = check_hibernation()
-game_ids_list = get_rawg_api_game_ids(rawg_api_key, rawg_page_number)
-game_details_extractor = get_game_id_related_data(rawg_api_key, game_ids_list, rawg_page_number)
-clear_extracted_parquet_files = remove_extracted_api_parquet_files(rawg_landing_gcs_bucket)
-next_page_number = update_page_number(rawg_page_number)
+remove_extracted_api_parquet_files(rawg_landing_gcs_bucket)
+update_page_number(rawg_page_number)

-hibernation_check >> game_ids_list >> game_details_extractor >> load_rawg_api_ratings_data_to_bq >> load_rawg_api_games_data_to_bq >> load_rawg_api_genres_data_to_bq >> load_rawg_api_platforms_data_to_bq >> load_rawg_api_publishers_data_to_bq >> clear_extracted_parquet_files >> next_page_number
+# DAG Flow
+initialize_vars_and_check_hibernation() >> extract_rawg_api_data() >> load_extracted_data_to_bq() >> post_load_cleanup()

rawg_api_extractor_dag()
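
The grouping pattern applies generally: calling a @task_group-decorated function instantiates a TaskGroup, and those instances can be ordered with >>. A minimal standalone sketch with illustrative names:

from airflow.decorators import dag, task, task_group
from airflow.utils.dates import days_ago

@dag(schedule_interval=None, start_date=days_ago(1), catchup=False)
def grouped_dag():

    @task_group(group_id='extract')
    def extract():
        @task
        def pull() -> str:
            return 'payload'
        pull()

    @task_group(group_id='load')
    def load():
        @task
        def push() -> None:
            pass
        push()

    extract() >> load()

grouped_dag()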

3 changes: 2 additions & 1 deletion dags/utils/rawg_api_caller.py
@@ -129,7 +129,8 @@ def get_game_details_per_id(self, api_key: str, endpoint_ids: list, page_number:
response = requests.request("GET", f"{base_url}/{endpoint}/{game_id}?key={api_key}", headers=headers, data=payload)
info_logger.info(f'Fetched game related data for {game_id} for {endpoint} endpoint')

-if response.status_code == 200:
+# Process further only when the call succeeds and the payload echoes the requested game ID
+if response.status_code == 200 and response.json().get('id') == game_id:
# Convert to JSON and flatten the response
games_json = response.json()

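The guard added above works because the RAWG detail endpoint echoes the requested id in its payload; a compact sketch of the same defensive pattern as a hypothetical helper that parses the JSON once (function name and return behavior are assumptions):

import requests

def fetch_game_details(base_url: str, endpoint: str, game_id: int, api_key: str) -> dict:
    # Hypothetical helper illustrating the empty/mismatched-response guard
    response = requests.get(f'{base_url}/{endpoint}/{game_id}', params={'key': api_key})
    if response.status_code != 200:
        return {}
    payload = response.json()
    # A missing or different id signals an empty or mismatched response; skip it
    if payload.get('id') != game_id:
        return {}
    return payload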
