From f78a7f465059f52068b1644c4adc3b6bcdd01874 Mon Sep 17 00:00:00 2001 From: Kevin Parasseril Date: Sun, 28 Apr 2024 14:32:37 +0530 Subject: [PATCH] Updated fields to enforce type and added workflow to deploy to cloud --- .github/workflows/deploy_to_astro_cloud.yaml | 27 ++ Dockerfile | 2 +- dags/rawg_api_extractor_dag.py | 331 ++++++++++++++----- dags/utils/gcp_utils.py | 22 +- dags/utils/rawg_api_caller.py | 32 +- 5 files changed, 312 insertions(+), 102 deletions(-) create mode 100644 .github/workflows/deploy_to_astro_cloud.yaml diff --git a/.github/workflows/deploy_to_astro_cloud.yaml b/.github/workflows/deploy_to_astro_cloud.yaml new file mode 100644 index 0000000..682a240 --- /dev/null +++ b/.github/workflows/deploy_to_astro_cloud.yaml @@ -0,0 +1,27 @@ +name: Astronomer CI - Deploy code + +on: + workflow_dispatch: + # inputs: + # deploymentId: + # description: "Deployment ID" + # required: true + # type: string + # apiToken: + # description: "Astro API Token" + # required: true + # type: string + push: + branches: + - main + +env: + ASTRO_API_TOKEN: ${{ secrets.ASTRO_API_TOKEN }} + DEPLOYMENT_ID: ${{ secrets.DEPLOYMENT_ID }} + +jobs: + build: + runs-on: ubuntu-latest + steps: + - name: Deploy to Astro + uses: astronomer/deploy-action@v0.4 \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 787c7d1..e88bed9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,2 +1,2 @@ -FROM quay.io/astronomer/astro-runtime:10.6.0 +FROM quay.io/astronomer/astro-runtime:10.8.0 ENV AIRFLOW__CORE__TEST_CONNECTION=Enabled \ No newline at end of file diff --git a/dags/rawg_api_extractor_dag.py b/dags/rawg_api_extractor_dag.py index 9eb78fb..bc46d2d 100644 --- a/dags/rawg_api_extractor_dag.py +++ b/dags/rawg_api_extractor_dag.py @@ -55,7 +55,7 @@ { "name": "released", "mode": "NULLABLE", - "type": "DATE", + "type": "STRING", "description": "", "fields": [] }, @@ -69,7 +69,7 @@ { "name": "updated", "mode": "NULLABLE", - "type": "TIMESTAMP", + "type": "STRING", "description": "", "fields": [] }, @@ -103,12 +103,171 @@ } ] +# Schema for genre table +schema_genres = [ + { + "name": "id", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + }, + { + "name": "name", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "slug", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "games_count", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + }, + { + "name": "image_background", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "game_id", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + } +] + +# Schema for platforms table +schema_platforms = [ + { + "name": "released_at", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "platform_id", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + }, + { + "name": "platform_name", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "platform_slug", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "platform_image", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "platform_year_end", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "platform_games_count", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + }, + { + "name": "platform_image_background", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "game_id", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + }, + { + "name": "platform_year_start", + "mode": "", + "type": "STRING", + "description": "", + "fields": [] + } +] + +# Schema for ratings table +schema_ratings = [ + { + "name": "id", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + }, + { + "name": "title", + "mode": "NULLABLE", + "type": "STRING", + "description": "", + "fields": [] + }, + { + "name": "count", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + }, + { + "name": "percent", + "mode": "NULLABLE", + "type": "FLOAT", + "description": "", + "fields": [] + }, + { + "name": "game_id", + "mode": "NULLABLE", + "type": "INTEGER", + "description": "", + "fields": [] + } +] + # Schema for publishers table schema_publishers = [ { "name": "id", "mode": "NULLABLE", - "type": "FLOAT", + "type": "INTEGER", "description": "", "fields": [] }, @@ -129,7 +288,7 @@ { "name": "games_count", "mode": "NULLABLE", - "type": "FLOAT", + "type": "INTEGER", "description": "", "fields": [] }, @@ -204,24 +363,28 @@ def get_game_id_related_data(api_key: str, game_ids_list: list, page_number: int # Save the files as CSV directly to GCS , post creating GCS bucket variable get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, games_df, 'games', rawg_page_number) - # get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, ratings_df, 'ratings', rawg_page_number) - # get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, platforms_df, 'platforms', rawg_page_number) - # get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, genre_df, 'genres', rawg_page_number) - # get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, publisher_df, 'publishers', rawg_page_number) + get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, ratings_df, 'ratings', rawg_page_number) + get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, platforms_df, 'platforms', rawg_page_number) + get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, genre_df, 'genres', rawg_page_number) + get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, publisher_df, 'publishers', rawg_page_number) # Load contents from GCS onto BigQuery for that run - # load_rawg_api_ratings_data_to_bq = GCSToBigQueryOperator( - # task_id=f'load_ratings_to_bq', - # bucket=rawg_landing_gcs_bucket, # Set your GCS bucket name to pick file from. - # source_objects=[f'ratings_{rawg_page_number}.csv'], # Set the name of the CSV file in GCS - # source_format='csv', - # destination_project_dataset_table=f'{rawg_api_bq_dataset}.ratings', # Set your BigQuery table name to load the data to. - # gcp_conn_id='gcp', # Set your GCP connection ID. - # create_disposition='CREATE_IF_NEEDED', - # write_disposition='WRITE_APPEND', # If the table already exists, BigQuery appends the data to the table. - # skip_leading_rows=1 # Skip the header row in the CSV file. - # ) + load_rawg_api_ratings_data_to_bq = GCSToBigQueryOperator( + task_id=f'load_ratings_to_bq', + bucket=rawg_landing_gcs_bucket, # Set your GCS bucket name to pick file from. + source_objects=[f'ratings_{rawg_page_number}.parquet'], # Set the name of the CSV file in GCS + source_format='PARQUET', + destination_project_dataset_table=f'{rawg_api_bq_dataset}.ratings', # Set your BigQuery table name to load the data to. + gcp_conn_id='gcp', # Set your GCP connection ID. + allow_quoted_newlines=True, + ignore_unknown_values=True, + schema_fields=schema_ratings, + create_disposition='CREATE_IF_NEEDED', + autodetect=False, + write_disposition='WRITE_APPEND', # If the table already exists, BigQuery appends the data to the table. + skip_leading_rows=1 # Skip the header row in the CSV file. + ) load_rawg_api_games_data_to_bq = GCSToBigQueryOperator( task_id=f'load_games_to_bq', @@ -230,7 +393,6 @@ def get_game_id_related_data(api_key: str, game_ids_list: list, page_number: int source_format='PARQUET', allow_quoted_newlines=True, ignore_unknown_values=True, - # max_bad_records=40, destination_project_dataset_table=f'{rawg_api_bq_dataset}.games', # Set your BigQuery table name to load the data to. gcp_conn_id='gcp', # Set your GCP connection ID. create_disposition='CREATE_IF_NEEDED', @@ -240,72 +402,79 @@ def get_game_id_related_data(api_key: str, game_ids_list: list, page_number: int skip_leading_rows=1 # Skip the header row in the CSV file. ) - # load_rawg_api_genres_data_to_bq = GCSToBigQueryOperator( - # task_id=f'load_genres_to_bq', - # bucket=rawg_landing_gcs_bucket, # Set your GCS bucket name to pick file from. - # source_objects=[f'genres_{rawg_page_number}.csv'], # Set the name of the CSV file in GCS - # source_format='csv', - # destination_project_dataset_table=f'{rawg_api_bq_dataset}.genres', # Set your BigQuery table name to load the data to. - # gcp_conn_id='gcp', # Set your GCP connection ID. - # create_disposition='CREATE_IF_NEEDED', - # write_disposition='WRITE_APPEND', # If the table already exists, BigQuery appends the data to the table. - # skip_leading_rows=1 # Skip the header row in the CSV file. - # ) - - # load_rawg_api_platforms_data_to_bq = GCSToBigQueryOperator( - # task_id=f'load_platforms_to_bq', - # bucket=rawg_landing_gcs_bucket, # Set your GCS bucket name to pick file from. - # source_objects=[f'platforms_{rawg_page_number}.csv'], # Set the name of the CSV file in GCS - # source_format='csv', - # destination_project_dataset_table=f'{rawg_api_bq_dataset}.platforms', # Set your BigQuery table name to load the data to. - # gcp_conn_id='gcp', # Set your GCP connection ID. - # create_disposition='CREATE_IF_NEEDED', - # write_disposition='WRITE_APPEND', # If the table already exists, BigQuery appends the data to the table. - # skip_leading_rows=1 # Skip the header row in the CSV file. - # ) - - # load_rawg_api_publishers_data_to_bq = GCSToBigQueryOperator( - # task_id=f'load_publishers_to_bq', - # bucket=rawg_landing_gcs_bucket, # Set your GCS bucket name to pick file from. - # source_objects=[f'publishers_{rawg_page_number}.csv'], # Set the name of the CSV file in GCS - # source_format='csv', - # destination_project_dataset_table=f'{rawg_api_bq_dataset}.publishers', # Set your BigQuery table name to load the data to. - # gcp_conn_id='gcp', # Set your GCP connection ID. - # create_disposition='CREATE_IF_NEEDED', - # write_disposition='WRITE_APPEND', # If the table already exists, BigQuery appends the data to the table. - # skip_leading_rows=1, # Skip the header row in the CSV file. - # autodetect=False, - # schema_fields=schema_publishers, - # allow_quoted_newlines=True, - # ignore_unknown_values=True, - # max_bad_records=40 - # ) - - # @task - # def remove_extracted_api_csv(bucket_name: str) -> None: - # rawg_api_gcs_hook = GCSHook(gcp_conn_id='gcp') - - # # Get the files to delete - # api_csv_files = rawg_api_gcs_hook.list(bucket_name) - - # # Delete the files - # for api_file in api_csv_files: - # rawg_api_gcs_hook.delete(bucket_name, api_file) - - # @task - # def update_page_number(rawg_page_number: int) -> int: - # # Update page number to fetch from the consecutive one in the next run - # next_page_number = int(rawg_page_number) + 1 - # Variable.set("api_page_number", next_page_number) + load_rawg_api_genres_data_to_bq = GCSToBigQueryOperator( + task_id=f'load_genres_to_bq', + bucket=rawg_landing_gcs_bucket, # Set your GCS bucket name to pick file from. + source_objects=[f'genres_{rawg_page_number}.parquet'], # Set the name of the CSV file in GCS + source_format='PARQUET', + allow_quoted_newlines=True, + ignore_unknown_values=True, + destination_project_dataset_table=f'{rawg_api_bq_dataset}.genres', # Set your BigQuery table name to load the data to. + gcp_conn_id='gcp', # Set your GCP connection ID. + create_disposition='CREATE_IF_NEEDED', + write_disposition='WRITE_APPEND', # If the table already exists, BigQuery appends the data to the table. + schema_fields=schema_genres, + autodetect=False, + skip_leading_rows=1 # Skip the header row in the CSV file. + ) + + load_rawg_api_platforms_data_to_bq = GCSToBigQueryOperator( + task_id=f'load_platforms_to_bq', + bucket=rawg_landing_gcs_bucket, # Set your GCS bucket name to pick file from. + source_objects=[f'platforms_{rawg_page_number}.parquet'], # Set the name of the CSV file in GCS + source_format='PARQUET', + allow_quoted_newlines=True, + ignore_unknown_values=True, + destination_project_dataset_table=f'{rawg_api_bq_dataset}.platforms', # Set your BigQuery table name to load the data to. + gcp_conn_id='gcp', # Set your GCP connection ID. + create_disposition='CREATE_IF_NEEDED', + schema_fields=schema_platforms, + autodetect=False, + write_disposition='WRITE_APPEND', # If the table already exists, BigQuery appends the data to the table. + skip_leading_rows=1 # Skip the header row in the CSV file. + ) + + load_rawg_api_publishers_data_to_bq = GCSToBigQueryOperator( + task_id=f'load_publishers_to_bq', + bucket=rawg_landing_gcs_bucket, # Set your GCS bucket name to pick file from. + source_objects=[f'publishers_{rawg_page_number}.parquet'], # Set the name of the CSV file in GCS + source_format='PARQUET', + destination_project_dataset_table=f'{rawg_api_bq_dataset}.publishers', # Set your BigQuery table name to load the data to. + gcp_conn_id='gcp', # Set your GCP connection ID. + create_disposition='CREATE_IF_NEEDED', + write_disposition='WRITE_APPEND', # If the table already exists, BigQuery appends the data to the table. + skip_leading_rows=1, # Skip the header row in the CSV file. + autodetect=False, + schema_fields=schema_publishers, + allow_quoted_newlines=True, + ignore_unknown_values=True, + max_bad_records=40 + ) + + @task + def remove_extracted_api_parquet_files(bucket_name: str) -> None: + rawg_api_gcs_hook = GCSHook(gcp_conn_id='gcp') + + # Get the files to delete + api_parquet_files = rawg_api_gcs_hook.list(bucket_name) + + # Delete the files + for api_file in api_parquet_files: + rawg_api_gcs_hook.delete(bucket_name, api_file) + + @task + def update_page_number(rawg_page_number: int) -> int: + # Update page number to fetch from the consecutive one in the next run + next_page_number = int(rawg_page_number) + 1 + Variable.set("api_page_number", next_page_number) # DAG Flow game_ids_list = get_rawg_api_game_ids(rawg_api_key, rawg_page_number) game_details_extractor = get_game_id_related_data(rawg_api_key, game_ids_list, rawg_page_number) - # clear_extracted_csv_files = remove_extracted_api_csv(rawg_landing_gcs_bucket) - # next_page_number = update_page_number(rawg_page_number) + clear_extracted_parquet_files = remove_extracted_api_parquet_files(rawg_landing_gcs_bucket) + next_page_number = update_page_number(rawg_page_number) - game_ids_list >> game_details_extractor >> load_rawg_api_games_data_to_bq - # >> load_rawg_api_ratings_data_to_bq >> load_rawg_api_games_data_to_bq >> load_rawg_api_genres_data_to_bq >> load_rawg_api_platforms_data_to_bq >> load_rawg_api_publishers_data_to_bq >> clear_extracted_csv_files >> next_page_number + game_ids_list >> game_details_extractor >> load_rawg_api_ratings_data_to_bq >> load_rawg_api_games_data_to_bq >> load_rawg_api_genres_data_to_bq >> load_rawg_api_platforms_data_to_bq >> load_rawg_api_publishers_data_to_bq >> clear_extracted_parquet_files >> next_page_number rawg_api_extractor_dag() diff --git a/dags/utils/gcp_utils.py b/dags/utils/gcp_utils.py index 1061352..25866a6 100644 --- a/dags/utils/gcp_utils.py +++ b/dags/utils/gcp_utils.py @@ -58,29 +58,13 @@ def get_gcp_connection_and_upload_to_gcs(bucket_name: str, dataframe_name: pd.Da # Intialize bucket object bucket = client.bucket(bucket_name) - if dataframe_name.columns.equals(['id', 'slug', 'name_original', 'description_raw', 'metacritic', 'released', 'tba', 'updated', 'rating', 'rating_top', 'playtime']): - # Override the schema of the dataframe to suit the bigquery table schema - schema = pa.schema([ - ('id', pa.int32()), - ('slug', pa.string()), - ('name_original', pa.string()), - ('description_raw', pa.string()), - ('metacritic', pa.string()), - ('released', pa.date32()), - ('tba', pa.bool_()), - ('updated', pa.timestamp('s')), - ('rating', pa.float64()), - ('rating_top', pa.int32()), - ('playtime', pa.int32()) - ]) - product_parquet_table = pa.Table.from_pandas(dataframe_name, schema=schema, preserve_index=False) - else: - # Other dataframes than games_df will be stored in parquet format without any schema change - product_parquet_table = pa.Table.from_pandas(dataframe_name, preserve_index=False) + # Save the pandas dataframe into a PyArrow table + product_parquet_table = pa.Table.from_pandas(dataframe_name, preserve_index=False) # Convert dataframe to CSV string # parquet_data = dataframe_name.to_parquet(index=False) parquet_data = BytesIO() + # Write the parquet data to a BytesIO object in memory pq.write_table(product_parquet_table, parquet_data) # Upload Parquet to GCS diff --git a/dags/utils/rawg_api_caller.py b/dags/utils/rawg_api_caller.py index 9b95c3b..0eff06d 100644 --- a/dags/utils/rawg_api_caller.py +++ b/dags/utils/rawg_api_caller.py @@ -161,9 +161,39 @@ def get_game_details_per_id(self, api_key: str, endpoint_ids: list, page_number: publisher_df_flattened = pd.json_normalize(games_json, sep='_', record_path=['publishers'], meta=['id'], meta_prefix='game_') publisher_df = pd.concat([publisher_df, publisher_df_flattened], ignore_index=True) - # Update the released and updated columns to datetime64 format + # Enforcing datatypes for columns of games dataframe + games_df['id'] = games_df['id'].astype(int) games_df['released'] = pd.to_datetime(games_df['released'], format='%Y-%m-%d') + games_df['released'] = games_df['released'].astype(str) games_df['updated'] = pd.to_datetime(games_df['updated']) + games_df['updated'] = games_df['updated'].astype(str) + games_df['rating_top'] = games_df['rating_top'].astype(int) + games_df['playtime'] = games_df['playtime'].astype(int) + games_df['metacritic'] = games_df['metacritic'].astype(str) + + # Enforcing datatypes for columns of genres dataframe + genre_df['id'] = genre_df['id'].astype(int) + genre_df['games_count'] = genre_df['games_count'].astype(int) + genre_df['game_id'] = genre_df['game_id'].astype(int) + + # Enforcing datatypes for columns of platforms dataframe + platforms_df['released_at'] = pd.to_datetime(platforms_df['released_at'], format='%Y-%m-%d') + platforms_df['released_at'] = platforms_df['released_at'].astype(str) + platforms_df['platform_id'] = platforms_df['platform_id'].astype(int) + platforms_df['platform_games_count'] = platforms_df['platform_games_count'].astype(int) + platforms_df['game_id'] = platforms_df['game_id'].astype(int) + + # Enforcing datatypes for columns of publisher dataframe + publisher_df['id'] = publisher_df['id'].astype(int) + publisher_df['games_count'] = publisher_df['games_count'].astype(int) + publisher_df['game_id'] = publisher_df['game_id'].astype(int) + + # Enforcing datatypes for columns of ratings dataframe + ratings_df['id'] = ratings_df['id'].astype(int) + ratings_df['count'] = ratings_df['count'].astype(int) + ratings_df['percent'] = ratings_df['percent'].astype(float) + ratings_df['game_id'] = ratings_df['game_id'].astype(int) + # Log the dimensions of each flattened file for tracking info_logger.info(f"Dimension of the data fetched and flattened for the following #{page_number} iteration: Games Table {games_df.shape}, Ratings Table {ratings_df.shape}, Platforms Table {platforms_df.shape}, Genre Table {genre_df.shape}, Publisher Table {publisher_df.shape}")