Skip to content

Commit

Permalink
Changed CSV to parquet to avoid data overflow to other columns
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinsunny1996 committed Apr 21, 2024
1 parent 853a793 commit 83ba72f
Show file tree
Hide file tree
Showing 4 changed files with 268 additions and 91 deletions.
309 changes: 224 additions & 85 deletions dags/rawg_api_extractor_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
from airflow.decorators import dag, task, task_group
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.hooks.secret_manager import SecretsManagerHook
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyTableOperator

from airflow.utils.dates import days_ago
from airflow.models import Variable

Expand All @@ -23,65 +22,140 @@
'retry_delay': timedelta(minutes=5),
}

# BigQuery column definitions, keyed by table name, used when creating the
# empty placeholder tables that the extracted data is loaded into afterwards.
# Each column is written as (name, type, mode, description).
table_schemas = {
    table: [
        {'name': column, 'type': bq_type, 'mode': bq_mode, 'description': text}
        for column, bq_type, bq_mode, text in columns
    ]
    for table, columns in (
        ('ratings', (
            ('id', 'INTEGER', 'REQUIRED', 'Ratings ID corresponding to the rating given to a game'),
            ('title', 'STRING', 'NULLABLE', 'Rating Type Corresponding to the respective Rating ID'),
            ('count', 'INTEGER', 'NULLABLE', 'Number of games with the given rating , eg- Recommended for 30 games etc.'),
            ('percent', 'FLOAT', 'NULLABLE', 'Percentage of games with the given rating'),
            ('game_id', 'INTEGER', 'REQUIRED', 'Foreign key to map to gaming product table game ID'),
        )),
        ('games', (
            ('id', 'INTEGER', 'REQUIRED', 'Game ID for the given game'),
            ('slug', 'STRING', 'NULLABLE', 'Slug version of the game name , eg- resident-evil-4'),
            ('name_original', 'STRING', 'NULLABLE', 'Actual official name of the game'),
            ('description_raw', 'STRING', 'NULLABLE', 'Game Description'),
            ('metacritic', 'FLOAT', 'REQUIRED', 'Metacritic rating of the game'),
            ('released', 'DATE', 'REQUIRED', 'Release date of the game in format YYYY-MM-DD'),
            ('tba', 'BOOLEAN', 'REQUIRED', 'Is the game yet to be announced?'),
            ('updated', 'DATETIME', 'REQUIRED', 'Time and date when the data was last updated'),
            ('rating', 'FLOAT', 'REQUIRED', 'Rating of the game from 1 to 5'),
            ('rating_top', 'NUMERIC', 'REQUIRED', 'Max Average Rating given to that game, relates to id of ratings table'),
            ('playtime', 'FLOAT', 'REQUIRED', 'Playtime for the game in minutes'),
        )),
        ('genres', (
            ('id', 'INTEGER', 'REQUIRED', 'Genre ID for the given game'),
            ('name', 'STRING', 'NULLABLE', 'Name of the genre , eg- Adventure, Action etc.'),
            ('slug', 'STRING', 'NULLABLE', 'Lower case name of the genre , eg- adventure, action etc.'),
            ('games_count', 'INTEGER', 'NULLABLE', 'Count of games for that genre'),
            ('image_background', 'STRING', 'REQUIRED', 'Image background URL'),
            ('game_id', 'INTEGER', 'REQUIRED', 'Game ID , foreign key of Games table'),
        )),
        ('platforms', (
            ('released_at', 'DATE', 'NULLABLE', 'Release date of the game on the respective platform in format YYYY-MM-DD'),
            ('platform_id', 'INTEGER', 'REQUIRED', 'Platform ID for the given platform'),
            ('platform_name', 'STRING', 'REQUIRED', 'Platform Name'),
            ('platform_slug', 'STRING', 'REQUIRED', 'Lower case platform name'),
            ('platform_image', 'STRING', 'NULLABLE', 'Platform Image URL'),
            ('platform_year_end', 'FLOAT', 'NULLABLE', 'End Year for the given platform'),
            ('platform_year_start', 'FLOAT', 'NULLABLE', 'Start Year for the given platform'),
            ('platform_games_count', 'INTEGER', 'NULLABLE', 'Count of games for that platform'),
            ('platform_image_background', 'STRING', 'NULLABLE', 'Platform image background URL'),
            ('game_id', 'INTEGER', 'REQUIRED', 'Game ID for the given game used as foreign key in games table'),
        )),
        ('publishers', (
            ('id', 'INTEGER', 'REQUIRED', 'Publisher ID for the given game'),
            ('name', 'STRING', 'REQUIRED', 'Name of the publisher'),
            ('slug', 'STRING', 'REQUIRED', 'Lower case name of the publisher'),
            ('games_count', 'INTEGER', 'NULLABLE', 'Count of games for that publisher'),
            ('image_background', 'STRING', 'NULLABLE', 'Image background URL'),
            ('game_id', 'INTEGER', 'REQUIRED', 'Game ID for the given game used as foreign key in games table'),
        )),
    )
}
# Schema for the BigQuery games table, passed as `schema_fields` to the load
# job. Every column is NULLABLE; `metacritic` previously carried an empty
# mode string, which is not one of BigQuery's field modes
# (NULLABLE / REQUIRED / REPEATED), so it is now NULLABLE like the others.
# Each column is written as (name, type); description and nested fields are
# intentionally left empty.
schema_games = [
    {
        "name": column_name,
        "mode": "NULLABLE",
        "type": column_type,
        "description": "",
        "fields": [],
    }
    for column_name, column_type in (
        ("id", "INTEGER"),
        ("slug", "STRING"),
        ("name_original", "STRING"),
        ("description_raw", "STRING"),
        ("released", "DATE"),
        ("tba", "BOOLEAN"),
        ("updated", "TIMESTAMP"),
        ("rating", "FLOAT"),
        ("rating_top", "INTEGER"),
        ("playtime", "INTEGER"),
        ("metacritic", "STRING"),
    )
]

