diff --git a/dags/rawg_api_extractor_dag.py b/dags/rawg_api_extractor_dag.py
index c69884a..9eb78fb 100644
--- a/dags/rawg_api_extractor_dag.py
+++ b/dags/rawg_api_extractor_dag.py
@@ -3,9 +3,8 @@
 from airflow.decorators import dag, task, task_group
 from airflow.operators.python import PythonOperator
 from airflow.providers.google.cloud.hooks.gcs import GCSHook
-from airflow.providers.google.cloud.hooks.secret_manager import SecretsManagerHook
 from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
-from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyTableOperator
+
 from airflow.utils.dates import days_ago
 from airflow.models import Variable
 
@@ -23,65 +22,140 @@
     'retry_delay': timedelta(minutes=5),
 }
 
-# Instance of Schema to be passed for creating empty tables to be later loaded to
-table_schemas = {
-    'ratings': [
-        {'name': 'id','type': 'INTEGER','mode': 'REQUIRED','description': 'Ratings ID corresponding to the rating given to a game'},
-        {'name': 'title','type': 'STRING','mode': 'NULLABLE','description': 'Rating Type Corresponding to the respective Rating ID'},
-        {'name': 'count','type': 'INTEGER','mode': 'NULLABLE','description': 'Number of games with the given rating , eg- Recommended for 30 games etc.'},
-        {'name': 'percent','type': 'FLOAT','mode': 'NULLABLE','description': 'Percentage of games with the given rating'},
-        {'name': 'game_id','type': 'INTEGER','mode': 'REQUIRED','description':'Foreign key to map to gaming product table game ID'}
-    ],
-    'games': [
-        {'name': 'id','type': 'INTEGER','mode': 'REQUIRED','description': 'Game ID for the given game'},
-        {'name': 'slug','type': 'STRING','mode': 'NULLABLE','description': 'Slug version of the game name , eg- resident-evil-4'},
-        {'name': 'name_original','type': 'STRING','mode': 'NULLABLE','description': 'Actual official name of the game'},
-        {'name': 'description_raw','type': 'STRING','mode': 'NULLABLE','description': 'Game Description'},
-        {'name': 'metacritic','type': 'FLOAT','mode':'REQUIRED','description': 'Metacritic rating of the game'},
-        {'name': 'released','type': 'DATE','mode':'REQUIRED','description':'Release date of the game in format YYYY-MM-DD'},
-        {'name': 'tba','type': 'BOOLEAN','mode': 'REQUIRED','description': 'Is the game yet to be announced?'},
-        {'name': 'updated','type': 'DATETIME','mode': 'REQUIRED','description': 'Time and date when the data was last updated'},
-        {'name': 'rating','type': 'FLOAT','mode': 'REQUIRED','description': 'Rating of the game from 1 to 5'},
-        {'name': 'rating_top','type': 'NUMERIC','mode': 'REQUIRED','description': 'Max Average Rating given to that game, relates to id of ratings table'},
-        {'name': 'playtime','type': 'FLOAT','mode': 'REQUIRED','description': 'Playtime for the game in minutes'}
-    ],
-    'genres': [
-        {'name': 'id','type': 'INTEGER','mode': 'REQUIRED','description': 'Genre ID for the given game'},
-        {'name': 'name','type': 'STRING','mode': 'NULLABLE','description': 'Name of the genre , eg- Adventure, Action etc.'},
-        {'name': 'slug','type': 'STRING','mode': 'NULLABLE','description': 'Lower case name of the genre , eg- adventure, action etc.'},
-        {'name': 'games_count','type': 'INTEGER','mode': 'NULLABLE','description': 'Count of games for that genre'},
-        {'name': 'image_background','type': 'STRING','mode': 'REQUIRED','description': 'Image background URL'},
-        {'name': 'game_id','type': 'INTEGER','mode': 'REQUIRED','description': 'Game ID , foreign key of Games table'}
-    ],
-    'platforms': [
-        {'name': 'released_at','type': 'DATE','mode': 'NULLABLE','description': 'Release date of the game on the respective platform in format YYYY-MM-DD'},
-        {'name': 'platform_id','type': 'INTEGER','mode': 'REQUIRED','description': 'Platform ID for the given platform'},
-        {'name': 'platform_name','type': 'STRING','mode': 'REQUIRED','description': 'Platform Name'},
-        {'name': 'platform_slug','type': 'STRING','mode': 'REQUIRED','description': 'Lower case platform name'},
-        {'name': 'platform_image','type': 'STRING','mode': 'NULLABLE','description': 'Platform Image URL'},
-        {'name': 'platform_year_end','type': 'FLOAT','mode': 'NULLABLE','description': 'End Year for the given platform'},
-        {'name': 'platform_year_start','type': 'FLOAT','mode': 'NULLABLE','description': 'Start Year for the given platform'},
-        {'name': 'platform_games_count','type': 'INTEGER','mode': 'NULLABLE','description': 'Count of games for that platform'},
-        {'name': 'platform_image_background','type': 'STRING','mode': 'NULLABLE','description': 'Platform image background URL'},
-        {'name': 'game_id','type': 'INTEGER','mode': 'REQUIRED','description': 'Game ID for the given game used as foreign key in games table'}
-    ],
-    "publishers": [
-        {'name': 'id','type': 'INTEGER','mode': 'REQUIRED','description': 'Publisher ID for the given game'},
-        {'name': 'name','type': 'STRING','mode': 'REQUIRED','description': 'Name of the publisher'},
-        {'name': 'slug','type': 'STRING','mode': 'REQUIRED','description': 'Lower case name of the publisher'},
-        {'name': 'games_count','type': 'INTEGER','mode': 'NULLABLE','description': 'Count of games for that publisher'},
-        {'name': 'image_background','type': 'STRING','mode': 'NULLABLE','description': 'Image background URL'},
-        {'name': 'game_id','type': 'INTEGER','mode': 'REQUIRED','description': 'Game ID for the given game used as foreign key in games table'}
-    ]
-}
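The hand-written `table_schemas` dict above is replaced by BigQuery-style JSON field lists; their shape (empty `description`, empty `fields`) suggests they were exported from an existing table. A minimal sketch of how such lists can be regenerated with `google-cloud-bigquery`, so they stay in sync with the live tables — the project and dataset names are placeholders:

```python
# Sketch (not part of the patch): dump an existing BigQuery table's schema in
# the same JSON shape as the lists below. "my-gcp-project"/"rawg_api" are placeholders.
import json

from google.cloud import bigquery


def dump_table_schema(project: str, dataset: str, table: str) -> str:
    client = bigquery.Client(project=project)
    tbl = client.get_table(f"{project}.{dataset}.{table}")
    # SchemaField.to_api_repr() yields dicts with name/mode/type/description/fields
    return json.dumps([field.to_api_repr() for field in tbl.schema], indent=2)


if __name__ == "__main__":
    print(dump_table_schema("my-gcp-project", "rawg_api", "games"))
```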
+# Schema for games table
+schema_games = [
+    {
+        "name": "id",
+        "mode": "NULLABLE",
+        "type": "INTEGER",
+        "description": "",
+        "fields": []
+    },
+    {
+        "name": "slug",
+        "mode": "NULLABLE",
+        "type": "STRING",
+        "description": "",
+        "fields": []
+    },
+    {
+        "name": "name_original",
+        "mode": "NULLABLE",
+        "type": "STRING",
+        "description": "",
+        "fields": []
+    },
+    {
+        "name": "description_raw",
+        "mode": "NULLABLE",
+        "type": "STRING",
+        "description": "",
+        "fields": []
+    },
+    {
+        "name": "released",
+        "mode": "NULLABLE",
+        "type": "DATE",
+        "description": "",
+        "fields": []
+    },
+    {
+        "name": "tba",
+        "mode": "NULLABLE",
+        "type": "BOOLEAN",
+        "description": "",
+        "fields": []
+    },
+    {
+        "name": "updated",
+        "mode": "NULLABLE",
+        "type": "TIMESTAMP",
+        "description": "",
+        "fields": []
+    },
+    {
+        "name": "rating",
+        "mode": "NULLABLE",
+        "type": "FLOAT",
+        "description": "",
+        "fields": []
+    },
+    {
+        "name": "rating_top",
+        "mode": "NULLABLE",
+        "type": "INTEGER",
+        "description": "",
+        "fields": []
+    },
+    {
+        "name": "playtime",
+        "mode": "NULLABLE",
+        "type": "INTEGER",
+        "description": "",
+        "fields": []
+    },
+    {
+        "name": "metacritic",
+        "mode": "NULLABLE",
+        "type": "STRING",
+        "description": "",
+        "fields": []
+    }
+]
-# DAG definition
+# Schema for publishers table
+schema_publishers = [
+    {
+        "name": "id",
+        "mode": "NULLABLE",
+        "type": "FLOAT",
+        "description": "",
+        "fields": []
+    },
+    {
+        "name": "name",
+        "mode": "NULLABLE",
+        "type": "STRING",
+        "description": "",
+        "fields": []
+    },
+    {
+        "name": "slug",
+        "mode": "NULLABLE",
+        "type": "STRING",
+        "description": "",
+        "fields": []
+    },
+    {
+        "name": "games_count",
+        "mode": "NULLABLE",
+        "type": "FLOAT",
+        "description": "",
+        "fields": []
+    },
+    {
+        "name": "image_background",
+        "mode": "NULLABLE",
+        "type": "STRING",
+        "description": "",
+        "fields": []
+    },
+    {
+        "name": "game_id",
+        "mode": "NULLABLE",
+        "type": "INTEGER",
+        "description": "",
+        "fields": []
+    }
+]
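These lists must stay consistent with the pyarrow schema hard-coded in `dags/utils/gcp_utils.py` further down (note `metacritic` is deliberately `STRING` in both). A sketch of deriving the Arrow schema from `schema_games` instead of maintaining the two by hand — the type mapping is an assumption that mirrors this PR's choices (`int32`, `timestamp('s')`, etc.), and it reproduces the hand-written schema up to column order:

```python
# Sketch: derive the pyarrow schema used in gcp_utils.py from schema_games.
import pyarrow as pa

BQ_TO_ARROW = {
    "INTEGER": pa.int32(),
    "FLOAT": pa.float64(),
    "STRING": pa.string(),
    "DATE": pa.date32(),
    "BOOLEAN": pa.bool_(),
    "TIMESTAMP": pa.timestamp("s"),
}


def arrow_schema_from_bq(schema_fields: list) -> pa.Schema:
    return pa.schema([(f["name"], BQ_TO_ARROW[f["type"]]) for f in schema_fields])

# e.g. arrow_schema_from_bq(schema_games) matches the schema in gcp_utils.py
```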
"type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "image_background", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "game_id", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + } +] + +# DAG definition to Extract and Load data obtained from RAWG API to Bigquery with current schedule of running every 6 minutes @dag( dag_id='rawg_api_extractor_dag', default_args=default_args, description='DAG to fetch RAWG API data from games/ endpoint, convert the JSON to CSV and upload to GCS and then load it in Bigquery', schedule=None, - schedule_interval=None, + schedule_interval='*/6 * * * *', start_date=datetime(2023, 9, 1), tags=['rawg_api_elt'], catchup=False @@ -130,43 +204,108 @@ def get_game_id_related_data(api_key: str, game_ids_list: list, page_number: int # Save the files as CSV directly to GCS , post creating GCS bucket variable get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, games_df, 'games', rawg_page_number) - get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, ratings_df, 'ratings', rawg_page_number) - get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, platforms_df, 'platforms', rawg_page_number) - get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, genre_df, 'genres', rawg_page_number) - get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, publisher_df, 'publishers', rawg_page_number) + # get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, ratings_df, 'ratings', rawg_page_number) + # get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, platforms_df, 'platforms', rawg_page_number) + # get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, genre_df, 'genres', rawg_page_number) + # get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, publisher_df, 'publishers', rawg_page_number) - # Create Empty Table for Schema defined for the 5 tables created as output - @task - def create_rawg_api_placeholder_empty_table(table_name: str, schema: list) -> None: - create_empty_table_task = BigQueryCreateEmptyTableOperator( - task_id=f'create_empty_table_for_{table_name}', - dataset_id=rawg_api_bq_dataset, # Set your BigQuery dataset ID - table_id=table_name, - project_id=gcp_project_name, # Set your BigQuery project ID - schema_fields=schema, - gcp_conn_id='gcp', # Set your GCP connection ID - if_exists='skip' # Skip creating the table if it already exists - ) - create_empty_table_task.execute(context={}) - @task_group - def placeholder_empty_table_task_group(): - for table_name, schema in table_schemas.items(): - create_rawg_api_placeholder_empty_table(table_name, schema) - - @task - def update_page_number(rawg_page_number: int) -> int: - # Update page number to fetch from the consecutive one in the next run - next_page_number = int(rawg_page_number) + 1 - Variable.set("api_page_number", next_page_number) + # Load contents from GCS onto BigQuery for that run + # load_rawg_api_ratings_data_to_bq = GCSToBigQueryOperator( + # task_id=f'load_ratings_to_bq', + # bucket=rawg_landing_gcs_bucket, # Set your GCS bucket name to pick file from. + # source_objects=[f'ratings_{rawg_page_number}.csv'], # Set the name of the CSV file in GCS + # source_format='csv', + # destination_project_dataset_table=f'{rawg_api_bq_dataset}.ratings', # Set your BigQuery table name to load the data to. + # gcp_conn_id='gcp', # Set your GCP connection ID. 
@@ -130,43 +204,108 @@ def get_game_id_related_data(api_key: str, game_ids_list: list, page_number: int
         # Save the files as CSV directly to GCS , post creating GCS bucket variable
         get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, games_df, 'games', rawg_page_number)
-        get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, ratings_df, 'ratings', rawg_page_number)
-        get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, platforms_df, 'platforms', rawg_page_number)
-        get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, genre_df, 'genres', rawg_page_number)
-        get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, publisher_df, 'publishers', rawg_page_number)
+        # get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, ratings_df, 'ratings', rawg_page_number)
+        # get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, platforms_df, 'platforms', rawg_page_number)
+        # get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, genre_df, 'genres', rawg_page_number)
+        # get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, publisher_df, 'publishers', rawg_page_number)
 
-    # Create Empty Table for Schema defined for the 5 tables created as output
-    @task
-    def create_rawg_api_placeholder_empty_table(table_name: str, schema: list) -> None:
-        create_empty_table_task = BigQueryCreateEmptyTableOperator(
-            task_id=f'create_empty_table_for_{table_name}',
-            dataset_id=rawg_api_bq_dataset, # Set your BigQuery dataset ID
-            table_id=table_name,
-            project_id=gcp_project_name, # Set your BigQuery project ID
-            schema_fields=schema,
-            gcp_conn_id='gcp', # Set your GCP connection ID
-            if_exists='skip' # Skip creating the table if it already exists
-        )
-        create_empty_table_task.execute(context={})
-
-    @task_group
-    def placeholder_empty_table_task_group():
-        for table_name, schema in table_schemas.items():
-            create_rawg_api_placeholder_empty_table(table_name, schema)
-
-    @task
-    def update_page_number(rawg_page_number: int) -> int:
-        # Update page number to fetch from the consecutive one in the next run
-        next_page_number = int(rawg_page_number) + 1
-        Variable.set("api_page_number", next_page_number)
+    # Load contents from GCS onto BigQuery for that run
+    # load_rawg_api_ratings_data_to_bq = GCSToBigQueryOperator(
+    #     task_id=f'load_ratings_to_bq',
+    #     bucket=rawg_landing_gcs_bucket, # Set your GCS bucket name to pick file from.
+    #     source_objects=[f'ratings_{rawg_page_number}.csv'], # Set the name of the CSV file in GCS
+    #     source_format='csv',
+    #     destination_project_dataset_table=f'{rawg_api_bq_dataset}.ratings', # Set your BigQuery table name to load the data to.
+    #     gcp_conn_id='gcp', # Set your GCP connection ID.
+    #     create_disposition='CREATE_IF_NEEDED',
+    #     write_disposition='WRITE_APPEND', # If the table already exists, BigQuery appends the data to the table.
+    #     skip_leading_rows=1 # Skip the header row in the CSV file.
+    # )
+
+    load_rawg_api_games_data_to_bq = GCSToBigQueryOperator(
+        task_id='load_games_to_bq',
+        bucket=rawg_landing_gcs_bucket, # Set your GCS bucket name to pick file from.
+        source_objects=[f'games_{rawg_page_number}.parquet'], # Set the name of the Parquet file in GCS
+        source_format='PARQUET',
+        ignore_unknown_values=True,
+        # max_bad_records=40,
+        destination_project_dataset_table=f'{rawg_api_bq_dataset}.games', # Set your BigQuery table name to load the data to.
+        gcp_conn_id='gcp', # Set your GCP connection ID.
+        create_disposition='CREATE_IF_NEEDED',
+        schema_fields=schema_games,
+        autodetect=False,
+        write_disposition='WRITE_APPEND' # If the table already exists, BigQuery appends the data to the table.
+    )
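Note that the CSV-only options (`skip_leading_rows`, `allow_quoted_newlines`) have no meaning for a `PARQUET` load and are dropped from the games operator. Since the load uses an explicit schema with `autodetect=False`, drift is easiest to catch by reading a produced file back before the BigQuery job runs; Parquet columns are matched to the table by name, so order does not matter. A sketch, where `games_1.parquet` is a hypothetical local copy of an uploaded file:

```python
# Sketch: verify a generated Parquet file carries exactly the columns
# that schema_games declares.
import pyarrow.parquet as pq

table = pq.read_table('games_1.parquet')
print(table.schema)

expected = {field['name'] for field in schema_games}
assert set(table.schema.names) == expected, f'column mismatch: {table.schema.names}'
```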
+    # load_rawg_api_genres_data_to_bq = GCSToBigQueryOperator(
+    #     task_id=f'load_genres_to_bq',
+    #     bucket=rawg_landing_gcs_bucket, # Set your GCS bucket name to pick file from.
+    #     source_objects=[f'genres_{rawg_page_number}.csv'], # Set the name of the CSV file in GCS
+    #     source_format='csv',
+    #     destination_project_dataset_table=f'{rawg_api_bq_dataset}.genres', # Set your BigQuery table name to load the data to.
+    #     gcp_conn_id='gcp', # Set your GCP connection ID.
+    #     create_disposition='CREATE_IF_NEEDED',
+    #     write_disposition='WRITE_APPEND', # If the table already exists, BigQuery appends the data to the table.
+    #     skip_leading_rows=1 # Skip the header row in the CSV file.
+    # )
+
+    # load_rawg_api_platforms_data_to_bq = GCSToBigQueryOperator(
+    #     task_id=f'load_platforms_to_bq',
+    #     bucket=rawg_landing_gcs_bucket, # Set your GCS bucket name to pick file from.
+    #     source_objects=[f'platforms_{rawg_page_number}.csv'], # Set the name of the CSV file in GCS
+    #     source_format='csv',
+    #     destination_project_dataset_table=f'{rawg_api_bq_dataset}.platforms', # Set your BigQuery table name to load the data to.
+    #     gcp_conn_id='gcp', # Set your GCP connection ID.
+    #     create_disposition='CREATE_IF_NEEDED',
+    #     write_disposition='WRITE_APPEND', # If the table already exists, BigQuery appends the data to the table.
+    #     skip_leading_rows=1 # Skip the header row in the CSV file.
+    # )
+
+    # load_rawg_api_publishers_data_to_bq = GCSToBigQueryOperator(
+    #     task_id=f'load_publishers_to_bq',
+    #     bucket=rawg_landing_gcs_bucket, # Set your GCS bucket name to pick file from.
+    #     source_objects=[f'publishers_{rawg_page_number}.csv'], # Set the name of the CSV file in GCS
+    #     source_format='csv',
+    #     destination_project_dataset_table=f'{rawg_api_bq_dataset}.publishers', # Set your BigQuery table name to load the data to.
+    #     gcp_conn_id='gcp', # Set your GCP connection ID.
+    #     create_disposition='CREATE_IF_NEEDED',
+    #     write_disposition='WRITE_APPEND', # If the table already exists, BigQuery appends the data to the table.
+    #     skip_leading_rows=1, # Skip the header row in the CSV file.
+    #     autodetect=False,
+    #     schema_fields=schema_publishers,
+    #     allow_quoted_newlines=True,
+    #     ignore_unknown_values=True,
+    #     max_bad_records=40
+    # )
+
+    # @task
+    # def remove_extracted_api_csv(bucket_name: str) -> None:
+    #     rawg_api_gcs_hook = GCSHook(gcp_conn_id='gcp')
+
+    #     # Get the files to delete
+    #     api_csv_files = rawg_api_gcs_hook.list(bucket_name)
+
+    #     # Delete the files
+    #     for api_file in api_csv_files:
+    #         rawg_api_gcs_hook.delete(bucket_name, api_file)
+
+    # @task
+    # def update_page_number(rawg_page_number: int) -> int:
+    #     # Update page number to fetch from the consecutive one in the next run
+    #     next_page_number = int(rawg_page_number) + 1
+    #     Variable.set("api_page_number", next_page_number)
 
     # DAG Flow
     game_ids_list = get_rawg_api_game_ids(rawg_api_key, rawg_page_number)
     game_details_extractor = get_game_id_related_data(rawg_api_key, game_ids_list, rawg_page_number)
-    placeholder_empty_table_tasks = placeholder_empty_table_task_group()
-    next_page_number = update_page_number(rawg_page_number)
+    # clear_extracted_csv_files = remove_extracted_api_csv(rawg_landing_gcs_bucket)
+    # next_page_number = update_page_number(rawg_page_number)
 
-    game_ids_list >> game_details_extractor >> placeholder_empty_table_tasks >> next_page_number
+    game_ids_list >> game_details_extractor >> load_rawg_api_games_data_to_bq
+    # >> load_rawg_api_ratings_data_to_bq >> load_rawg_api_games_data_to_bq >> load_rawg_api_genres_data_to_bq >> load_rawg_api_platforms_data_to_bq >> load_rawg_api_publishers_data_to_bq >> clear_extracted_csv_files >> next_page_number
 
 rawg_api_extractor_dag()
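With this much of the DAG commented out, it is worth confirming the module still imports cleanly before the changes in the other files. A minimal parse check — the `dags/` folder path is an assumption about the repo layout:

```python
# Sketch: confirm the trimmed DAG still parses without import errors.
from airflow.models import DagBag

dagbag = DagBag(dag_folder='dags/', include_examples=False)
assert not dagbag.import_errors, dagbag.import_errors
assert 'rawg_api_extractor_dag' in dagbag.dags
```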
diff --git a/dags/utils/gcp_utils.py b/dags/utils/gcp_utils.py
index 5b25077..1061352 100644
--- a/dags/utils/gcp_utils.py
+++ b/dags/utils/gcp_utils.py
@@ -11,9 +11,16 @@
 # Pandas Library import
 import pandas as pd
 
+# Pyarrow Library import
+import pyarrow as pa
+import pyarrow.parquet as pq
+
 # JSON Library import
 import json
 
+# BytesIO import for in-memory Parquet buffers
+from io import BytesIO
+
 info_logger = LoggerFactory.get_logger('INFO')
 error_logger = LoggerFactory.get_logger('ERROR')
 
@@ -51,15 +58,37 @@ def get_gcp_connection_and_upload_to_gcs(bucket_name: str, dataframe_name: pd.Da
     # Intialize bucket object
     bucket = client.bucket(bucket_name)
 
+    if dataframe_name.columns.tolist() == ['id', 'slug', 'name_original', 'description_raw', 'metacritic', 'released', 'tba', 'updated', 'rating', 'rating_top', 'playtime']:
+        # Override the schema of the dataframe to suit the BigQuery table schema
+        schema = pa.schema([
+            ('id', pa.int32()),
+            ('slug', pa.string()),
+            ('name_original', pa.string()),
+            ('description_raw', pa.string()),
+            ('metacritic', pa.string()),
+            ('released', pa.date32()),
+            ('tba', pa.bool_()),
+            ('updated', pa.timestamp('s')),
+            ('rating', pa.float64()),
+            ('rating_top', pa.int32()),
+            ('playtime', pa.int32())
+        ])
+        product_parquet_table = pa.Table.from_pandas(dataframe_name, schema=schema, preserve_index=False)
+    else:
+        # Dataframes other than games_df are stored in Parquet format without any schema change
+        product_parquet_table = pa.Table.from_pandas(dataframe_name, preserve_index=False)
 
-    # Convert dataframe to CSV string
-    csv_data = dataframe_name.to_csv(index=False)
+    # Serialize the Arrow table to Parquet in an in-memory buffer
+    parquet_data = BytesIO()
+    pq.write_table(product_parquet_table, parquet_data)
 
-    # Upload CSV to GCS
+    # Upload Parquet to GCS
     try:
         # blob_name is the file name that needs to be created as placeholder at GCS end to write file into
-        blob_name = f'{blob_file_name}_{api_page_number}.csv'
+        blob_name = f'{blob_file_name}_{api_page_number}.parquet'
         blob = bucket.blob(blob_name)
-        blob.upload_from_string(csv_data, content_type='text/csv')
+        blob.upload_from_string(parquet_data.getvalue(), content_type='application/octet-stream')
 
         info_logger.info(f'Loaded file {blob_name} to bucket {bucket_name} successfully')
     except Exception as e:
         error_logger.error(f'Received following error while uploading {blob_name}, see full trace : {e}')
diff --git a/dags/utils/rawg_api_caller.py b/dags/utils/rawg_api_caller.py
index 4ef143a..9b95c3b 100644
--- a/dags/utils/rawg_api_caller.py
+++ b/dags/utils/rawg_api_caller.py
@@ -66,7 +66,9 @@ def get_unique_ids_per_endpoint(self, api_key: str, page_number: int) -> []:
         endpoint_params = {
             'page_size': '10000',
             'page': page_number,
-            'platforms_count': '6'
+            'ordering': 'released',
+            'parent_platforms': '1,2,3,4,5,6,7,8,9,10',
+            'dates': '1990-01-01,2023-12-12'
         }
 
         endpoint_response = requests.get(generate_api_url, params=endpoint_params)
@@ -159,10 +161,16 @@ def get_game_details_per_id(self, api_key: str, endpoint_ids: list, page_number:
             publisher_df_flattened = pd.json_normalize(games_json, sep='_', record_path=['publishers'], meta=['id'], meta_prefix='game_')
             publisher_df = pd.concat([publisher_df, publisher_df_flattened], ignore_index=True)
 
+        # Update the released and updated columns to datetime64 format
+        games_df['released'] = pd.to_datetime(games_df['released'], format='%Y-%m-%d')
+        games_df['updated'] = pd.to_datetime(games_df['updated'])
+
         # Log the dimensions of each flattened file for tracking
         info_logger.info(f"Dimension of the data fetched and flattened for the following #{page_number} iteration: Games Table {games_df.shape}, Ratings Table {ratings_df.shape}, Platforms Table {platforms_df.shape}, Genre Table {genre_df.shape}, Publisher Table {publisher_df.shape}")
 
         info_logger.info(f"Following are the columns of the games table========")
         info_logger.info(f"{games_df.columns}")
+        info_logger.info(f"Following are the datatypes of the games table========")
+        info_logger.info(f"{games_df.dtypes}")
 
         info_logger.info(f"Following are the columns of the ratings table========")
         info_logger.info(f"{ratings_df.columns}")
diff --git a/requirements.txt b/requirements.txt
index 9f257c1..f2cf485 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,4 +2,5 @@
 apache-airflow-providers-google==10.13.1
 google-cloud-secret-manager==2.16.1
 gcsfs==2024.2.0
-google-cloud-storage==2.15.0
\ No newline at end of file
+google-cloud-storage==2.15.0
+pyarrow==15.0.2
\ No newline at end of file
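Two small corrections are folded into `gcp_utils.py` above: the column check uses `columns.tolist() == [...]` because `pd.Index.equals()` returns `False` for a plain list, which would have silently skipped the schema override, and the stale CSV comments are updated for the Parquet path. The new upload path can be exercised end-to-end without GCS by round-tripping a tiny games-like frame through the same `BytesIO` buffer — all values below are made up:

```python
# Sketch of the Parquet path in gcp_utils.py, minus GCS: cast a small frame
# to the hard-coded Arrow schema and round-trip it in memory.
from io import BytesIO

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

df = pd.DataFrame({
    'id': [3498],
    'slug': ['grand-theft-auto-v'],
    'metacritic': ['92'],  # kept as STRING to match schema_games
    'released': pd.to_datetime(['2013-09-17']),
    'updated': pd.to_datetime(['2024-01-01 10:00:00']),
})

schema = pa.schema([
    ('id', pa.int32()),
    ('slug', pa.string()),
    ('metacritic', pa.string()),
    ('released', pa.date32()),
    ('updated', pa.timestamp('s')),
])

buf = BytesIO()
pq.write_table(pa.Table.from_pandas(df, schema=schema, preserve_index=False), buf)
print(pq.read_table(BytesIO(buf.getvalue())).schema)  # matches the schema above
```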