Skip to content

Commit

Permalink
Merge pull request #18 from kevinsunny1996/task/add_load_date_and_update_params_and_add_transforms
Browse files Browse the repository at this point in the history

Task/add load date and update params and add transforms
  • Loading branch information
kevinsunny1996 authored May 11, 2024
2 parents d2180b5 + 6c60d39 commit 656db19
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 5 deletions.
6 changes: 5 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
FROM quay.io/astronomer/astro-runtime:11.3.0
ENV AIRFLOW__CORE__TEST_CONNECTION=Enabled
ENV AIRFLOW__CORE__TEST_CONNECTION=Enabled

# install dbt into a virtual environment
RUN python -m venv dbt_venv && source dbt_venv/bin/activate && \
pip install --no-cache-dir dbt-bigquery && deactivate
37 changes: 37 additions & 0 deletions dags/rawg_api_extractor_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@
"type": "STRING",
"description": "",
"fields": []
},
{
"name": "load_date",
"mode": "",
"type": "STRING",
"description": "",
"fields": []
}
]

Expand Down Expand Up @@ -147,6 +154,13 @@
"type": "INTEGER",
"description": "",
"fields": []
},
{
"name": "load_date",
"mode": "",
"type": "STRING",
"description": "",
"fields": []
}
]

Expand Down Expand Up @@ -221,6 +235,13 @@
"type": "STRING",
"description": "",
"fields": []
},
{
"name": "load_date",
"mode": "",
"type": "STRING",
"description": "",
"fields": []
}
]

Expand Down Expand Up @@ -260,6 +281,13 @@
"type": "INTEGER",
"description": "",
"fields": []
},
{
"name": "load_date",
"mode": "",
"type": "STRING",
"description": "",
"fields": []
}
]

Expand Down Expand Up @@ -306,6 +334,13 @@
"type": "INTEGER",
"description": "",
"fields": []
},
{
"name": "load_date",
"mode": "",
"type": "STRING",
"description": "",
"fields": []
}
]

Expand All @@ -319,6 +354,8 @@
# schedule_interval='*/3 * * * *',
# To avoid DAG from triggering when it wakes up from hibernation at 8 UTC
start_date=datetime(2023, 9, 1, 8, 2),
# Makes sure only one run is active at a point in time to avoid overlapping runs
max_active_runs=1,
tags=['rawg_api_elt'],
catchup=False
)
Expand Down
17 changes: 14 additions & 3 deletions dags/utils/rawg_api_caller.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import requests
import pandas as pd
from datetime import datetime

# Logger factory class import
from utils.logger import LoggerFactory
Expand Down Expand Up @@ -66,9 +67,7 @@ def get_unique_ids_per_endpoint(self, api_key: str, page_number: int) -> []:
endpoint_params = {
'page_size': '10000',
'page': page_number,
'ordering': 'released',
'parent_platforms':'1,2,3,4,5,6,7,8,9,10',
'dates':'1990-01-01,2023-12-12'
'metacritic': '75'
}

endpoint_response = requests.get(generate_api_url, params=endpoint_params)
Expand Down Expand Up @@ -161,6 +160,13 @@ def get_game_details_per_id(self, api_key: str, endpoint_ids: list, page_number:
publisher_df_flattened = pd.json_normalize(games_json, sep='_', record_path=['publishers'], meta=['id'], meta_prefix='game_')
publisher_df = pd.concat([publisher_df, publisher_df_flattened], ignore_index=True)

# Add load_date column for all dataframes
games_df['load_date'] = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
genre_df['load_date'] = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
platforms_df['load_date'] = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
publisher_df['load_date'] = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
ratings_df['load_date'] = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')

# Enforcing datatypes for columns of games dataframe
games_df['id'] = games_df['id'].astype(int)
games_df['released'] = pd.to_datetime(games_df['released'], format='%Y-%m-%d')
Expand All @@ -170,11 +176,13 @@ def get_game_details_per_id(self, api_key: str, endpoint_ids: list, page_number:
games_df['rating_top'] = games_df['rating_top'].astype(int)
games_df['playtime'] = games_df['playtime'].astype(int)
games_df['metacritic'] = games_df['metacritic'].astype(str)
games_df['load_date'] = games_df['load_date'].astype(str)

# Enforcing datatypes for columns of genres dataframe
genre_df['id'] = genre_df['id'].astype(int)
genre_df['games_count'] = genre_df['games_count'].astype(int)
genre_df['game_id'] = genre_df['game_id'].astype(int)
genre_df['load_date'] = genre_df['load_date'].astype(str)

# Enforcing datatypes for columns of platforms dataframe
platforms_df['released_at'] = pd.to_datetime(platforms_df['released_at'], format='%Y-%m-%d')
Expand All @@ -183,17 +191,20 @@ def get_game_details_per_id(self, api_key: str, endpoint_ids: list, page_number:
platforms_df['platform_games_count'] = platforms_df['platform_games_count'].astype(int)
platforms_df['game_id'] = platforms_df['game_id'].astype(int)
platforms_df['platform_year_start'] = platforms_df['platform_year_start'].astype(str)
platforms_df['load_date'] = platforms_df['load_date'].astype(str)

# Enforcing datatypes for columns of publisher dataframe
publisher_df['id'] = publisher_df['id'].astype(int)
publisher_df['games_count'] = publisher_df['games_count'].astype(int)
publisher_df['game_id'] = publisher_df['game_id'].astype(int)
publisher_df['load_date'] = publisher_df['load_date'].astype(str)

# Enforcing datatypes for columns of ratings dataframe
ratings_df['id'] = ratings_df['id'].astype(int)
ratings_df['count'] = ratings_df['count'].astype(int)
ratings_df['percent'] = ratings_df['percent'].astype(float)
ratings_df['game_id'] = ratings_df['game_id'].astype(int)
ratings_df['load_date'] = ratings_df['load_date'].astype(str)


# Log the dimensions of each flattened file for tracking
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ apache-airflow-providers-google==10.13.1
google-cloud-secret-manager==2.16.1
gcsfs==2024.2.0
google-cloud-storage==2.15.0
pyarrow==15.0.2
pyarrow==15.0.2
astronomer-cosmos==1.3.2

0 comments on commit 656db19

Please sign in to comment.