# DAG definition
# Schema for the BigQuery publishers table. Every column is NULLABLE with an
# empty description and no nested fields; each column is written as
# (name, type).
schema_publishers = [
    {
        "name": column_name,
        "mode": "NULLABLE",
        "type": column_type,
        "description": "",
        "fields": [],
    }
    for column_name, column_type in (
        ("id", "FLOAT"),
        ("name", "STRING"),
        ("slug", "STRING"),
        ("games_count", "FLOAT"),
        ("image_background", "STRING"),
        ("game_id", "INTEGER"),
    )
]

# DAG definition to Extract and Load data obtained from RAWG API to Bigquery with current schedule of running every 6 minutes
@dag(
dag_id='rawg_api_extractor_dag',
default_args=default_args,
description='DAG to fetch RAWG API data from games/ endpoint, convert the JSON to CSV and upload to GCS and then load it in Bigquery',
schedule=None,
schedule_interval=None,
schedule_interval='*/6 * * * *',
start_date=datetime(2023, 9, 1),
tags=['rawg_api_elt'],
catchup=False
Expand Down Expand Up @@ -130,43 +204,108 @@ def get_game_id_related_data(api_key: str, game_ids_list: list, page_number: int

# Save the extracted files directly to GCS, after the GCS bucket variable has been created
get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, games_df, 'games', rawg_page_number)
get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, ratings_df, 'ratings', rawg_page_number)
get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, platforms_df, 'platforms', rawg_page_number)
get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, genre_df, 'genres', rawg_page_number)
get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, publisher_df, 'publishers', rawg_page_number)
# get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, ratings_df, 'ratings', rawg_page_number)
# get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, platforms_df, 'platforms', rawg_page_number)
# get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, genre_df, 'genres', rawg_page_number)
# get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, publisher_df, 'publishers', rawg_page_number)

