From 1fc980b8572d49a728a923beaf45b14399d3ecce Mon Sep 17 00:00:00 2001
From: himanshudube97
Date: Fri, 13 Dec 2024 11:30:38 +0530
Subject: [PATCH 01/10] code to create a new dbt cloud task

---
 ddpui/api/orgtask_api.py        | 19 ++++++++++++++-----
 ddpui/api/pipeline_api.py       |  8 ++++++--
 ddpui/core/orgtaskfunctions.py  | 19 ++++++++++++-------
 ddpui/core/pipelinefunctions.py | 23 +++++++++++++++++++++--
 ddpui/ddpdbt/dbt_service.py     |  2 ++
 ddpui/ddpdbt/schema.py          | 19 ++++++++++++++++---
 ddpui/ddpprefect/__init__.py    |  1 +
 ddpui/ddpprefect/schema.py      | 23 +++++++++++++++++++++++
 ddpui/utils/constants.py        |  6 ++++--
 seed/tasks.json                 | 11 +++++++++++
 10 files changed, 110 insertions(+), 21 deletions(-)

diff --git a/ddpui/api/orgtask_api.py b/ddpui/api/orgtask_api.py
index 8e5c06a3..64c6718d 100644
--- a/ddpui/api/orgtask_api.py
+++ b/ddpui/api/orgtask_api.py
@@ -31,7 +31,7 @@ from ddpui.ddpprefect.schema import (
     PrefectSecretBlockCreate,
 )
-from ddpui.ddpdbt.schema import DbtProjectParams
+from ddpui.ddpdbt.schema import DbtCliParams, DbtCloudParams
 from ddpui.schemas.org_task_schema import CreateOrgTaskPayload, TaskParameters
 from ddpui.core.orgdbt_manager import DbtProjectManager
@@ -90,14 +90,15 @@ def post_orgtask(request, payload: CreateOrgTaskPayload):
     orgtask = OrgTask.objects.create(
         org=orguser.org,
         task=task,
-        parameters=parameters,
+        parameters=parameters,  # here the accountId, jobId and apiKey will be received for dbt-cloud
         generated_by="client",
         uuid=uuid.uuid4(),
     )

     dataflow = None
-    if task.slug in LONG_RUNNING_TASKS:
-        dbt_project_params: DbtProjectParams = DbtProjectManager.gather_dbt_project_params(
+    # For dbt-cli
+    if task.slug in LONG_RUNNING_TASKS and task.type is "dbt":
+        dbt_project_params: DbtCliParams = DbtProjectManager.gather_dbt_project_params(
             orguser.org, orgdbt
         )

@@ -113,6 +114,14 @@ def post_orgtask(request, payload: CreateOrgTaskPayload):
             orgtask, cli_profile_block, dbt_project_params
         )

+    # For dbt-cloud
+    if task.slug in LONG_RUNNING_TASKS and task.type is "dbtcloud":
+        dbt_cloud_params: DbtCloudParams = parameters["options"]
+        dataflow = create_prefect_deployment_for_dbtcore_task(
+            orgtask, dbt_cloud_params  # this will contain accountId, apikey, and jobId
+        )
+        # if the task is dbtcloud-run then create another dataflow, create_prefect_deployment for the dbtcloud task (112, 114 but for cloud)
+
     return {
         **model_to_dict(orgtask, fields=["parameters"]),
         "task_slug": orgtask.task.slug,
@@ -326,7 +335,7 @@ def post_run_prefect_org_task(
     if orgdbt is None:
         raise HttpError(400, "dbt is not configured for this client")

-    dbt_project_params: DbtProjectParams = DbtProjectManager.gather_dbt_project_params(
+    dbt_project_params: DbtCliParams = DbtProjectManager.gather_dbt_project_params(
         orguser.org, orgdbt
     )

diff --git a/ddpui/api/pipeline_api.py b/ddpui/api/pipeline_api.py
index 45b8e346..53b7836e 100644
--- a/ddpui/api/pipeline_api.py
+++ b/ddpui/api/pipeline_api.py
@@ -102,10 +102,14 @@ def post_prefect_dataflow_v1(request, payload: PrefectDataFlowCreateSchema4):
     dbt_project_params: DbtProjectParams = None
     dbt_git_orgtasks = []
     orgdbt = orguser.org.dbt
-    if payload.transformTasks and len(payload.transformTasks) > 0:
+    if (
+        payload.transformTasks and len(payload.transformTasks) > 0
+    ):  # dont modify this block as its of rlocal
         logger.info("Dbt tasks being pushed to the pipeline")
+        # for dbt cloud we don't check the dbt cli block (add an if condition to check it)

         # dbt params
+
         dbt_project_params = DbtProjectManager.gather_dbt_project_params(orguser.org, orgdbt)

         # dbt cli profile block
@@ -137,7 +141,7 @@ def post_prefect_dataflow_v1(request, payload: PrefectDataFlowCreateSchema4):
     if error:
         raise HttpError(400, error)
     tasks += task_configs
-
+    # new: if payload has a dbt cloud transform task, get its task config (dbt-run)
     map_org_tasks += dbt_git_orgtasks

     # create deployment
diff --git a/ddpui/core/orgtaskfunctions.py b/ddpui/core/orgtaskfunctions.py
index 3da9288e..19e9124e 100644
--- a/ddpui/core/orgtaskfunctions.py
+++ b/ddpui/core/orgtaskfunctions.py
@@ -20,9 +20,9 @@
     PrefectDataFlowCreateSchema3,
 )
 from ddpui.ddpprefect import MANUL_DBT_WORK_QUEUE
-from ddpui.ddpdbt.schema import DbtProjectParams
+from ddpui.ddpdbt.schema import DbtCliParams, DbtProjectParams
 from ddpui.ddpprefect import prefect_service
-from ddpui.core.pipelinefunctions import setup_dbt_core_task_config
+from ddpui.core.pipelinefunctions import setup_dbt_core_task_config, setup_dbt_cloud_task_config
 from ddpui.utils.constants import TASK_DBTRUN, TASK_GENERATE_EDR
 from ddpui.utils.helpers import generate_hash_id
@@ -117,6 +117,15 @@ def create_prefect_deployment_for_dbtcore_task(
     """
     hash_code = generate_hash_id(8)
     deployment_name = f"manual-{org_task.org.slug}-{org_task.task.slug}-{hash_code}"
+
+    tasks = []
+    if org_task.task.type == "dbt":
+        tasks = [
+            setup_dbt_core_task_config(org_task, cli_profile_block, dbt_project_params).to_json()
+        ]
+    elif org_task.task.type == "dbtcloud":
+        tasks = [setup_dbt_cloud_task_config(org_task, dbt_project_params).to_json()]
+
     dataflow = prefect_service.create_dataflow_v1(
         PrefectDataFlowCreateSchema3(
             deployment_name=deployment_name,
@@ -124,11 +133,7 @@
             orgslug=org_task.org.slug,
             deployment_params={
                 "config": {
-                    "tasks": [
-                        setup_dbt_core_task_config(
-                            org_task, cli_profile_block, dbt_project_params
-                        ).to_json()
-                    ],
+                    "tasks": tasks,
                     "org_slug": org_task.org.slug,
                 }
             },
diff --git a/ddpui/core/pipelinefunctions.py b/ddpui/core/pipelinefunctions.py
index 0bd95495..99ac6bd6 100644
--- a/ddpui/core/pipelinefunctions.py
+++ b/ddpui/core/pipelinefunctions.py
@@ -15,6 +15,7 @@
 from ddpui.utils.custom_logger import CustomLogger
 from ddpui.ddpprefect.schema import (
     PrefectDbtTaskSetup,
+    PrefectDbtCloudTaskSetup,
     PrefectShellTaskSetup,
     PrefectAirbyteSyncTaskSetup,
     PrefectAirbyteRefreshSchemaTaskSetup,
@@ -22,6 +23,7 @@
 )
 from ddpui.ddpprefect import (
     AIRBYTECONNECTION,
+    DBTCLOUD,
     DBTCORE,
     SECRET,
     SHELLOPERATION,
@@ -39,7 +41,7 @@
     UPDATE_SCHEMA,
     TRANSFORM_TASKS_SEQ,
 )
-from ddpui.ddpdbt.schema import DbtProjectParams
+from ddpui.ddpdbt.schema import DbtCliParams, DbtCloudParams, DbtProjectParams

 logger = CustomLogger("ddpui")
@@ -83,7 +85,7 @@ def setup_airbyte_update_schema_task_config(
 def setup_dbt_core_task_config(
     org_task: OrgTask,
     cli_profile_block: OrgPrefectBlockv1,
-    dbt_project_params: DbtProjectParams,
+    dbt_project_params: DbtCliParams,
     seq: int = 1,
 ):
     """constructs the prefect payload for a dbt job"""
@@ -102,6 +104,23 @@ def setup_dbt_core_task_config(
     )


+def setup_dbt_cloud_task_config(
+    org_task: OrgTask,
+    dbt_project_params: DbtCloudParams,
+    seq: int = 1,
+):
+    """constructs the prefect payload for a dbt-cloud job"""
+    return PrefectDbtCloudTaskSetup(
+        seq=seq,
+        slug=org_task.task.slug,
+        type=DBTCLOUD,
+        api_key=dbt_project_params.api_key,
+        account_id=dbt_project_params.account_id,
+        job_id=dbt_project_params.job_id,
+        orgtask_uuid=str(org_task.uuid),
+    )
+
+
 def setup_git_pull_shell_task_config(
     org_task: OrgTask,
     project_dir: str,
diff --git a/ddpui/ddpdbt/dbt_service.py b/ddpui/ddpdbt/dbt_service.py
index
b14d5b32..ad372b11 100644 --- a/ddpui/ddpdbt/dbt_service.py +++ b/ddpui/ddpdbt/dbt_service.py @@ -30,6 +30,7 @@ TASK_DBTRUN, TASK_DBTSEED, TASK_DBTDEPS, + TASK_DBTCLOUDRUN, ) from ddpui.core.orgdbt_manager import DbtProjectManager from ddpui.utils.timezone import as_ist @@ -107,6 +108,7 @@ def task_config_params(task: Task): TASK_DBTTEST: {"flags": [], "options": ["select", "exclude"]}, TASK_DBTSEED: {"flags": [], "options": ["select"]}, TASK_DOCSGENERATE: {"flags": [], "options": []}, + TASK_DBTCLOUDRUN: {"flags": [], "options": ["account_id", "api-key", "job-id"]}, } return TASK_CONIF_PARAM[task.slug] if task.slug in TASK_CONIF_PARAM else None diff --git a/ddpui/ddpdbt/schema.py b/ddpui/ddpdbt/schema.py index aea1086d..efa8e901 100644 --- a/ddpui/ddpdbt/schema.py +++ b/ddpui/ddpdbt/schema.py @@ -1,11 +1,11 @@ -from typing import Optional, Union +from typing import Literal, Optional, Union from ninja import Schema from pathlib import Path -class DbtProjectParams(Schema): +class DbtCliParams(Schema): """ - schema to define all parameters required to run a dbt project + Schema to define all parameters required to run a dbt project using CLI. """ dbt_env_dir: Union[str, Path] @@ -14,3 +14,16 @@ class DbtProjectParams(Schema): target: str venv_binary: Union[str, Path] dbt_binary: Union[str, Path] + + +class DbtCloudParams(Schema): + """ + Schema to define all parameters required to run a dbt project using dbt Cloud. + """ + + api_key: str + account_id: int + job_id: int + + +DbtProjectParams = Union[DbtCliParams, DbtCloudParams] diff --git a/ddpui/ddpprefect/__init__.py b/ddpui/ddpprefect/__init__.py index 86e0834e..ad14ba4b 100644 --- a/ddpui/ddpprefect/__init__.py +++ b/ddpui/ddpprefect/__init__.py @@ -3,6 +3,7 @@ AIRBYTECONNECTION = "Airbyte Connection" SHELLOPERATION = "Shell Operation" DBTCORE = "dbt Core Operation" +DBTCLOUD = "dbt Cloud Operation" DBTCLIPROFILE = "dbt CLI Profile" SECRET = "Secret" diff --git a/ddpui/ddpprefect/schema.py b/ddpui/ddpprefect/schema.py index ecacd5a5..d18ee8af 100644 --- a/ddpui/ddpprefect/schema.py +++ b/ddpui/ddpprefect/schema.py @@ -160,6 +160,29 @@ def to_json(self): } +class PrefectDbtCloudTaskSetup(Schema): + "request payload to trigger a dbt cloud run task in prefect" + seq = int = (0,) + slug = (str,) + type = (str,) + api_key = (str,) + account_id = (str,) + job_id = (str,) + orgtask_uuid = (str,) + + def to_json(self): + """JSON serialization""" + return { + "seq": self.seq, + "slug": self.slug, + "type": self.type, + "api_key": self.api_key, + "account_id": self.account_id, + "job_id": self.job_id, + "orgtask_uuid": self.orgtask_uuid, + } + + class DbtProfile(Schema): """Docstring""" diff --git a/ddpui/utils/constants.py b/ddpui/utils/constants.py index 9c2fbfef..3538beea 100644 --- a/ddpui/utils/constants.py +++ b/ddpui/utils/constants.py @@ -3,7 +3,8 @@ TASK_DBTTEST = "dbt-test" TASK_DBTCLEAN = "dbt-clean" TASK_DBTDEPS = "dbt-deps" -TASK_GITPULL = "git-pull" +TASK_GITPULL = ("git-pull",) +TASK_DBTCLOUDRUN = "dbtcloud-run" # this is task slug so it should match the seed data. 
 TASK_DOCSGENERATE = "dbt-docs-generate"
 TASK_AIRBYTESYNC = "airbyte-sync"
 TASK_AIRBYTERESET = "airbyte-reset"
@@ -30,12 +31,13 @@
     TASK_DBTDEPS,
     TASK_DBTRUN,
     TASK_DBTTEST,
+    TASK_DBTCLOUDRUN,
 ]

 # These are tasks to be run via deployment
 # Adding a new task here will work for any new orgtask created
 # But for the current ones a script would need to be run to set them with a deployment
-LONG_RUNNING_TASKS = [TASK_DBTRUN, TASK_DBTSEED, TASK_DBTTEST]
+LONG_RUNNING_TASKS = [TASK_DBTRUN, TASK_DBTSEED, TASK_DBTTEST, TASK_DBTCLOUDRUN]

 # airbyte sync timeout in deployment params
 AIRBYTE_SYNC_TIMEOUT = 15
diff --git a/seed/tasks.json b/seed/tasks.json
index 134c45c0..ba3826a9 100644
--- a/seed/tasks.json
+++ b/seed/tasks.json
@@ -119,5 +119,16 @@
         "command": null,
         "is_system": false
         }
+    },
+    {
+        "model": "ddpui.Task",
+        "pk": 11,
+        "fields": {
+            "type": "dbtcloud",
+            "slug": "dbtcloud-run",
+            "label": "DBT Cloud run",
+            "command": "cloud-run",
+            "is_system": true
+        }
     }
 ]
\ No newline at end of file

From 22bbb0970d1a07daa75733f931553b96d5d7a591 Mon Sep 17 00:00:00 2001
From: himanshudube97
Date: Fri, 13 Dec 2024 18:10:03 +0530
Subject: [PATCH 02/10] changes added

---
 ddpui/api/orgtask_api.py        | 19 ++++++++++++++-----
 ddpui/api/pipeline_api.py       | 12 +++++++++---
 ddpui/core/orgtaskfunctions.py  | 19 ++++++++++++-------
 ddpui/core/pipelinefunctions.py | 25 ++++++++++++++++++++++---
 ddpui/ddpdbt/dbt_service.py     |  2 ++
 ddpui/ddpdbt/schema.py          | 19 ++++++++++++++++---
 ddpui/ddpprefect/__init__.py    |  1 +
 ddpui/ddpprefect/schema.py      | 23 +++++++++++++++++++++++
 ddpui/settings.py               | 20 ++++++++++----------
 ddpui/utils/constants.py        |  6 ++++--
 seed/tasks.json                 | 11 +++++++++++
 11 files changed, 124 insertions(+), 33 deletions(-)

diff --git a/ddpui/api/orgtask_api.py b/ddpui/api/orgtask_api.py
index 8e5c06a3..64c6718d 100644
--- a/ddpui/api/orgtask_api.py
+++ b/ddpui/api/orgtask_api.py
@@ -31,7 +31,7 @@ from ddpui.ddpprefect.schema import (
     PrefectSecretBlockCreate,
 )
-from ddpui.ddpdbt.schema import DbtProjectParams
+from ddpui.ddpdbt.schema import DbtCliParams, DbtCloudParams
 from ddpui.schemas.org_task_schema import CreateOrgTaskPayload, TaskParameters
 from ddpui.core.orgdbt_manager import DbtProjectManager
@@ -90,14 +90,15 @@ def post_orgtask(request, payload: CreateOrgTaskPayload):
     orgtask = OrgTask.objects.create(
         org=orguser.org,
         task=task,
-        parameters=parameters,
+        parameters=parameters,  # here the accountId, jobId and apiKey will be received for dbt-cloud
         generated_by="client",
         uuid=uuid.uuid4(),
     )

     dataflow = None
-    if task.slug in LONG_RUNNING_TASKS:
-        dbt_project_params: DbtProjectParams = DbtProjectManager.gather_dbt_project_params(
+    # For dbt-cli
+    if task.slug in LONG_RUNNING_TASKS and task.type is "dbt":
+        dbt_project_params: DbtCliParams = DbtProjectManager.gather_dbt_project_params(
             orguser.org, orgdbt
         )

@@ -113,6 +114,14 @@ def post_orgtask(request, payload: CreateOrgTaskPayload):
             orgtask, cli_profile_block, dbt_project_params
         )

+    # For dbt-cloud
+    if task.slug in LONG_RUNNING_TASKS and task.type is "dbtcloud":
+        dbt_cloud_params: DbtCloudParams = parameters["options"]
+        dataflow = create_prefect_deployment_for_dbtcore_task(
+            orgtask, dbt_cloud_params  # this will contain accountId, apikey, and jobId
+        )
+        # if the task is dbtcloud-run then create another dataflow, create_prefect_deployment for the dbtcloud task (112, 114 but for cloud)
+
     return {
         **model_to_dict(orgtask, fields=["parameters"]),
         "task_slug": orgtask.task.slug,
@@ -326,7 +335,7 @@ def post_run_prefect_org_task(
     if orgdbt is None:
         raise HttpError(400, "dbt is not configured for this client")
"dbt is not configured for this client") - dbt_project_params: DbtProjectParams = DbtProjectManager.gather_dbt_project_params( + dbt_project_params: DbtCliParams = DbtProjectManager.gather_dbt_project_params( orguser.org, orgdbt ) diff --git a/ddpui/api/pipeline_api.py b/ddpui/api/pipeline_api.py index 45b8e346..a9d0ba78 100644 --- a/ddpui/api/pipeline_api.py +++ b/ddpui/api/pipeline_api.py @@ -54,7 +54,7 @@ def post_prefect_dataflow_v1(request, payload: PrefectDataFlowCreateSchema4): if payload.name in [None, ""]: raise HttpError(400, "must provide a name for the flow") - tasks = [] + tasks = [] # This is main task array- containing airbyte and dbt task both. map_org_tasks = [] # seq of org tasks to be mapped in pipelin/ dataflow # push conection orgtasks in pipelin @@ -102,10 +102,16 @@ def post_prefect_dataflow_v1(request, payload: PrefectDataFlowCreateSchema4): dbt_project_params: DbtProjectParams = None dbt_git_orgtasks = [] orgdbt = orguser.org.dbt - if payload.transformTasks and len(payload.transformTasks) > 0: + + # checkng + if ( + payload.transformTasks and len(payload.transformTasks) > 0 + ): # dont modify this block as its of rlocal logger.info("Dbt tasks being pushed to the pipeline") + # for dbt cloud we dont check the dbcliblcok (add if condition to check it) # dbt params + dbt_project_params = DbtProjectManager.gather_dbt_project_params(orguser.org, orgdbt) # dbt cli profile block @@ -137,7 +143,7 @@ def post_prefect_dataflow_v1(request, payload: PrefectDataFlowCreateSchema4): if error: raise HttpError(400, error) tasks += task_configs - + # new if payload.cloud transorm task, get taskconfig, dbt-run map_org_tasks += dbt_git_orgtasks # create deployment diff --git a/ddpui/core/orgtaskfunctions.py b/ddpui/core/orgtaskfunctions.py index 3da9288e..19e9124e 100644 --- a/ddpui/core/orgtaskfunctions.py +++ b/ddpui/core/orgtaskfunctions.py @@ -20,9 +20,9 @@ PrefectDataFlowCreateSchema3, ) from ddpui.ddpprefect import MANUL_DBT_WORK_QUEUE -from ddpui.ddpdbt.schema import DbtProjectParams +from ddpui.ddpdbt.schema import DbtCliParams, DbtProjectParams from ddpui.ddpprefect import prefect_service -from ddpui.core.pipelinefunctions import setup_dbt_core_task_config +from ddpui.core.pipelinefunctions import setup_dbt_core_task_config, setup_dbt_cloud_task_config from ddpui.utils.constants import TASK_DBTRUN, TASK_GENERATE_EDR from ddpui.utils.helpers import generate_hash_id @@ -117,6 +117,15 @@ def create_prefect_deployment_for_dbtcore_task( """ hash_code = generate_hash_id(8) deployment_name = f"manual-{org_task.org.slug}-{org_task.task.slug}-{hash_code}" + + tasks = [] + if org_task.task.type == "dbt": + tasks = [ + setup_dbt_core_task_config(org_task, cli_profile_block, dbt_project_params).to_json() + ] + elif org_task.task.type == "dbtcloud": + tasks = [setup_dbt_cloud_task_config(org_task, dbt_project_params).to_json()] + dataflow = prefect_service.create_dataflow_v1( PrefectDataFlowCreateSchema3( deployment_name=deployment_name, @@ -124,11 +133,7 @@ def create_prefect_deployment_for_dbtcore_task( orgslug=org_task.org.slug, deployment_params={ "config": { - "tasks": [ - setup_dbt_core_task_config( - org_task, cli_profile_block, dbt_project_params - ).to_json() - ], + "tasks": tasks, "org_slug": org_task.org.slug, } }, diff --git a/ddpui/core/pipelinefunctions.py b/ddpui/core/pipelinefunctions.py index 0bd95495..7b2eecfa 100644 --- a/ddpui/core/pipelinefunctions.py +++ b/ddpui/core/pipelinefunctions.py @@ -15,6 +15,7 @@ from ddpui.utils.custom_logger import CustomLogger from 
ddpui.ddpprefect.schema import ( PrefectDbtTaskSetup, + PrefectDbtCloudTaskSetup, PrefectShellTaskSetup, PrefectAirbyteSyncTaskSetup, PrefectAirbyteRefreshSchemaTaskSetup, @@ -22,6 +23,7 @@ ) from ddpui.ddpprefect import ( AIRBYTECONNECTION, + DBTCLOUD, DBTCORE, SECRET, SHELLOPERATION, @@ -39,7 +41,7 @@ UPDATE_SCHEMA, TRANSFORM_TASKS_SEQ, ) -from ddpui.ddpdbt.schema import DbtProjectParams +from ddpui.ddpdbt.schema import DbtCliParams, DbtCloudParams, DbtProjectParams logger = CustomLogger("ddpui") @@ -83,7 +85,7 @@ def setup_airbyte_update_schema_task_config( def setup_dbt_core_task_config( org_task: OrgTask, cli_profile_block: OrgPrefectBlockv1, - dbt_project_params: DbtProjectParams, + dbt_project_params: DbtCliParams, seq: int = 1, ): """constructs the prefect payload for a dbt job""" @@ -102,6 +104,23 @@ def setup_dbt_core_task_config( ) +def setup_dbt_cloud_task_config( + org_task: OrgTask, + dbt_project_params: DbtCloudParams, + seq: int = 1, +): + """constructs the prefect payload for a dbt-cloud job""" + return PrefectDbtCloudTaskSetup( + seq=seq, + slug=org_task.task.slug, + type=DBTCLOUD, + api_key=dbt_project_params.api_key, + account_id=dbt_project_params.account_id, + job_id=dbt_project_params.job_id, + orgtask_uuid=str(org_task.uuid), + ) + + def setup_git_pull_shell_task_config( org_task: OrgTask, project_dir: str, @@ -157,7 +176,7 @@ def pipeline_with_orgtasks( This assumes the list of orgtasks is in the correct sequence """ task_configs = [] - + # This block works perfectly for dbt cli tasks and dbt cloud tasks both. for org_task in org_tasks: task_config = None if org_task.task.slug == TASK_AIRBYTERESET: diff --git a/ddpui/ddpdbt/dbt_service.py b/ddpui/ddpdbt/dbt_service.py index b14d5b32..ad372b11 100644 --- a/ddpui/ddpdbt/dbt_service.py +++ b/ddpui/ddpdbt/dbt_service.py @@ -30,6 +30,7 @@ TASK_DBTRUN, TASK_DBTSEED, TASK_DBTDEPS, + TASK_DBTCLOUDRUN, ) from ddpui.core.orgdbt_manager import DbtProjectManager from ddpui.utils.timezone import as_ist @@ -107,6 +108,7 @@ def task_config_params(task: Task): TASK_DBTTEST: {"flags": [], "options": ["select", "exclude"]}, TASK_DBTSEED: {"flags": [], "options": ["select"]}, TASK_DOCSGENERATE: {"flags": [], "options": []}, + TASK_DBTCLOUDRUN: {"flags": [], "options": ["account_id", "api-key", "job-id"]}, } return TASK_CONIF_PARAM[task.slug] if task.slug in TASK_CONIF_PARAM else None diff --git a/ddpui/ddpdbt/schema.py b/ddpui/ddpdbt/schema.py index aea1086d..efa8e901 100644 --- a/ddpui/ddpdbt/schema.py +++ b/ddpui/ddpdbt/schema.py @@ -1,11 +1,11 @@ -from typing import Optional, Union +from typing import Literal, Optional, Union from ninja import Schema from pathlib import Path -class DbtProjectParams(Schema): +class DbtCliParams(Schema): """ - schema to define all parameters required to run a dbt project + Schema to define all parameters required to run a dbt project using CLI. """ dbt_env_dir: Union[str, Path] @@ -14,3 +14,16 @@ class DbtProjectParams(Schema): target: str venv_binary: Union[str, Path] dbt_binary: Union[str, Path] + + +class DbtCloudParams(Schema): + """ + Schema to define all parameters required to run a dbt project using dbt Cloud. 
+ """ + + api_key: str + account_id: int + job_id: int + + +DbtProjectParams = Union[DbtCliParams, DbtCloudParams] diff --git a/ddpui/ddpprefect/__init__.py b/ddpui/ddpprefect/__init__.py index 86e0834e..ad14ba4b 100644 --- a/ddpui/ddpprefect/__init__.py +++ b/ddpui/ddpprefect/__init__.py @@ -3,6 +3,7 @@ AIRBYTECONNECTION = "Airbyte Connection" SHELLOPERATION = "Shell Operation" DBTCORE = "dbt Core Operation" +DBTCLOUD = "dbt Cloud Operation" DBTCLIPROFILE = "dbt CLI Profile" SECRET = "Secret" diff --git a/ddpui/ddpprefect/schema.py b/ddpui/ddpprefect/schema.py index ecacd5a5..d18ee8af 100644 --- a/ddpui/ddpprefect/schema.py +++ b/ddpui/ddpprefect/schema.py @@ -160,6 +160,29 @@ def to_json(self): } +class PrefectDbtCloudTaskSetup(Schema): + "request payload to trigger a dbt cloud run task in prefect" + seq = int = (0,) + slug = (str,) + type = (str,) + api_key = (str,) + account_id = (str,) + job_id = (str,) + orgtask_uuid = (str,) + + def to_json(self): + """JSON serialization""" + return { + "seq": self.seq, + "slug": self.slug, + "type": self.type, + "api_key": self.api_key, + "account_id": self.account_id, + "job_id": self.job_id, + "orgtask_uuid": self.orgtask_uuid, + } + + class DbtProfile(Schema): """Docstring""" diff --git a/ddpui/settings.py b/ddpui/settings.py index 940886a0..019cc5de 100644 --- a/ddpui/settings.py +++ b/ddpui/settings.py @@ -22,16 +22,16 @@ load_dotenv() -sentry_sdk.init( - dsn=os.getenv("SENTRY_DSN"), - # Set traces_sample_rate to 1.0 to capture 100% - # of transactions for performance monitoring. - traces_sample_rate=float(os.getenv("SENTRY_TSR", "1.0")), - # Set profiles_sample_rate to 1.0 to profile 100% - # of sampled transactions. - # We recommend adjusting this value in production. - profiles_sample_rate=float(os.getenv("SENTRY_PSR", "1.0")), -) +# sentry_sdk.init( +# dsn=os.getenv("SENTRY_DSN"), +# # Set traces_sample_rate to 1.0 to capture 100% +# # of transactions for performance monitoring. +# traces_sample_rate=float(os.getenv("SENTRY_TSR", "1.0")), +# # Set profiles_sample_rate to 1.0 to profile 100% +# # of sampled transactions. +# # We recommend adjusting this value in production. +# profiles_sample_rate=float(os.getenv("SENTRY_PSR", "1.0")), +# ) # Build paths inside the project like this: BASE_DIR / 'subdir'. BASE_DIR = Path(__file__).resolve().parent.parent diff --git a/ddpui/utils/constants.py b/ddpui/utils/constants.py index 9c2fbfef..3538beea 100644 --- a/ddpui/utils/constants.py +++ b/ddpui/utils/constants.py @@ -3,7 +3,8 @@ TASK_DBTTEST = "dbt-test" TASK_DBTCLEAN = "dbt-clean" TASK_DBTDEPS = "dbt-deps" -TASK_GITPULL = "git-pull" +TASK_GITPULL = ("git-pull",) +TASK_DBTCLOUDRUN = "dbtcloud-run" # this is task slug so it should match the seed data. 
TASK_DOCSGENERATE = "dbt-docs-generate" TASK_AIRBYTESYNC = "airbyte-sync" TASK_AIRBYTERESET = "airbyte-reset" @@ -30,12 +31,13 @@ TASK_DBTDEPS, TASK_DBTRUN, TASK_DBTTEST, + TASK_DBTCLOUDRUN, ] # These are tasks to be run via deployment # Adding a new task here will work for any new orgtask created # But for the current ones a script would need to be run to set them with a deployment -LONG_RUNNING_TASKS = [TASK_DBTRUN, TASK_DBTSEED, TASK_DBTTEST] +LONG_RUNNING_TASKS = [TASK_DBTRUN, TASK_DBTSEED, TASK_DBTTEST, TASK_DBTCLOUDRUN] # airbyte sync timeout in deployment params AIRBYTE_SYNC_TIMEOUT = 15 diff --git a/seed/tasks.json b/seed/tasks.json index 134c45c0..ba3826a9 100644 --- a/seed/tasks.json +++ b/seed/tasks.json @@ -119,5 +119,16 @@ "command": null, "is_system": false } + }, + { + "model": "ddpui.Task", + "pk": 11, + "fields": { + "type": "dbtcloud", + "slug": "dbtcloud-run", + "label": "DBT Cloud run", + "command": "cloud-run", + "is_system": true + } } ] \ No newline at end of file From 8286122dcee91a79e737c5cc4cae64fb7101ef8c Mon Sep 17 00:00:00 2001 From: himanshudube97 Date: Sat, 14 Dec 2024 03:30:48 +0530 Subject: [PATCH 03/10] fix --- ddpui/settings.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/ddpui/settings.py b/ddpui/settings.py index 019cc5de..940886a0 100644 --- a/ddpui/settings.py +++ b/ddpui/settings.py @@ -22,16 +22,16 @@ load_dotenv() -# sentry_sdk.init( -# dsn=os.getenv("SENTRY_DSN"), -# # Set traces_sample_rate to 1.0 to capture 100% -# # of transactions for performance monitoring. -# traces_sample_rate=float(os.getenv("SENTRY_TSR", "1.0")), -# # Set profiles_sample_rate to 1.0 to profile 100% -# # of sampled transactions. -# # We recommend adjusting this value in production. -# profiles_sample_rate=float(os.getenv("SENTRY_PSR", "1.0")), -# ) +sentry_sdk.init( + dsn=os.getenv("SENTRY_DSN"), + # Set traces_sample_rate to 1.0 to capture 100% + # of transactions for performance monitoring. + traces_sample_rate=float(os.getenv("SENTRY_TSR", "1.0")), + # Set profiles_sample_rate to 1.0 to profile 100% + # of sampled transactions. + # We recommend adjusting this value in production. + profiles_sample_rate=float(os.getenv("SENTRY_PSR", "1.0")), +) # Build paths inside the project like this: BASE_DIR / 'subdir'. 
 BASE_DIR = Path(__file__).resolve().parent.parent

From 8b837b1ebf28abe2870096595e8ce337ab1f9e65 Mon Sep 17 00:00:00 2001
From: Ishankoradia
Date: Mon, 16 Dec 2024 19:35:32 +0530
Subject: [PATCH 04/10] updates

---
 ddpui/api/orgtask_api.py        | 61 ++++++++++++++++++---------------
 ddpui/core/orgtaskfunctions.py  | 21 ++++++++----
 ddpui/core/pipelinefunctions.py | 14 ++++----
 ddpui/ddpdbt/dbt_service.py     |  4 +--
 ddpui/ddpdbt/schema.py          | 15 +++-----
 ddpui/ddpprefect/__init__.py    |  5 +--
 ddpui/ddpprefect/schema.py      | 20 +++++------
 ddpui/utils/constants.py        |  6 ++--
 seed/tasks.json                 |  8 ++---
 9 files changed, 81 insertions(+), 73 deletions(-)

diff --git a/ddpui/api/orgtask_api.py b/ddpui/api/orgtask_api.py
index 64c6718d..c7e49dc5 100644
--- a/ddpui/api/orgtask_api.py
+++ b/ddpui/api/orgtask_api.py
@@ -11,10 +11,7 @@
 from ddpui.ddpprefect import prefect_service
 from ddpui.ddpairbyte import airbytehelpers
-from ddpui.ddpprefect import (
-    DBTCLIPROFILE,
-    SECRET,
-)
+from ddpui.ddpprefect import DBTCLIPROFILE, SECRET, DBTCLOUDCREDS
 from ddpui.models.org import (
     Org,
     OrgWarehouse,
@@ -31,7 +28,7 @@ from ddpui.ddpprefect.schema import (
     PrefectSecretBlockCreate,
 )
-from ddpui.ddpdbt.schema import DbtCliParams, DbtCloudParams
+from ddpui.ddpdbt.schema import DbtProjectParams, DbtCloudJobParams
 from ddpui.schemas.org_task_schema import CreateOrgTaskPayload, TaskParameters
 from ddpui.core.orgdbt_manager import DbtProjectManager
@@ -90,14 +87,45 @@ def post_orgtask(request, payload: CreateOrgTaskPayload):
     orgtask = OrgTask.objects.create(
         org=orguser.org,
         task=task,
-        parameters=parameters,  # here the accountId, jobId and apiKey will be received for dbt-cloud
+        parameters=parameters,
         generated_by="client",
         uuid=uuid.uuid4(),
     )

     dataflow = None
-    # For dbt-cli
-    if task.slug in LONG_RUNNING_TASKS and task.type is "dbt":
-        dbt_project_params: DbtCliParams = DbtProjectManager.gather_dbt_project_params(
-            orguser.org, orgdbt
-        )
+    if task.slug in LONG_RUNNING_TASKS:
+        # For dbt-cli
+        if task.type == "dbt":
+            dbt_project_params: DbtProjectParams = DbtProjectManager.gather_dbt_project_params(
+                orguser.org, orgdbt
+            )

-        # fetch the cli profile block
-        cli_profile_block = OrgPrefectBlockv1.objects.filter(
-            org=orguser.org, block_type=DBTCLIPROFILE
-        ).first()
+            # fetch the cli profile block
+            cli_profile_block = OrgPrefectBlockv1.objects.filter(
+                org=orguser.org, block_type=DBTCLIPROFILE
+            ).first()

-        if cli_profile_block is None:
-            raise HttpError(400, "dbt cli profile block not found")
+            if cli_profile_block is None:
+                raise HttpError(400, "dbt cli profile block not found")

-        dataflow = create_prefect_deployment_for_dbtcore_task(
-            orgtask, cli_profile_block, dbt_project_params
-        )
+            dataflow = create_prefect_deployment_for_dbtcore_task(
+                orgtask, cli_profile_block, dbt_project_params
+            )

-    # For dbt-cloud
-    if task.slug in LONG_RUNNING_TASKS and task.type is "dbtcloud":
-        dbt_cloud_params: DbtCloudParams = parameters["options"]
-        dataflow = create_prefect_deployment_for_dbtcore_task(
-            orgtask, dbt_cloud_params  # this will contain accountId, apikey, and jobId
-        )
-        # if the task is dbtcloud-run then create another dataflow, create_prefect_deployment for the dbtcloud task (112, 114 but for cloud)
+        # For dbt-cloud
+        if task.type is "dbtcloud":
+            # fetch dbt cloud creds block
+            dbt_cloud_creds_block = OrgPrefectBlockv1.objects.filter(
+                org=orguser.org, block_type=DBTCLOUDCREDS
+            ).first()
+
+            if dbt_cloud_creds_block is None:
+                raise HttpError(400, "dbt cloud credentials block not found")
+
+            dbt_cloud_params:
DbtCloudJobParams = DbtCloudJobParams(**parameters["options"]) + dataflow = create_prefect_deployment_for_dbtcore_task( + orgtask, dbt_cloud_creds_block, dbt_cloud_params + ) return { **model_to_dict(orgtask, fields=["parameters"]), @@ -335,7 +340,7 @@ def post_run_prefect_org_task( if orgdbt is None: raise HttpError(400, "dbt is not configured for this client") - dbt_project_params: DbtCliParams = DbtProjectManager.gather_dbt_project_params( + dbt_project_params: DbtProjectParams = DbtProjectManager.gather_dbt_project_params( orguser.org, orgdbt ) diff --git a/ddpui/core/orgtaskfunctions.py b/ddpui/core/orgtaskfunctions.py index 19e9124e..604e75b9 100644 --- a/ddpui/core/orgtaskfunctions.py +++ b/ddpui/core/orgtaskfunctions.py @@ -20,7 +20,7 @@ PrefectDataFlowCreateSchema3, ) from ddpui.ddpprefect import MANUL_DBT_WORK_QUEUE -from ddpui.ddpdbt.schema import DbtCliParams, DbtProjectParams +from ddpui.ddpdbt.schema import DbtCloudJobParams, DbtProjectParams from ddpui.ddpprefect import prefect_service from ddpui.core.pipelinefunctions import setup_dbt_core_task_config, setup_dbt_cloud_task_config from ddpui.utils.constants import TASK_DBTRUN, TASK_GENERATE_EDR @@ -109,11 +109,14 @@ def get_edr_send_report_task(org: Org, **kwargs) -> OrgTask | None: def create_prefect_deployment_for_dbtcore_task( org_task: OrgTask, - cli_profile_block: OrgPrefectBlockv1, - dbt_project_params: DbtProjectParams, + credentials_profile_block: OrgPrefectBlockv1, + dbt_project_params: Union[DbtProjectParams, DbtCloudJobParams], ): """ - create a prefect deployment for a single dbt command and save the deployment id to an OrgDataFlowv1 object + - create a prefect deployment for a single dbt command or dbt cloud job + - save the deployment id to an OrgDataFlowv1 object + - for dbt core operation; the credentials_profile_block is cli profile block + - for dbt cloud job; the credentials_profile_block is dbt cloud credentials block """ hash_code = generate_hash_id(8) deployment_name = f"manual-{org_task.org.slug}-{org_task.task.slug}-{hash_code}" @@ -121,10 +124,16 @@ def create_prefect_deployment_for_dbtcore_task( tasks = [] if org_task.task.type == "dbt": tasks = [ - setup_dbt_core_task_config(org_task, cli_profile_block, dbt_project_params).to_json() + setup_dbt_core_task_config( + org_task, credentials_profile_block, dbt_project_params + ).to_json() ] elif org_task.task.type == "dbtcloud": - tasks = [setup_dbt_cloud_task_config(org_task, dbt_project_params).to_json()] + tasks = [ + setup_dbt_cloud_task_config( + org_task, credentials_profile_block, dbt_project_params + ).to_json() + ] dataflow = prefect_service.create_dataflow_v1( PrefectDataFlowCreateSchema3( diff --git a/ddpui/core/pipelinefunctions.py b/ddpui/core/pipelinefunctions.py index 7a84f737..26fa0ad0 100644 --- a/ddpui/core/pipelinefunctions.py +++ b/ddpui/core/pipelinefunctions.py @@ -23,7 +23,7 @@ ) from ddpui.ddpprefect import ( AIRBYTECONNECTION, - DBTCLOUD, + DBTCLOUDJOB, DBTCORE, SECRET, SHELLOPERATION, @@ -41,7 +41,7 @@ UPDATE_SCHEMA, TRANSFORM_TASKS_SEQ, ) -from ddpui.ddpdbt.schema import DbtCliParams, DbtCloudParams, DbtProjectParams +from ddpui.ddpdbt.schema import DbtCloudJobParams, DbtProjectParams logger = CustomLogger("ddpui") @@ -106,17 +106,17 @@ def setup_dbt_core_task_config( def setup_dbt_cloud_task_config( org_task: OrgTask, - dbt_project_params: DbtCloudParams, + cloud_creds_block: OrgPrefectBlockv1, + dbt_project_params: DbtCloudJobParams, seq: int = 1, ): """constructs the prefect payload for a dbt-cloud job""" return 
PrefectDbtCloudTaskSetup(
         seq=seq,
         slug=org_task.task.slug,
-        type=DBTCLOUD,
-        api_key=dbt_project_params.api_key,
-        account_id=dbt_project_params.account_id,
-        job_id=dbt_project_params.job_id,
+        type=DBTCLOUDJOB,
+        dbt_cloud_job_id=dbt_project_params.job_id,
+        dbt_cloud_creds_block=cloud_creds_block.block_name,
         orgtask_uuid=str(org_task.uuid),
     )

diff --git a/ddpui/ddpdbt/dbt_service.py b/ddpui/ddpdbt/dbt_service.py
index ad372b11..45f570ee 100644
--- a/ddpui/ddpdbt/dbt_service.py
+++ b/ddpui/ddpdbt/dbt_service.py
@@ -30,7 +30,7 @@
     TASK_DBTRUN,
     TASK_DBTSEED,
     TASK_DBTDEPS,
-    TASK_DBTCLOUDRUN,
+    TASK_DBTCLOUD_JOB,
 )
 from ddpui.core.orgdbt_manager import DbtProjectManager
 from ddpui.utils.timezone import as_ist
@@ -108,7 +108,7 @@ def task_config_params(task: Task):
         TASK_DBTTEST: {"flags": [], "options": ["select", "exclude"]},
         TASK_DBTSEED: {"flags": [], "options": ["select"]},
         TASK_DOCSGENERATE: {"flags": [], "options": []},
-        TASK_DBTCLOUDRUN: {"flags": [], "options": ["account_id", "api-key", "job-id"]},
+        TASK_DBTCLOUD_JOB: {"flags": [], "options": ["job-id"]},
     }
     return TASK_CONIF_PARAM[task.slug] if task.slug in TASK_CONIF_PARAM else None

diff --git a/ddpui/ddpdbt/schema.py b/ddpui/ddpdbt/schema.py
index efa8e901..ce8cf9d1 100644
--- a/ddpui/ddpdbt/schema.py
+++ b/ddpui/ddpdbt/schema.py
@@ -1,11 +1,11 @@
-from typing import Literal, Optional, Union
+from typing import Union
 from ninja import Schema
 from pathlib import Path

-class DbtCliParams(Schema):
+class DbtProjectParams(Schema):
     """
-    Schema to define all parameters required to run a dbt project using CLI.
+    schema to define all parameters required to run a dbt project
     """

     dbt_env_dir: Union[str, Path]
@@ -16,14 +16,9 @@ class DbtProjectParams(Schema):
     dbt_binary: Union[str, Path]

-class DbtCloudParams(Schema):
+class DbtCloudJobParams(Schema):
     """
-    Schema to define all parameters required to run a dbt project using dbt Cloud.
+    Schema to define all parameters required to run any dbt command using dbt Cloud.
""" - api_key: str - account_id: int job_id: int - - -DbtProjectParams = Union[DbtCliParams, DbtCloudParams] diff --git a/ddpui/ddpprefect/__init__.py b/ddpui/ddpprefect/__init__.py index ad14ba4b..eb8d3c0a 100644 --- a/ddpui/ddpprefect/__init__.py +++ b/ddpui/ddpprefect/__init__.py @@ -1,9 +1,10 @@ -# prefect block names +# prefect block names (they come from prefect's block_type table) AIRBYTESERVER = "Airbyte Server" AIRBYTECONNECTION = "Airbyte Connection" SHELLOPERATION = "Shell Operation" DBTCORE = "dbt Core Operation" -DBTCLOUD = "dbt Cloud Operation" +DBTCLOUDJOB = "dbt Cloud Job" +DBTCLOUDCREDS = "dbt Cloud Credentials" DBTCLIPROFILE = "dbt CLI Profile" SECRET = "Secret" diff --git a/ddpui/ddpprefect/schema.py b/ddpui/ddpprefect/schema.py index aca87272..6f8f834a 100644 --- a/ddpui/ddpprefect/schema.py +++ b/ddpui/ddpprefect/schema.py @@ -162,24 +162,22 @@ def to_json(self): class PrefectDbtCloudTaskSetup(Schema): "request payload to trigger a dbt cloud run task in prefect" - seq = int = (0,) - slug = (str,) - type = (str,) - api_key = (str,) - account_id = (str,) - job_id = (str,) - orgtask_uuid = (str,) + type: str + slug: str + dbt_cloud_job_id: int + dbt_cloud_creds_block: str + orgtask_uuid = str + seq: int = 0 def to_json(self): """JSON serialization""" return { - "seq": self.seq, "slug": self.slug, "type": self.type, - "api_key": self.api_key, - "account_id": self.account_id, - "job_id": self.job_id, + "dbt_cloud_job_id": self.dbt_cloud_job_id, + "dbt_cloud_creds_block": self.dbt_cloud_creds_block, "orgtask_uuid": self.orgtask_uuid, + "seq": self.seq, } diff --git a/ddpui/utils/constants.py b/ddpui/utils/constants.py index 3538beea..4feecb9b 100644 --- a/ddpui/utils/constants.py +++ b/ddpui/utils/constants.py @@ -4,7 +4,7 @@ TASK_DBTCLEAN = "dbt-clean" TASK_DBTDEPS = "dbt-deps" TASK_GITPULL = ("git-pull",) -TASK_DBTCLOUDRUN = "dbtcloud-run" # this is task slug so it should match the seed data. +TASK_DBTCLOUD_JOB = "dbt-cloud-job" # this is task slug so it should match the seed data. 
 TASK_DOCSGENERATE = "dbt-docs-generate"
 TASK_AIRBYTESYNC = "airbyte-sync"
 TASK_AIRBYTERESET = "airbyte-reset"
@@ -31,13 +31,13 @@
     TASK_DBTDEPS,
     TASK_DBTRUN,
     TASK_DBTTEST,
-    TASK_DBTCLOUDRUN,
+    TASK_DBTCLOUD_JOB,
 ]

 # These are tasks to be run via deployment
 # Adding a new task here will work for any new orgtask created
 # But for the current ones a script would need to be run to set them with a deployment
-LONG_RUNNING_TASKS = [TASK_DBTRUN, TASK_DBTSEED, TASK_DBTTEST, TASK_DBTCLOUDRUN]
+LONG_RUNNING_TASKS = [TASK_DBTRUN, TASK_DBTSEED, TASK_DBTTEST, TASK_DBTCLOUD_JOB]

 # airbyte sync timeout in deployment params
 AIRBYTE_SYNC_TIMEOUT = 15
diff --git a/seed/tasks.json b/seed/tasks.json
index ba3826a9..287aeb50 100644
--- a/seed/tasks.json
+++ b/seed/tasks.json
@@ -122,12 +122,12 @@
     },
     {
         "model": "ddpui.Task",
-        "pk": 11,
+        "pk": 12,
         "fields": {
             "type": "dbtcloud",
-            "slug": "dbtcloud-run",
-            "label": "DBT Cloud run",
-            "command": "cloud-run",
+            "slug": "dbt-cloud-job",
+            "label": "DBT Cloud job",
+            "command": null,
             "is_system": true
         }
     }

From fb82a8bf0ad545c3b96f62cef5adb1985b71819e Mon Sep 17 00:00:00 2001
From: Ishankoradia
Date: Mon, 16 Dec 2024 19:37:00 +0530
Subject: [PATCH 05/10] minor

---
 ddpui/ddpdbt/schema.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/ddpui/ddpdbt/schema.py b/ddpui/ddpdbt/schema.py
index ce8cf9d1..67d2bd44 100644
--- a/ddpui/ddpdbt/schema.py
+++ b/ddpui/ddpdbt/schema.py
@@ -19,6 +19,7 @@ class DbtProjectParams(Schema):
 class DbtCloudJobParams(Schema):
     """
     Schema to define all parameters required to run any dbt command using dbt Cloud.
+    Extend this if you need to add more params while triggering a dbt cloud job
     """

     job_id: int

From a4052c962adda8c7fc798c58d408f04311a6bc85 Mon Sep 17 00:00:00 2001
From: Ishankoradia
Date: Mon, 16 Dec 2024 21:30:08 +0530
Subject: [PATCH 06/10] script / helpers to create

---
 ddpui/api/pipeline_api.py                     |  4 +-
 ddpui/core/pipelinefunctions.py               |  2 +-
 ddpui/ddpprefect/prefect_service.py           | 66 ++++++++++++++++++-
 .../commands/dbt_cloud_integration.py         | 41 ++++++++++++
 4 files changed, 109 insertions(+), 4 deletions(-)
 create mode 100644 ddpui/management/commands/dbt_cloud_integration.py

diff --git a/ddpui/api/pipeline_api.py b/ddpui/api/pipeline_api.py
index 0f578f9a..e6309ad8 100644
--- a/ddpui/api/pipeline_api.py
+++ b/ddpui/api/pipeline_api.py
@@ -24,7 +24,7 @@
 from ddpui.utils.constants import TASK_DBTRUN, TASK_AIRBYTESYNC
 from ddpui.utils.custom_logger import CustomLogger
 from ddpui.schemas.org_task_schema import TaskParameters
-from ddpui.ddpdbt.schema import DbtProjectParams, DbtCloudParams
+from ddpui.ddpdbt.schema import DbtProjectParams
 from ddpui.utils.prefectlogs import parse_prefect_logs
 from ddpui.utils.helpers import generate_hash_id
 from ddpui.core.pipelinefunctions import (
@@ -117,7 +117,7 @@ def post_prefect_dataflow_v1(request, payload: PrefectDataFlowCreateSchema4):
         if dbt_task_parameters is None:
             raise HttpError(400, "dbt cloud task parameters not found")

-        dbt_project_params: DbtCloudParams = dbt_task_parameters[
+        dbt_project_params: DbtProjectParams = dbt_task_parameters[
             "object"
         ]  # this contains account_key, api_key and job_id

diff --git a/ddpui/core/pipelinefunctions.py b/ddpui/core/pipelinefunctions.py
index 26fa0ad0..44e5810f 100644
--- a/ddpui/core/pipelinefunctions.py
+++ b/ddpui/core/pipelinefunctions.py
@@ -85,7 +85,7 @@ def setup_airbyte_update_schema_task_config(
 def setup_dbt_core_task_config(
     org_task: OrgTask,
     cli_profile_block: OrgPrefectBlockv1,
-    dbt_project_params: DbtCliParams,
+    dbt_project_params: DbtProjectParams,
     seq: int = 1,
 ):
     """constructs the prefect payload for a dbt job"""
diff --git a/ddpui/ddpprefect/prefect_service.py b/ddpui/ddpprefect/prefect_service.py
index 420f691b..d2aee5e7 100644
--- a/ddpui/ddpprefect/prefect_service.py
+++ b/ddpui/ddpprefect/prefect_service.py
@@ -18,13 +18,15 @@
 )
 from ddpui.utils.custom_logger import CustomLogger
 from ddpui.models.tasks import DataflowOrgTask, TaskLock
-from ddpui.models.org_user import OrgUser
+from ddpui.models.org_user import OrgUser, Org
+from ddpui.models.org import OrgPrefectBlockv1
 from ddpui.models.flow_runs import PrefectFlowRun
 from ddpui.ddpprefect import (
     DDP_WORK_QUEUE,
     FLOW_RUN_COMPLETED_STATE_TYPE,
     FLOW_RUN_CRASHED_STATE_TYPE,
     FLOW_RUN_FAILED_STATE_TYPE,
+    DBTCLOUDCREDS,
 )
 from ddpui.utils.constants import (
     FLOW_RUN_LOGS_OFFSET_LIMIT,
@@ -113,6 +115,31 @@ def prefect_put(endpoint: str, json: dict, **kwargs) -> dict:
     return res.json()


+def prefect_patch(endpoint: str, json: dict, **kwargs) -> dict:
+    """make a PATCH request to the proxy"""
+    # we send headers and timeout separately from kwargs, just to be explicit about it
+    headers = kwargs.pop("headers", {})
+    headers["x-ddp-org"] = logger.get_slug()
+    timeout = kwargs.pop("timeout", http_timeout)
+
+    try:
+        res = requests.patch(
+            f"{PREFECT_PROXY_API_URL}/proxy/{endpoint}",
+            headers=headers,
+            timeout=timeout,
+            json=json,
+            **kwargs,
+        )
+    except Exception as error:
+        raise HttpError(500, "connection error") from error
+    try:
+        res.raise_for_status()
+    except Exception as error:
+        logger.exception(error)
+        raise HttpError(res.status_code, res.text) from error
+    return res.json()
+
+
 def prefect_delete_a_block(block_id: str, **kwargs) -> None:
     """makes a DELETE request to the proxy"""
     # we send headers and timeout separately from kwargs, just to be explicit about it
@@ -669,3 +696,40 @@ def get_prefect_version():
     """Fetch secret block id and block name"""
     response = prefect_get("prefect/version")
     return response
+
+
+def upsert_dbt_cloud_creds_block(block_name: str, account_id: int, api_key: str) -> dict:
+    """Create a dbt cloud creds block in prefect; using patch style create or update"""
+    response = prefect_patch(
+        "blocks/dbtcloudcreds/",
+        {"block_name": block_name, "account_id": account_id, "api_key": api_key},
+    )
+    return response
+
+
+def create_or_update_dbt_cloud_creds_block(
+    org: Org,
+    account_id: int,
+    api_key: str,
+) -> OrgPrefectBlockv1:
+    """Create or update the dbt cloud credentials block for the org"""
+    cloud_creds_block = OrgPrefectBlockv1.objects.filter(org=org, block_type=DBTCLOUDCREDS).first()
+    block_name = None
+
+    if not cloud_creds_block:
+        block_name = f"{org.slug}-dbtcloud-creds"
+        cloud_creds_block = OrgPrefectBlockv1(
+            org=org,
+            block_type=DBTCLOUDCREDS,
+            block_name=block_name,
+        )
+    else:
+        block_name = cloud_creds_block.block_name
+
+    result = upsert_dbt_cloud_creds_block(block_name, account_id, api_key)
+
+    cloud_creds_block.block_id = result["block_id"]
+    cloud_creds_block.block_name = result["block_name"]
+    cloud_creds_block.save()
+
+    return cloud_creds_block
diff --git a/ddpui/management/commands/dbt_cloud_integration.py b/ddpui/management/commands/dbt_cloud_integration.py
new file mode 100644
index 00000000..60a76d36
--- /dev/null
+++ b/ddpui/management/commands/dbt_cloud_integration.py
@@ -0,0 +1,41 @@
+from django.core.management.base import BaseCommand
+
+from ddpui.models.org import Org, OrgPrefectBlockv1
+from ddpui.models.org import OrgWarehouse
+from ddpui.utils import
secretsmanager +from ddpui.ddpprefect import DBTCLIPROFILE +from ddpui.ddpprefect import prefect_service + + +class Command(BaseCommand): + """ + This script lets us run tasks/jobs related to dbt cloud integrations in dalgo + """ + + help = "Dbt cloud related tasks" + + def add_arguments(self, parser): + parser.add_argument("org", type=str, help="Org slug") + parser.add_argument( + "--api-key", type=str, help="Api key for your dbt cloud account", required=True + ) + parser.add_argument( + "--account-id", type=int, help="Account id for your dbt cloud account", required=True + ) + + def handle(self, *args, **options): + """ + Create/update dbt cloud creds block + This should be replaced by configuration option on settings panel where users can add this + """ + org = Org.objects.filter(slug=options["org"]).first() + if org is None: + print(f"Org with slug {options['org']} does not exist") + return + + if options["api_key"] and options["account_id"]: + block: OrgPrefectBlockv1 = prefect_service.create_or_update_dbt_cloud_creds_block( + org, options["account_id"], options["api_key"] + ) + print("DBT Cloud credentials block created/updated %s", block.block_name) + return From 1f88dcc4d7512c82dccf8713231bd2d89456ebe2 Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Mon, 16 Dec 2024 23:29:43 +0530 Subject: [PATCH 07/10] updated and test orgtask creation flow from transform page --- ddpui/api/data_api.py | 2 +- ddpui/api/orgtask_api.py | 27 +++++++++++++++++++-------- ddpui/core/pipelinefunctions.py | 1 - ddpui/ddpdbt/dbt_service.py | 2 +- ddpui/ddpprefect/prefect_service.py | 1 + ddpui/ddpprefect/schema.py | 10 +++++----- ddpui/utils/constants.py | 1 + 7 files changed, 28 insertions(+), 16 deletions(-) diff --git a/ddpui/api/data_api.py b/ddpui/api/data_api.py index 17d2dbdc..83605d39 100644 --- a/ddpui/api/data_api.py +++ b/ddpui/api/data_api.py @@ -25,7 +25,7 @@ def get_tasks(request): """Fetch master list of tasks related to transformation""" tasks = [ model_to_dict(task, exclude=["id"]) - for task in Task.objects.filter(type__in=["dbt", "git"]).all() + for task in Task.objects.filter(type__in=["dbt", "git", "dbtcloud"]).all() ] return tasks diff --git a/ddpui/api/orgtask_api.py b/ddpui/api/orgtask_api.py index c7e49dc5..d1caebdb 100644 --- a/ddpui/api/orgtask_api.py +++ b/ddpui/api/orgtask_api.py @@ -113,7 +113,7 @@ def post_orgtask(request, payload: CreateOrgTaskPayload): ) # For dbt-cloud - if task.type is "dbtcloud": + if task.type == "dbtcloud": # fetch dbt cloud creds block dbt_cloud_creds_block = OrgPrefectBlockv1.objects.filter( org=orguser.org, block_type=DBTCLOUDCREDS @@ -122,10 +122,19 @@ def post_orgtask(request, payload: CreateOrgTaskPayload): if dbt_cloud_creds_block is None: raise HttpError(400, "dbt cloud credentials block not found") - dbt_cloud_params: DbtCloudJobParams = DbtCloudJobParams(**parameters["options"]) - dataflow = create_prefect_deployment_for_dbtcore_task( - orgtask, dbt_cloud_creds_block, dbt_cloud_params - ) + try: + dbt_cloud_params = DbtCloudJobParams(**parameters["options"]) + except Exception as error: + logger.exception(error) + raise HttpError(400, "Job id should be numeric") from error + + try: + dataflow = create_prefect_deployment_for_dbtcore_task( + orgtask, dbt_cloud_creds_block, dbt_cloud_params + ) + except Exception as error: + logger.exception(error) + raise HttpError(400, "failed to create dbt cloud deployment") from error return { **model_to_dict(orgtask, fields=["parameters"]), @@ -222,7 +231,7 @@ def 
get_prefect_transformation_tasks(request): org_tasks = ( OrgTask.objects.filter( org=orguser.org, - task__type__in=["git", "dbt"], + task__type__in=["git", "dbt", "dbtcloud"], ) .order_by("-generated_by") .select_related("task") @@ -240,7 +249,9 @@ def get_prefect_transformation_tasks(request): for org_task in org_tasks: # git pull : "git" + " " + "pull" # dbt run --full-refresh : "dbt" + " " + "run --full-refresh" - command = org_task.task.type + " " + org_task.get_task_parameters() + command = None + if org_task.task.type != "dbtcloud": + command = org_task.task.type + " " + org_task.get_task_parameters() lock = None all_locks = [lock for lock in all_org_task_locks if lock.orgtask_id == org_task.id] @@ -446,7 +457,7 @@ def post_delete_orgtask(request, orgtask_uuid): # pylint: disable=unused-argume if org_task is None: raise HttpError(400, "task not found") - if org_task.task.type not in ["dbt", "git", "edr"]: + if org_task.task.type not in ["dbt", "git", "edr", "dbtcloud"]: raise HttpError(400, "task not supported") if orguser.org.dbt is None: diff --git a/ddpui/core/pipelinefunctions.py b/ddpui/core/pipelinefunctions.py index 44e5810f..1fd15ccf 100644 --- a/ddpui/core/pipelinefunctions.py +++ b/ddpui/core/pipelinefunctions.py @@ -3,7 +3,6 @@ do not raise http errors here """ -from pathlib import Path from typing import Union from functools import cmp_to_key from django.db import transaction diff --git a/ddpui/ddpdbt/dbt_service.py b/ddpui/ddpdbt/dbt_service.py index 45f570ee..f0ed60bc 100644 --- a/ddpui/ddpdbt/dbt_service.py +++ b/ddpui/ddpdbt/dbt_service.py @@ -108,7 +108,7 @@ def task_config_params(task: Task): TASK_DBTTEST: {"flags": [], "options": ["select", "exclude"]}, TASK_DBTSEED: {"flags": [], "options": ["select"]}, TASK_DOCSGENERATE: {"flags": [], "options": []}, - TASK_DBTCLOUD_JOB: {"flags": [], "options": ["job-id"]}, + TASK_DBTCLOUD_JOB: {"flags": [], "options": ["job_id"]}, } return TASK_CONIF_PARAM[task.slug] if task.slug in TASK_CONIF_PARAM else None diff --git a/ddpui/ddpprefect/prefect_service.py b/ddpui/ddpprefect/prefect_service.py index d2aee5e7..ecccaba9 100644 --- a/ddpui/ddpprefect/prefect_service.py +++ b/ddpui/ddpprefect/prefect_service.py @@ -81,6 +81,7 @@ def prefect_post(endpoint: str, json: dict, **kwargs) -> dict: **kwargs, ) except Exception as error: + logger.error(error) raise HttpError(500, "connection error") from error try: res.raise_for_status() diff --git a/ddpui/ddpprefect/schema.py b/ddpui/ddpprefect/schema.py index 6f8f834a..0c55ddd9 100644 --- a/ddpui/ddpprefect/schema.py +++ b/ddpui/ddpprefect/schema.py @@ -1,7 +1,6 @@ -from typing import List, Optional +from typing import Optional from ninja import Schema -from pydantic import Field class PrefectAirbyteSync(Schema): @@ -161,19 +160,20 @@ def to_json(self): class PrefectDbtCloudTaskSetup(Schema): - "request payload to trigger a dbt cloud run task in prefect" + """request payload to trigger a dbt cloud run task in prefect""" + type: str slug: str dbt_cloud_job_id: int dbt_cloud_creds_block: str - orgtask_uuid = str + orgtask_uuid: str seq: int = 0 def to_json(self): """JSON serialization""" return { - "slug": self.slug, "type": self.type, + "slug": self.slug, "dbt_cloud_job_id": self.dbt_cloud_job_id, "dbt_cloud_creds_block": self.dbt_cloud_creds_block, "orgtask_uuid": self.orgtask_uuid, diff --git a/ddpui/utils/constants.py b/ddpui/utils/constants.py index 4feecb9b..7ba37286 100644 --- a/ddpui/utils/constants.py +++ b/ddpui/utils/constants.py @@ -23,6 +23,7 @@ TASK_DBTTEST: 6, 
TASK_DOCSGENERATE: 7, TASK_GENERATE_EDR: 8, + TASK_DBTCLOUD_JOB: 20, } # when a new pipeline is created; these are the transform tasks being pushed by default DEFAULT_TRANSFORM_TASKS_IN_PIPELINE = [ From f152b08ef2db156b152787344ccc326e4f79e333 Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Tue, 17 Dec 2024 00:24:16 +0530 Subject: [PATCH 08/10] pipeline creation/editing with dbt cloud tasks --- ddpui/api/pipeline_api.py | 115 ++++++++++++++++---------------- ddpui/core/pipelinefunctions.py | 8 ++- ddpui/ddpprefect/schema.py | 1 - 3 files changed, 64 insertions(+), 60 deletions(-) diff --git a/ddpui/api/pipeline_api.py b/ddpui/api/pipeline_api.py index e6309ad8..5a7fe08d 100644 --- a/ddpui/api/pipeline_api.py +++ b/ddpui/api/pipeline_api.py @@ -7,10 +7,7 @@ from ddpui.ddpprefect import prefect_service from ddpui.ddpairbyte import airbyte_service -from ddpui.ddpprefect import ( - DBTCLIPROFILE, - AIRBYTESERVER, -) +from ddpui.ddpprefect import DBTCLIPROFILE, AIRBYTESERVER, DBTCLOUDCREDS from ddpui.models.org import OrgDataFlowv1, OrgPrefectBlockv1 from ddpui.models.org_user import OrgUser from ddpui.models.tasks import DataflowOrgTask, OrgTask @@ -54,9 +51,6 @@ def post_prefect_dataflow_v1(request, payload: PrefectDataFlowCreateSchema4): if payload.name in [None, ""]: raise HttpError(400, "must provide a name for the flow") - if payload.alignment is None: - raise HttpError(400, "must provide alignment type i.e Simple, Advanced or DBT Cloud") - tasks = [] # This is main task array- containing airbyte and dbt task both. map_org_tasks = [] # seq of org tasks to be mapped in pipelin/ dataflow @@ -104,48 +98,16 @@ def post_prefect_dataflow_v1(request, payload: PrefectDataFlowCreateSchema4): # push dbt pipeline orgtasks dbt_project_params: DbtProjectParams = None dbt_git_orgtasks = [] + dbt_cloud_orgtasks = [] orgdbt = orguser.org.dbt - if ( - payload.transformTasks - and len(payload.transformTasks) > 0 - and payload.alignment == "DBT Cloud" - ): - # get the deployment task configs - dbt_task_parameters = OrgTask.objects.filter(org=orguser.org).first() - - if dbt_task_parameters is None: - raise HttpError(400, "dbt cloud task parameters not found") - - dbt_project_params: DbtProjectParams = dbt_task_parameters[ - "object" - ] # this contains account_key, api_key and job_id - - task_configs, error = pipeline_with_orgtasks( - orguser.org, - dbt_project_params=dbt_project_params, - start_seq=len(tasks), - ) - if error: - raise HttpError(400, error) - tasks += task_configs - - elif ( - payload.transformTasks and len(payload.transformTasks) > 0 ## For dbt cli + payload.transformTasks and len(payload.transformTasks) > 0 ## For dbt cli & dbt cloud ): # dont modify this block as its of rlocal logger.info("Dbt tasks being pushed to the pipeline") # dbt params - dbt_project_params = DbtProjectManager.gather_dbt_project_params(orguser.org, orgdbt) - # dbt cli profile block - cli_block = OrgPrefectBlockv1.objects.filter( - org=orguser.org, block_type=DBTCLIPROFILE - ).first() - if not cli_block: - raise HttpError(400, "dbt cli profile not found") - payload.transformTasks.sort(key=lambda task: task.seq) # sort the tasks by seq for transform_task in payload.transformTasks: @@ -154,22 +116,47 @@ def post_prefect_dataflow_v1(request, payload: PrefectDataFlowCreateSchema4): logger.error(f"org task with {transform_task.uuid} not found") continue - # map this org task to dataflow - dbt_git_orgtasks.append(org_task) + if org_task.task.type in ["dbt", "git"]: + dbt_git_orgtasks.append(org_task) + elif 
org_task.task.type == "dbtcloud": + dbt_cloud_orgtasks.append(org_task) + + logger.info(f"{len(dbt_git_orgtasks)} Git/Dbt cli tasks being pushed to the pipeline") + logger.info(f"{len(dbt_cloud_orgtasks)} Dbt cloud tasks being pushed to the pipeline") + + # dbt cli profile block + cli_block = None + if len(dbt_git_orgtasks) > 0: + cli_block = OrgPrefectBlockv1.objects.filter( + org=orguser.org, block_type=DBTCLIPROFILE + ).first() + if not cli_block: + raise HttpError(400, "dbt cli profile not found") + + # dbt cloud creds block + dbt_cloud_creds_block = None + if len(dbt_cloud_orgtasks) > 0: + dbt_cloud_creds_block = OrgPrefectBlockv1.objects.filter( + org=orguser.org, block_type=DBTCLOUDCREDS + ).first() + if not dbt_cloud_creds_block: + raise HttpError(400, "dbt cloud creds block not found") # get the deployment task configs task_configs, error = pipeline_with_orgtasks( orguser.org, - dbt_git_orgtasks, + dbt_git_orgtasks + dbt_cloud_orgtasks, cli_block=cli_block, dbt_project_params=dbt_project_params, start_seq=len(tasks), + dbt_cloud_creds_block=dbt_cloud_creds_block, ) if error: raise HttpError(400, error) tasks += task_configs map_org_tasks += dbt_git_orgtasks + map_org_tasks += dbt_cloud_orgtasks # create deployment try: @@ -321,7 +308,7 @@ def get_prefect_dataflow_v1(request, deployment_id): transform_tasks = [ {"uuid": dataflow_orgtask.orgtask.uuid, "seq": dataflow_orgtask.seq} for dataflow_orgtask in DataflowOrgTask.objects.filter( - dataflow=org_data_flow, orgtask__task__type__in=["git", "dbt"] + dataflow=org_data_flow, orgtask__task__type__in=["git", "dbt", "dbtcloud"] ) .all() .order_by("seq") @@ -431,6 +418,7 @@ def put_prefect_dataflow_v1(request, deployment_id, payload: PrefectDataFlowUpda # push dbt pipeline orgtasks dbt_project_params = None dbt_git_orgtasks = [] + dbt_cloud_orgtasks = [] orgdbt = orguser.org.dbt if payload.transformTasks and len(payload.transformTasks) > 0: logger.info(f"Dbt tasks being pushed to the pipeline") @@ -440,13 +428,6 @@ def put_prefect_dataflow_v1(request, deployment_id, payload: PrefectDataFlowUpda orguser.org, orgdbt ) - # dbt cli profile block - cli_block = OrgPrefectBlockv1.objects.filter( - org=orguser.org, block_type=DBTCLIPROFILE - ).first() - if not cli_block: - raise HttpError(400, "dbt cli profile not found") - payload.transformTasks.sort(key=lambda task: task.seq) # sort the tasks by seq for transform_task in payload.transformTasks: @@ -455,24 +436,44 @@ def put_prefect_dataflow_v1(request, deployment_id, payload: PrefectDataFlowUpda logger.error(f"org task with {transform_task.uuid} not found") continue - # map this org task to dataflow - dbt_git_orgtasks.append(org_task) + if org_task.task.type in ["dbt", "git"]: + dbt_git_orgtasks.append(org_task) + elif org_task.task.type == "dbtcloud": + dbt_cloud_orgtasks.append(org_task) + + # dbt cli profile block + cli_block = None + if len(dbt_git_orgtasks) > 0: + cli_block = OrgPrefectBlockv1.objects.filter( + org=orguser.org, block_type=DBTCLIPROFILE + ).first() + if not cli_block: + raise HttpError(400, "dbt cli profile not found") + + # dbt cloud creds block + dbt_cloud_creds_block = None + if len(dbt_cloud_orgtasks) > 0: + dbt_cloud_creds_block = OrgPrefectBlockv1.objects.filter( + org=orguser.org, block_type=DBTCLOUDCREDS + ).first() + if not dbt_cloud_creds_block: + raise HttpError(400, "dbt cloud creds block not found") # get the deployment task configs task_configs, error = pipeline_with_orgtasks( orguser.org, - dbt_git_orgtasks, + dbt_git_orgtasks + dbt_cloud_orgtasks, 
From f40974d2be829ded7cad0ec5d62b2455264849ef Mon Sep 17 00:00:00 2001
From: Ishankoradia
Date: Tue, 17 Dec 2024 00:31:21 +0530
Subject: [PATCH 09/10] test fixes

---
 ddpui/tests/api_tests/test_orgtask_api.py  | 2 +-
 ddpui/tests/api_tests/test_pipeline_api.py | 2 +-
 ddpui/tests/core/test_orgtaskfunctions.py  | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/ddpui/tests/api_tests/test_orgtask_api.py b/ddpui/tests/api_tests/test_orgtask_api.py
index 43138d69..9ce3b9f8 100644
--- a/ddpui/tests/api_tests/test_orgtask_api.py
+++ b/ddpui/tests/api_tests/test_orgtask_api.py
@@ -215,7 +215,7 @@ def test_seed_data(seed_db):

 def test_seed_master_tasks(seed_master_tasks_db):
     """a test to seed the database"""
-    assert Task.objects.count() == 11
+    assert Task.objects.count() == 12


 # ================================================================================
diff --git a/ddpui/tests/api_tests/test_pipeline_api.py b/ddpui/tests/api_tests/test_pipeline_api.py
index f8159051..8781c0ed 100644
--- a/ddpui/tests/api_tests/test_pipeline_api.py
+++ b/ddpui/tests/api_tests/test_pipeline_api.py
@@ -245,7 +245,7 @@ def test_seed_data(seed_db):

 def test_seed_master_tasks(seed_master_tasks_db):
     """a test to seed the database"""
-    assert Task.objects.count() == 11
+    assert Task.objects.count() == 12


 # ================================================================================
diff --git a/ddpui/tests/core/test_orgtaskfunctions.py b/ddpui/tests/core/test_orgtaskfunctions.py
index 012e1dc1..47e60a14 100644
--- a/ddpui/tests/core/test_orgtaskfunctions.py
+++ b/ddpui/tests/core/test_orgtaskfunctions.py
@@ -34,7 +34,7 @@ def seed_master_tasks_db(django_db_setup, django_db_blocker):

 def test_seed_master_tasks(seed_master_tasks_db):
     """a test to seed the database"""
-    assert Task.objects.count() == 11
+    assert Task.objects.count() == 12


 # ================================================================================
From 28466011925a9a78e006a0751a74e4fe4ebce6d7 Mon Sep 17 00:00:00 2001
From: Ishankoradia
Date: Tue, 17 Dec 2024 00:35:50 +0530
Subject: [PATCH 10/10] fixes

---
 ddpui/tests/api_tests/test_pipeline_api.py | 5 -----
 ddpui/utils/constants.py                   | 2 +-
 2 files changed, 1 insertion(+), 6 deletions(-)

diff --git a/ddpui/tests/api_tests/test_pipeline_api.py b/ddpui/tests/api_tests/test_pipeline_api.py
index 8781c0ed..dac7f631 100644
--- a/ddpui/tests/api_tests/test_pipeline_api.py
+++ b/ddpui/tests/api_tests/test_pipeline_api.py
@@ -258,7 +258,6 @@ def test_post_prefect_dataflow_v1_failure1(orguser):
         name="test-dataflow",
         connections=connections,
         cron="",
-        alignment="simple",
         transformTasks=[],
     )
     orguser.org = None
@@ -278,7 +277,6 @@ def test_post_prefect_dataflow_v1_failure2(orguser_transform_tasks):
         connections=connections,
         cron="",
         transformTasks=[],
-        alignment="simple",
     )

     request = mock_request(orguser_transform_tasks)
@@ -314,7 +312,6 @@ def test_post_prefect_dataflow_v1_success(orguser_transform_tasks):
         connections=connections,
         cron="test-cron",
         transformTasks=[],
-        alignment="simple",
     )

     deployment = post_prefect_dataflow_v1(request, payload)
@@ -363,7 +360,6 @@ def test_post_prefect_dataflow_v1_success2(orguser_transform_tasks):
         name="test-dataflow",
         connections=connections,
         cron="test-cron",
-        alignment="simple",
         transformTasks=[
             PrefectDataFlowOrgTasks(uuid=str(org_task.uuid), seq=idx)
             for idx, org_task in enumerate(transform_tasks)
@@ -619,7 +615,6 @@ def test_get_prefect_dataflow_v1_success(orguser_transform_tasks):
         name="test-dataflow",
         connections=connections,
         cron="test-cron",
-        alignment="simple",
         transformTasks=[
             PrefectDataFlowOrgTasks(uuid=str(org_task.uuid), seq=idx)
             for idx, org_task in enumerate(transform_tasks)
diff --git a/ddpui/utils/constants.py b/ddpui/utils/constants.py
index 7ba37286..56c9e39e 100644
--- a/ddpui/utils/constants.py
+++ b/ddpui/utils/constants.py
@@ -3,7 +3,7 @@
 TASK_DBTTEST = "dbt-test"
 TASK_DBTCLEAN = "dbt-clean"
 TASK_DBTDEPS = "dbt-deps"
-TASK_GITPULL = ("git-pull",)
+TASK_GITPULL = "git-pull"
 TASK_DBTCLOUD_JOB = "dbt-cloud-job"  # this is task slug so it should match the seed data.
 TASK_DOCSGENERATE = "dbt-docs-generate"
 TASK_AIRBYTESYNC = "airbyte-sync"
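
One detail in the constants.py hunk above is worth spelling out: the trailing comma in the removed line made TASK_GITPULL a one-element tuple rather than a string, so any equality check of a slug against it (e.g. org_task.task.slug == TASK_GITPULL) was always False. A quick standalone illustration:

    TASK_GITPULL = ("git-pull",)              # before: the trailing comma builds a tuple
    assert ("git-pull" == TASK_GITPULL) is False  # slug comparisons silently fail

    TASK_GITPULL = "git-pull"                 # after: a plain string
    assert ("git-pull" == TASK_GITPULL) is True   # comparisons behave as intended
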