From 2a3fb9bf128205ae4bc7768ec79d0cf7540b0b3e Mon Sep 17 00:00:00 2001 From: Kevin Parasseril Date: Tue, 30 Apr 2024 15:17:49 +0530 Subject: [PATCH] Added type enforcing on column platform_year_start --- dags/rawg_api_extractor_dag.py | 94 +++++++++++++++++----------------- dags/utils/rawg_api_caller.py | 1 + 2 files changed, 48 insertions(+), 47 deletions(-) diff --git a/dags/rawg_api_extractor_dag.py b/dags/rawg_api_extractor_dag.py index bc46d2d..3b43ae0 100644 --- a/dags/rawg_api_extractor_dag.py +++ b/dags/rawg_api_extractor_dag.py @@ -370,53 +370,53 @@ def get_game_id_related_data(api_key: str, game_ids_list: list, page_number: int # Load contents from GCS onto BigQuery for that run - load_rawg_api_ratings_data_to_bq = GCSToBigQueryOperator( - task_id=f'load_ratings_to_bq', - bucket=rawg_landing_gcs_bucket, # Set your GCS bucket name to pick file from. - source_objects=[f'ratings_{rawg_page_number}.parquet'], # Set the name of the CSV file in GCS - source_format='PARQUET', - destination_project_dataset_table=f'{rawg_api_bq_dataset}.ratings', # Set your BigQuery table name to load the data to. - gcp_conn_id='gcp', # Set your GCP connection ID. - allow_quoted_newlines=True, - ignore_unknown_values=True, - schema_fields=schema_ratings, - create_disposition='CREATE_IF_NEEDED', - autodetect=False, - write_disposition='WRITE_APPEND', # If the table already exists, BigQuery appends the data to the table. - skip_leading_rows=1 # Skip the header row in the CSV file. - ) - - load_rawg_api_games_data_to_bq = GCSToBigQueryOperator( - task_id=f'load_games_to_bq', - bucket=rawg_landing_gcs_bucket, # Set your GCS bucket name to pick file from. - source_objects=[f'games_{rawg_page_number}.parquet'], # Set the name of the CSV file in GCS - source_format='PARQUET', - allow_quoted_newlines=True, - ignore_unknown_values=True, - destination_project_dataset_table=f'{rawg_api_bq_dataset}.games', # Set your BigQuery table name to load the data to. - gcp_conn_id='gcp', # Set your GCP connection ID. - create_disposition='CREATE_IF_NEEDED', - schema_fields=schema_games, - autodetect=False, - write_disposition='WRITE_APPEND', # If the table already exists, BigQuery appends the data to the table. - skip_leading_rows=1 # Skip the header row in the CSV file. - ) - - load_rawg_api_genres_data_to_bq = GCSToBigQueryOperator( - task_id=f'load_genres_to_bq', - bucket=rawg_landing_gcs_bucket, # Set your GCS bucket name to pick file from. - source_objects=[f'genres_{rawg_page_number}.parquet'], # Set the name of the CSV file in GCS - source_format='PARQUET', - allow_quoted_newlines=True, - ignore_unknown_values=True, - destination_project_dataset_table=f'{rawg_api_bq_dataset}.genres', # Set your BigQuery table name to load the data to. - gcp_conn_id='gcp', # Set your GCP connection ID. - create_disposition='CREATE_IF_NEEDED', - write_disposition='WRITE_APPEND', # If the table already exists, BigQuery appends the data to the table. - schema_fields=schema_genres, - autodetect=False, - skip_leading_rows=1 # Skip the header row in the CSV file. - ) + # load_rawg_api_ratings_data_to_bq = GCSToBigQueryOperator( + # task_id=f'load_ratings_to_bq', + # bucket=rawg_landing_gcs_bucket, # Set your GCS bucket name to pick file from. + # source_objects=[f'ratings_{rawg_page_number}.parquet'], # Set the name of the CSV file in GCS + # source_format='PARQUET', + # destination_project_dataset_table=f'{rawg_api_bq_dataset}.ratings', # Set your BigQuery table name to load the data to. + # gcp_conn_id='gcp', # Set your GCP connection ID. + # allow_quoted_newlines=True, + # ignore_unknown_values=True, + # schema_fields=schema_ratings, + # create_disposition='CREATE_IF_NEEDED', + # autodetect=False, + # write_disposition='WRITE_APPEND', # If the table already exists, BigQuery appends the data to the table. + # skip_leading_rows=1 # Skip the header row in the CSV file. + # ) + + # load_rawg_api_games_data_to_bq = GCSToBigQueryOperator( + # task_id=f'load_games_to_bq', + # bucket=rawg_landing_gcs_bucket, # Set your GCS bucket name to pick file from. + # source_objects=[f'games_{rawg_page_number}.parquet'], # Set the name of the CSV file in GCS + # source_format='PARQUET', + # allow_quoted_newlines=True, + # ignore_unknown_values=True, + # destination_project_dataset_table=f'{rawg_api_bq_dataset}.games', # Set your BigQuery table name to load the data to. + # gcp_conn_id='gcp', # Set your GCP connection ID. + # create_disposition='CREATE_IF_NEEDED', + # schema_fields=schema_games, + # autodetect=False, + # write_disposition='WRITE_APPEND', # If the table already exists, BigQuery appends the data to the table. + # skip_leading_rows=1 # Skip the header row in the CSV file. + # ) + + # load_rawg_api_genres_data_to_bq = GCSToBigQueryOperator( + # task_id=f'load_genres_to_bq', + # bucket=rawg_landing_gcs_bucket, # Set your GCS bucket name to pick file from. + # source_objects=[f'genres_{rawg_page_number}.parquet'], # Set the name of the CSV file in GCS + # source_format='PARQUET', + # allow_quoted_newlines=True, + # ignore_unknown_values=True, + # destination_project_dataset_table=f'{rawg_api_bq_dataset}.genres', # Set your BigQuery table name to load the data to. + # gcp_conn_id='gcp', # Set your GCP connection ID. + # create_disposition='CREATE_IF_NEEDED', + # write_disposition='WRITE_APPEND', # If the table already exists, BigQuery appends the data to the table. + # schema_fields=schema_genres, + # autodetect=False, + # skip_leading_rows=1 # Skip the header row in the CSV file. + # ) load_rawg_api_platforms_data_to_bq = GCSToBigQueryOperator( task_id=f'load_platforms_to_bq', diff --git a/dags/utils/rawg_api_caller.py b/dags/utils/rawg_api_caller.py index 0eff06d..eca403c 100644 --- a/dags/utils/rawg_api_caller.py +++ b/dags/utils/rawg_api_caller.py @@ -182,6 +182,7 @@ def get_game_details_per_id(self, api_key: str, endpoint_ids: list, page_number: platforms_df['platform_id'] = platforms_df['platform_id'].astype(int) platforms_df['platform_games_count'] = platforms_df['platform_games_count'].astype(int) platforms_df['game_id'] = platforms_df['game_id'].astype(int) + platforms_df['platform_year_start'] = platforms_df['platform_year_start'].astype(str) # Enforcing datatypes for columns of publisher dataframe publisher_df['id'] = publisher_df['id'].astype(int)