# Create an empty BigQuery table for each schema defined for the 5 output tables
@task
def create_rawg_api_placeholder_empty_table(table_name: str, schema: list) -> None:
    """Create the BigQuery table ``table_name`` with the given ``schema``.

    A ``BigQueryCreateEmptyTableOperator`` is built for the table and executed
    immediately with an empty context. The operator is configured with
    ``if_exists='skip'``, so an already existing table is left alone.

    Args:
        table_name: Table ID to create; also used in the operator's task ID.
        schema: Column definitions passed to the operator as ``schema_fields``.
    """
    operator_kwargs = {
        'task_id': f'create_empty_table_for_{table_name}',
        'dataset_id': rawg_api_bq_dataset,  # BigQuery dataset ID
        'table_id': table_name,
        'project_id': gcp_project_name,  # BigQuery project ID
        'schema_fields': schema,
        'gcp_conn_id': 'gcp',  # GCP connection ID
        'if_exists': 'skip',  # Skip creating the table if it already exists
    }
    BigQueryCreateEmptyTableOperator(**operator_kwargs).execute(context={})

@task_group
def placeholder_empty_table_task_group():
    """Add one table-creation task per entry of ``table_schemas``."""
    for name in table_schemas:
        create_rawg_api_placeholder_empty_table(name, table_schemas[name])

@task
def update_page_number(rawg_page_number: int) -> int:
    """Advance the stored RAWG API page number by one.

    The incremented value is written to the ``api_page_number`` Airflow
    Variable so the next run fetches the consecutive page.

    Args:
        rawg_page_number: Page number used by the current run; coerced with
            ``int()`` before incrementing.

    Returns:
        The page number stored for the next run. The function was annotated
        ``-> int`` but previously returned nothing; it now honours that
        annotation.
    """
    next_page_number = int(rawg_page_number) + 1
    Variable.set("api_page_number", next_page_number)
    return next_page_number
# Load contents from GCS onto BigQuery for that run
# load_rawg_api_ratings_data_to_bq = GCSToBigQueryOperator(
# task_id=f'load_ratings_to_bq',
# bucket=rawg_landing_gcs_bucket, # Set your GCS bucket name to pick file from.
# source_objects=[f'ratings_{rawg_page_number}.csv'], # Set the name of the CSV file in GCS
# source_format='csv',
# destination_project_dataset_table=f'{rawg_api_bq_dataset}.ratings', # Set your BigQuery table name to load the data to.
# gcp_conn_id='gcp', # Set your GCP connection ID.
# create_disposition='CREATE_IF_NEEDED',
# write_disposition='WRITE_APPEND', # If the table already exists, BigQuery appends the data to the table.
# skip_leading_rows=1 # Skip the header row in the CSV file.
# )

# Load this run's games Parquet file from GCS into the BigQuery games table.
# The CSV-only options `allow_quoted_newlines` and `skip_leading_rows` were
# dropped: they describe CSV parsing (quoted newlines, header row) and have no
# meaning for a PARQUET source.
load_rawg_api_games_data_to_bq = GCSToBigQueryOperator(
    task_id='load_games_to_bq',
    bucket=rawg_landing_gcs_bucket,  # GCS bucket to pick the file from
    source_objects=[f'games_{rawg_page_number}.parquet'],  # Parquet object for the current page number
    source_format='PARQUET',
    ignore_unknown_values=True,
    destination_project_dataset_table=f'{rawg_api_bq_dataset}.games',  # BigQuery table to load the data into
    gcp_conn_id='gcp',  # GCP connection ID
    create_disposition='CREATE_IF_NEEDED',
    schema_fields=schema_games,
    autodetect=False,  # Use the explicit schema_games definition instead of schema inference
    write_disposition='WRITE_APPEND',  # If the table already exists, BigQuery appends the data to the table
)

# load_rawg_api_genres_data_to_bq = GCSToBigQueryOperator(
# task_id=f'load_genres_to_bq',
# bucket=rawg_landing_gcs_bucket, # Set your GCS bucket name to pick file from.
# source_objects=[f'genres_{rawg_page_number}.csv'], # Set the name of the CSV file in GCS
# source_format='csv',
# destination_project_dataset_table=f'{rawg_api_bq_dataset}.genres', # Set your BigQuery table name to load the data to.
# gcp_conn_id='gcp', # Set your GCP connection ID.
# create_disposition='CREATE_IF_NEEDED',
# write_disposition='WRITE_APPEND', # If the table already exists, BigQuery appends the data to the table.
# skip_leading_rows=1 # Skip the header row in the CSV file.
# )

