Skip to content

Commit

Permalink
Merge pull request #8 from kevinsunny1996/task/var_logic_move
Browse files Browse the repository at this point in the history
Added vars and separated page update
  • Loading branch information
kevinsunny1996 authored Mar 26, 2024
2 parents 19faf54 + 240e436 commit 7b883c7
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions dags/rawg_api_extractor_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ def rawg_api_extractor_dag():
rawg_api_key = Variable.get('rawg_api_key')
rawg_page_number = int(Variable.get('api_page_number', default_var=1))
rawg_landing_gcs_bucket = Variable.get('gcs_rawg_api_landing_bucket')
rawg_api_bq_dataset = Variable.get('gcp_bq_dataset')
gcp_project_name = Variable.get('gcp_project_id')

# Task to get Game IDs to fetch data from in subsequent task
@task
Expand Down Expand Up @@ -133,18 +135,14 @@ def get_game_id_related_data(api_key: str, game_ids_list: list, page_number: int
get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, genre_df, 'genres', rawg_page_number)
get_gcp_connection_and_upload_to_gcs(rawg_landing_gcs_bucket, publisher_df, 'publishers', rawg_page_number)

# Update page number to fetch from the consecutive one in the next run
next_page_number = int(rawg_page_number) + 1
Variable.set("api_page_number", next_page_number)

# Create Empty Table for Schema defined for the 5 tables created as output
@task
def create_rawg_api_placeholder_empty_table(table_name: str, schema: list) -> None:
create_empty_table_task = BigQueryCreateEmptyTableOperator(
task_id=f'create_empty_table_for_{table_name}',
dataset_id='rawg_api_elt_dataset', # Set your BigQuery dataset ID
dataset_id=rawg_api_bq_dataset, # Set your BigQuery dataset ID
table_id=table_name,
project_id='exemplary-tide-379122', # Set your BigQuery project ID
project_id=gcp_project_name, # Set your BigQuery project ID
schema_fields=schema,
gcp_conn_id='gcp', # Set your GCP connection ID
if_exists='skip' # Skip creating the table if it already exists
Expand All @@ -155,13 +153,20 @@ def create_rawg_api_placeholder_empty_table(table_name: str, schema: list) -> No
def placeholder_empty_table_task_group():
for table_name, schema in table_schemas.items():
create_rawg_api_placeholder_empty_table(table_name, schema)

@task
def update_page_number(rawg_page_number: int) -> int:
# Update page number to fetch from the consecutive one in the next run
next_page_number = int(rawg_page_number) + 1
Variable.set("api_page_number", next_page_number)

# DAG Flow
game_ids_list = get_rawg_api_game_ids(rawg_api_key, rawg_page_number)
game_details_extractor = get_game_id_related_data(rawg_api_key, game_ids_list, rawg_page_number)
placeholder_empty_table_tasks = placeholder_empty_table_task_group()
next_page_number = update_page_number(rawg_page_number)

game_ids_list >> game_details_extractor >> placeholder_empty_table_tasks
game_ids_list >> game_details_extractor >> placeholder_empty_table_tasks >> next_page_number


rawg_api_extractor_dag()
Expand Down

0 comments on commit 7b883c7

Please sign in to comment.