
Commit

Merge pull request #16 from kevinsunny1996/task/set_schedule_and_add_hibernate_safe_skip

Updated image version and added hibernation check and removed schedul…
kevinsunny1996 authored May 9, 2024
2 parents 994c0f5 + 4fb7143 commit 0754985
Showing 2 changed files with 15 additions and 5 deletions.
Dockerfile (2 changes: 1 addition & 1 deletion)
@@ -1,2 +1,2 @@
-FROM quay.io/astronomer/astro-runtime:10.8.0
+FROM quay.io/astronomer/astro-runtime:11.3.0
 ENV AIRFLOW__CORE__TEST_CONNECTION=Enabled
dags/rawg_api_extractor_dag.py (18 changes: 14 additions & 4 deletions)
@@ -5,6 +5,8 @@
 from airflow.providers.google.cloud.hooks.gcs import GCSHook
 from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
 
+from airflow.exceptions import AirflowSkipException
+
 from airflow.utils.dates import days_ago
 from airflow.models import Variable
 
@@ -14,7 +16,6 @@
 from utils.gcp_utils import get_gcp_connection_and_upload_to_gcs
 
-
 
 # Default DAG arguments
 default_args = {
     'owner': 'airflow',
@@ -313,8 +314,8 @@
     dag_id='rawg_api_extractor_dag',
     default_args=default_args,
     description='DAG to fetch RAWG API data from games/ endpoint, convert the JSON to CSV and upload to GCS and then load it in Bigquery',
-    schedule=None,
-    schedule_interval='*/6 * * * *',
+    # schedule=None,
+    schedule_interval='*/3 * * * *',
     start_date=datetime(2023, 9, 1),
     tags=['rawg_api_elt'],
     catchup=False
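
The cron change above halves the cadence from every 6 minutes to every 3 minutes. Note that `schedule_interval` is deprecated in newer Airflow releases (2.4 onwards) in favour of the single `schedule` parameter, which accepts the same cron string. A minimal sketch of the equivalent decorator call (illustrative only, not part of this commit; the dag_id is hypothetical):

```python
# Sketch, not from the repository: the same every-3-minutes cadence expressed
# with the newer `schedule` parameter instead of the deprecated `schedule_interval`.
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    dag_id='example_every_3_minutes',   # hypothetical dag_id
    schedule='*/3 * * * *',             # cron: run every 3 minutes
    start_date=datetime(2023, 9, 1),
    catchup=False,
)
def example_every_3_minutes():
    @task
    def noop():
        pass

    noop()


example_every_3_minutes()
```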
@@ -328,6 +329,14 @@ def rawg_api_extractor_dag():
     rawg_api_bq_dataset = Variable.get('gcp_bq_dataset')
     gcp_project_name = Variable.get('gcp_project_id')
 
+    # Check whether the run is starting close to hibernation time; skip it if so
+    @task
+    def check_hibernation(**context):
+        hibernation_start = datetime.strptime('10:52:00', '%H:%M:%S').time()
+        now = datetime.now().time()
+        if now >= hibernation_start:
+            raise AirflowSkipException('Skipping DAG run close to hibernation time')
+
     # Task to get Game IDs to fetch data from in subsequent task
     @task
     def get_rawg_api_game_ids(api_key: str, page_number: int) -> []:
@@ -469,12 +478,13 @@ def update_page_number(rawg_page_number: int) -> int:
         Variable.set("api_page_number", next_page_number)
 
     # DAG Flow
+    hibernation_check = check_hibernation()
     game_ids_list = get_rawg_api_game_ids(rawg_api_key, rawg_page_number)
     game_details_extractor = get_game_id_related_data(rawg_api_key, game_ids_list, rawg_page_number)
     clear_extracted_parquet_files = remove_extracted_api_parquet_files(rawg_landing_gcs_bucket)
     next_page_number = update_page_number(rawg_page_number)
 
-    game_ids_list >> game_details_extractor >> load_rawg_api_ratings_data_to_bq >> load_rawg_api_games_data_to_bq >> load_rawg_api_genres_data_to_bq >> load_rawg_api_platforms_data_to_bq >> load_rawg_api_publishers_data_to_bq >> clear_extracted_parquet_files >> next_page_number
+    hibernation_check >> game_ids_list >> game_details_extractor >> load_rawg_api_ratings_data_to_bq >> load_rawg_api_games_data_to_bq >> load_rawg_api_genres_data_to_bq >> load_rawg_api_platforms_data_to_bq >> load_rawg_api_publishers_data_to_bq >> clear_extracted_parquet_files >> next_page_number
 
 rawg_api_extractor_dag()
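
The new check_hibernation task raises AirflowSkipException, which marks the task as skipped; with the default all_success trigger rule the skip propagates down the chain, so the whole run is effectively skipped whenever it starts at or after the hard-coded 10:52:00 cutoff. A variation that reads the cutoff from an Airflow Variable and compares times in UTC might look like the sketch below (the Variable name and its default are assumptions, not part of this commit):

```python
# Sketch, not from the repository: hibernation guard driven by an Airflow
# Variable and a UTC clock instead of a hard-coded local wall-clock time.
from datetime import datetime, timezone

from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
from airflow.models import Variable


@task
def check_hibernation():
    # Cutoff stored as HH:MM:SS in UTC; 'hibernation_start_utc' is a hypothetical Variable name.
    cutoff_str = Variable.get('hibernation_start_utc', default_var='10:52:00')
    hibernation_start = datetime.strptime(cutoff_str, '%H:%M:%S').time()
    now_utc = datetime.now(timezone.utc).time()
    if now_utc >= hibernation_start:
        # Skipped state propagates to downstream tasks with the default
        # all_success trigger rule, so the rest of the run is skipped too.
        raise AirflowSkipException('Skipping DAG run close to hibernation time')
```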