# load_rawg_api_platforms_data_to_bq = GCSToBigQueryOperator(
# task_id=f'load_platforms_to_bq',
# bucket=rawg_landing_gcs_bucket, # Set your GCS bucket name to pick file from.
# source_objects=[f'platforms_{rawg_page_number}.csv'], # Set the name of the CSV file in GCS
# source_format='csv',
# destination_project_dataset_table=f'{rawg_api_bq_dataset}.platforms', # Set your BigQuery table name to load the data to.
# gcp_conn_id='gcp', # Set your GCP connection ID.
# create_disposition='CREATE_IF_NEEDED',
# write_disposition='WRITE_APPEND', # If the table already exists, BigQuery appends the data to the table.
# skip_leading_rows=1 # Skip the header row in the CSV file.
# )

# load_rawg_api_publishers_data_to_bq = GCSToBigQueryOperator(
# task_id=f'load_publishers_to_bq',
# bucket=rawg_landing_gcs_bucket, # Set your GCS bucket name to pick file from.
# source_objects=[f'publishers_{rawg_page_number}.csv'], # Set the name of the CSV file in GCS
# source_format='csv',
# destination_project_dataset_table=f'{rawg_api_bq_dataset}.publishers', # Set your BigQuery table name to load the data to.
# gcp_conn_id='gcp', # Set your GCP connection ID.
# create_disposition='CREATE_IF_NEEDED',
# write_disposition='WRITE_APPEND', # If the table already exists, BigQuery appends the data to the table.
# skip_leading_rows=1, # Skip the header row in the CSV file.
# autodetect=False,
# schema_fields=schema_publishers,
# allow_quoted_newlines=True,
# ignore_unknown_values=True,
# max_bad_records=40
# )

# @task
# def remove_extracted_api_csv(bucket_name: str) -> None:
# rawg_api_gcs_hook = GCSHook(gcp_conn_id='gcp')

# # Get the files to delete
# api_csv_files = rawg_api_gcs_hook.list(bucket_name)

# # Delete the files
# for api_file in api_csv_files:
# rawg_api_gcs_hook.delete(bucket_name, api_file)

# @task
# def update_page_number(rawg_page_number: int) -> int:
# # Update page number to fetch from the consecutive one in the next run
# next_page_number = int(rawg_page_number) + 1
# Variable.set("api_page_number", next_page_number)

# DAG Flow
# Instantiate the tasks: fetch the game IDs for the current page, then pull the
# per-game details for those IDs.
game_ids_list = get_rawg_api_game_ids(rawg_api_key, rawg_page_number)
game_details_extractor = get_game_id_related_data(rawg_api_key, game_ids_list, rawg_page_number)
# Task group that creates the placeholder tables, and the task that bumps the
# stored page number for the next run.
placeholder_empty_table_tasks = placeholder_empty_table_task_group()
next_page_number = update_page_number(rawg_page_number)
# Disabled steps, kept for reference:
# clear_extracted_csv_files = remove_extracted_api_csv(rawg_landing_gcs_bucket)
# next_page_number = update_page_number(rawg_page_number)

# Task ordering. NOTE(review): two dependency chains are declared below, both
# starting from game_ids_list >> game_details_extractor — confirm that both the
# placeholder-table/page-number branch and the games load branch are intended.
game_ids_list >> game_details_extractor >> placeholder_empty_table_tasks >> next_page_number
game_ids_list >> game_details_extractor >> load_rawg_api_games_data_to_bq
# Disabled continuation of the chain (remaining table loads, cleanup, page bump):
# >> load_rawg_api_ratings_data_to_bq >> load_rawg_api_games_data_to_bq >> load_rawg_api_genres_data_to_bq >> load_rawg_api_platforms_data_to_bq >> load_rawg_api_publishers_data_to_bq >> clear_extracted_csv_files >> next_page_number


# Call the DAG function (presumably the @dag-decorated definition above) so the DAG object is created.
rawg_api_extractor_dag()
Expand Down
Loading

0 comments on commit 83ba72f

Please sign in to comment.