diff --git a/ddpui/api/data_api.py b/ddpui/api/data_api.py index 17d2dbdc..83605d39 100644 --- a/ddpui/api/data_api.py +++ b/ddpui/api/data_api.py @@ -25,7 +25,7 @@ def get_tasks(request): """Fetch master list of tasks related to transformation""" tasks = [ model_to_dict(task, exclude=["id"]) - for task in Task.objects.filter(type__in=["dbt", "git"]).all() + for task in Task.objects.filter(type__in=["dbt", "git", "dbtcloud"]).all() ] return tasks diff --git a/ddpui/api/orgtask_api.py b/ddpui/api/orgtask_api.py index 8e5c06a3..d1caebdb 100644 --- a/ddpui/api/orgtask_api.py +++ b/ddpui/api/orgtask_api.py @@ -11,10 +11,7 @@ from ddpui.ddpprefect import prefect_service from ddpui.ddpairbyte import airbytehelpers -from ddpui.ddpprefect import ( - DBTCLIPROFILE, - SECRET, -) +from ddpui.ddpprefect import DBTCLIPROFILE, SECRET, DBTCLOUDCREDS from ddpui.models.org import ( Org, OrgWarehouse, @@ -31,7 +28,7 @@ from ddpui.ddpprefect.schema import ( PrefectSecretBlockCreate, ) -from ddpui.ddpdbt.schema import DbtProjectParams +from ddpui.ddpdbt.schema import DbtProjectParams, DbtCloudJobParams from ddpui.schemas.org_task_schema import CreateOrgTaskPayload, TaskParameters from ddpui.core.orgdbt_manager import DbtProjectManager @@ -97,21 +94,47 @@ def post_orgtask(request, payload: CreateOrgTaskPayload): dataflow = None if task.slug in LONG_RUNNING_TASKS: - dbt_project_params: DbtProjectParams = DbtProjectManager.gather_dbt_project_params( - orguser.org, orgdbt - ) + # For dbt-cli + if task.type == "dbt": + dbt_project_params: DbtProjectParams = DbtProjectManager.gather_dbt_project_params( + orguser.org, orgdbt + ) - # fetch the cli profile block - cli_profile_block = OrgPrefectBlockv1.objects.filter( - org=orguser.org, block_type=DBTCLIPROFILE - ).first() + # fetch the cli profile block + cli_profile_block = OrgPrefectBlockv1.objects.filter( + org=orguser.org, block_type=DBTCLIPROFILE + ).first() - if cli_profile_block is None: - raise HttpError(400, "dbt cli profile block not 
found") + if cli_profile_block is None: + raise HttpError(400, "dbt cli profile block not found") - dataflow = create_prefect_deployment_for_dbtcore_task( - orgtask, cli_profile_block, dbt_project_params - ) + dataflow = create_prefect_deployment_for_dbtcore_task( + orgtask, cli_profile_block, dbt_project_params + ) + + # For dbt-cloud + if task.type == "dbtcloud": + # fetch dbt cloud creds block + dbt_cloud_creds_block = OrgPrefectBlockv1.objects.filter( + org=orguser.org, block_type=DBTCLOUDCREDS + ).first() + + if dbt_cloud_creds_block is None: + raise HttpError(400, "dbt cloud credentials block not found") + + try: + dbt_cloud_params = DbtCloudJobParams(**parameters["options"]) + except Exception as error: + logger.exception(error) + raise HttpError(400, "Job id should be numeric") from error + + try: + dataflow = create_prefect_deployment_for_dbtcore_task( + orgtask, dbt_cloud_creds_block, dbt_cloud_params + ) + except Exception as error: + logger.exception(error) + raise HttpError(400, "failed to create dbt cloud deployment") from error return { **model_to_dict(orgtask, fields=["parameters"]), @@ -208,7 +231,7 @@ def get_prefect_transformation_tasks(request): org_tasks = ( OrgTask.objects.filter( org=orguser.org, - task__type__in=["git", "dbt"], + task__type__in=["git", "dbt", "dbtcloud"], ) .order_by("-generated_by") .select_related("task") @@ -226,7 +249,9 @@ def get_prefect_transformation_tasks(request): for org_task in org_tasks: # git pull : "git" + " " + "pull" # dbt run --full-refresh : "dbt" + " " + "run --full-refresh" - command = org_task.task.type + " " + org_task.get_task_parameters() + command = None + if org_task.task.type != "dbtcloud": + command = org_task.task.type + " " + org_task.get_task_parameters() lock = None all_locks = [lock for lock in all_org_task_locks if lock.orgtask_id == org_task.id] @@ -432,7 +457,7 @@ def post_delete_orgtask(request, orgtask_uuid): # pylint: disable=unused-argume if org_task is None: raise HttpError(400, "task 
not found") - if org_task.task.type not in ["dbt", "git", "edr"]: + if org_task.task.type not in ["dbt", "git", "edr", "dbtcloud"]: raise HttpError(400, "task not supported") if orguser.org.dbt is None: diff --git a/ddpui/api/pipeline_api.py b/ddpui/api/pipeline_api.py index 45b8e346..5a7fe08d 100644 --- a/ddpui/api/pipeline_api.py +++ b/ddpui/api/pipeline_api.py @@ -7,10 +7,7 @@ from ddpui.ddpprefect import prefect_service from ddpui.ddpairbyte import airbyte_service -from ddpui.ddpprefect import ( - DBTCLIPROFILE, - AIRBYTESERVER, -) +from ddpui.ddpprefect import DBTCLIPROFILE, AIRBYTESERVER, DBTCLOUDCREDS from ddpui.models.org import OrgDataFlowv1, OrgPrefectBlockv1 from ddpui.models.org_user import OrgUser from ddpui.models.tasks import DataflowOrgTask, OrgTask @@ -54,7 +51,7 @@ def post_prefect_dataflow_v1(request, payload: PrefectDataFlowCreateSchema4): if payload.name in [None, ""]: raise HttpError(400, "must provide a name for the flow") - tasks = [] + tasks = [] # This is main task array- containing airbyte and dbt task both. 
map_org_tasks = []  # seq of org tasks to be mapped in pipelin/ dataflow # push conection orgtasks in pipelin @@ -101,20 +98,16 @@ def post_prefect_dataflow_v1(request, payload: PrefectDataFlowCreateSchema4): # push dbt pipeline orgtasks dbt_project_params: DbtProjectParams = None dbt_git_orgtasks = [] + dbt_cloud_orgtasks = [] orgdbt = orguser.org.dbt - if payload.transformTasks and len(payload.transformTasks) > 0: + if ( + payload.transformTasks and len(payload.transformTasks) > 0 ## For dbt cli & dbt cloud + ): # do not modify this block logger.info("Dbt tasks being pushed to the pipeline") # dbt params dbt_project_params = DbtProjectManager.gather_dbt_project_params(orguser.org, orgdbt) - # dbt cli profile block - cli_block = OrgPrefectBlockv1.objects.filter( - org=orguser.org, block_type=DBTCLIPROFILE - ).first() - if not cli_block: - raise HttpError(400, "dbt cli profile not found") - payload.transformTasks.sort(key=lambda task: task.seq) # sort the tasks by seq for transform_task in payload.transformTasks: @@ -123,22 +116,47 @@ def post_prefect_dataflow_v1(request, payload: PrefectDataFlowCreateSchema4): logger.error(f"org task with {transform_task.uuid} not found") continue - # map this org task to dataflow - dbt_git_orgtasks.append(org_task) + if org_task.task.type in ["dbt", "git"]: + dbt_git_orgtasks.append(org_task) + elif org_task.task.type == "dbtcloud": + dbt_cloud_orgtasks.append(org_task) + + logger.info(f"{len(dbt_git_orgtasks)} Git/Dbt cli tasks being pushed to the pipeline") + logger.info(f"{len(dbt_cloud_orgtasks)} Dbt cloud tasks being pushed to the pipeline") + + # dbt cli profile block + cli_block = None + if len(dbt_git_orgtasks) > 0: + cli_block = OrgPrefectBlockv1.objects.filter( + org=orguser.org, block_type=DBTCLIPROFILE + ).first() + if not cli_block: + raise HttpError(400, "dbt cli profile not found") + + # dbt cloud creds block + dbt_cloud_creds_block = None + if len(dbt_cloud_orgtasks) > 0: + dbt_cloud_creds_block = 
OrgPrefectBlockv1.objects.filter( + org=orguser.org, block_type=DBTCLOUDCREDS + ).first() + if not dbt_cloud_creds_block: + raise HttpError(400, "dbt cloud creds block not found") # get the deployment task configs task_configs, error = pipeline_with_orgtasks( orguser.org, - dbt_git_orgtasks, + dbt_git_orgtasks + dbt_cloud_orgtasks, cli_block=cli_block, dbt_project_params=dbt_project_params, start_seq=len(tasks), + dbt_cloud_creds_block=dbt_cloud_creds_block, ) if error: raise HttpError(400, error) tasks += task_configs map_org_tasks += dbt_git_orgtasks + map_org_tasks += dbt_cloud_orgtasks # create deployment try: @@ -290,7 +308,7 @@ def get_prefect_dataflow_v1(request, deployment_id): transform_tasks = [ {"uuid": dataflow_orgtask.orgtask.uuid, "seq": dataflow_orgtask.seq} for dataflow_orgtask in DataflowOrgTask.objects.filter( - dataflow=org_data_flow, orgtask__task__type__in=["git", "dbt"] + dataflow=org_data_flow, orgtask__task__type__in=["git", "dbt", "dbtcloud"] ) .all() .order_by("seq") @@ -400,6 +418,7 @@ def put_prefect_dataflow_v1(request, deployment_id, payload: PrefectDataFlowUpda # push dbt pipeline orgtasks dbt_project_params = None dbt_git_orgtasks = [] + dbt_cloud_orgtasks = [] orgdbt = orguser.org.dbt if payload.transformTasks and len(payload.transformTasks) > 0: logger.info(f"Dbt tasks being pushed to the pipeline") @@ -409,13 +428,6 @@ def put_prefect_dataflow_v1(request, deployment_id, payload: PrefectDataFlowUpda orguser.org, orgdbt ) - # dbt cli profile block - cli_block = OrgPrefectBlockv1.objects.filter( - org=orguser.org, block_type=DBTCLIPROFILE - ).first() - if not cli_block: - raise HttpError(400, "dbt cli profile not found") - payload.transformTasks.sort(key=lambda task: task.seq) # sort the tasks by seq for transform_task in payload.transformTasks: @@ -424,24 +436,44 @@ def put_prefect_dataflow_v1(request, deployment_id, payload: PrefectDataFlowUpda logger.error(f"org task with {transform_task.uuid} not found") continue - # map this org 
task to dataflow - dbt_git_orgtasks.append(org_task) + if org_task.task.type in ["dbt", "git"]: + dbt_git_orgtasks.append(org_task) + elif org_task.task.type == "dbtcloud": + dbt_cloud_orgtasks.append(org_task) + + # dbt cli profile block + cli_block = None + if len(dbt_git_orgtasks) > 0: + cli_block = OrgPrefectBlockv1.objects.filter( + org=orguser.org, block_type=DBTCLIPROFILE + ).first() + if not cli_block: + raise HttpError(400, "dbt cli profile not found") + + # dbt cloud creds block + dbt_cloud_creds_block = None + if len(dbt_cloud_orgtasks) > 0: + dbt_cloud_creds_block = OrgPrefectBlockv1.objects.filter( + org=orguser.org, block_type=DBTCLOUDCREDS + ).first() + if not dbt_cloud_creds_block: + raise HttpError(400, "dbt cloud creds block not found") # get the deployment task configs task_configs, error = pipeline_with_orgtasks( orguser.org, - dbt_git_orgtasks, + dbt_git_orgtasks + dbt_cloud_orgtasks, cli_block=cli_block, dbt_project_params=dbt_project_params, start_seq=len(tasks), + dbt_cloud_creds_block=dbt_cloud_creds_block, ) - logger.info("HERE") - logger.info(task_configs) if error: raise HttpError(400, error) tasks += task_configs map_org_tasks += dbt_git_orgtasks + map_org_tasks += dbt_cloud_orgtasks # update deployment payload.deployment_params = {"config": {"tasks": tasks, "org_slug": orguser.org.slug}} diff --git a/ddpui/core/orgtaskfunctions.py b/ddpui/core/orgtaskfunctions.py index 3da9288e..604e75b9 100644 --- a/ddpui/core/orgtaskfunctions.py +++ b/ddpui/core/orgtaskfunctions.py @@ -20,9 +20,9 @@ PrefectDataFlowCreateSchema3, ) from ddpui.ddpprefect import MANUL_DBT_WORK_QUEUE -from ddpui.ddpdbt.schema import DbtProjectParams +from ddpui.ddpdbt.schema import DbtCloudJobParams, DbtProjectParams from ddpui.ddpprefect import prefect_service -from ddpui.core.pipelinefunctions import setup_dbt_core_task_config +from ddpui.core.pipelinefunctions import setup_dbt_core_task_config, setup_dbt_cloud_task_config from ddpui.utils.constants import TASK_DBTRUN, 
TASK_GENERATE_EDR from ddpui.utils.helpers import generate_hash_id @@ -109,14 +109,32 @@ def get_edr_send_report_task(org: Org, **kwargs) -> OrgTask | None: def create_prefect_deployment_for_dbtcore_task( org_task: OrgTask, - cli_profile_block: OrgPrefectBlockv1, - dbt_project_params: DbtProjectParams, + credentials_profile_block: OrgPrefectBlockv1, + dbt_project_params: Union[DbtProjectParams, DbtCloudJobParams], ): """ - create a prefect deployment for a single dbt command and save the deployment id to an OrgDataFlowv1 object + - create a prefect deployment for a single dbt command or dbt cloud job + - save the deployment id to an OrgDataFlowv1 object + - for dbt core operation; the credentials_profile_block is cli profile block + - for dbt cloud job; the credentials_profile_block is dbt cloud credentials block """ hash_code = generate_hash_id(8) deployment_name = f"manual-{org_task.org.slug}-{org_task.task.slug}-{hash_code}" + + tasks = [] + if org_task.task.type == "dbt": + tasks = [ + setup_dbt_core_task_config( + org_task, credentials_profile_block, dbt_project_params + ).to_json() + ] + elif org_task.task.type == "dbtcloud": + tasks = [ + setup_dbt_cloud_task_config( + org_task, credentials_profile_block, dbt_project_params + ).to_json() + ] + dataflow = prefect_service.create_dataflow_v1( PrefectDataFlowCreateSchema3( deployment_name=deployment_name, @@ -124,11 +142,7 @@ def create_prefect_deployment_for_dbtcore_task( orgslug=org_task.org.slug, deployment_params={ "config": { - "tasks": [ - setup_dbt_core_task_config( - org_task, cli_profile_block, dbt_project_params - ).to_json() - ], + "tasks": tasks, "org_slug": org_task.org.slug, } }, diff --git a/ddpui/core/pipelinefunctions.py b/ddpui/core/pipelinefunctions.py index 0bd95495..46662b8d 100644 --- a/ddpui/core/pipelinefunctions.py +++ b/ddpui/core/pipelinefunctions.py @@ -3,7 +3,6 @@ do not raise http errors here """ -from pathlib import Path from typing import Union from functools import cmp_to_key 
from django.db import transaction @@ -15,6 +14,7 @@ from ddpui.utils.custom_logger import CustomLogger from ddpui.ddpprefect.schema import ( PrefectDbtTaskSetup, + PrefectDbtCloudTaskSetup, PrefectShellTaskSetup, PrefectAirbyteSyncTaskSetup, PrefectAirbyteRefreshSchemaTaskSetup, @@ -22,6 +22,7 @@ ) from ddpui.ddpprefect import ( AIRBYTECONNECTION, + DBTCLOUDJOB, DBTCORE, SECRET, SHELLOPERATION, @@ -38,8 +39,9 @@ TASK_AIRBYTERESET, UPDATE_SCHEMA, TRANSFORM_TASKS_SEQ, + TASK_DBTCLOUD_JOB, ) -from ddpui.ddpdbt.schema import DbtProjectParams +from ddpui.ddpdbt.schema import DbtCloudJobParams, DbtProjectParams logger = CustomLogger("ddpui") @@ -102,6 +104,23 @@ def setup_dbt_core_task_config( ) +def setup_dbt_cloud_task_config( + org_task: OrgTask, + cloud_creds_block: OrgPrefectBlockv1, + dbt_project_params: DbtCloudJobParams, + seq: int = 1, +): + """constructs the prefect payload for a dbt-cloud job""" + return PrefectDbtCloudTaskSetup( + seq=seq, + slug=org_task.task.slug, + type=DBTCLOUDJOB, + dbt_cloud_job_id=dbt_project_params.job_id, + dbt_cloud_creds_block=cloud_creds_block.block_name, + orgtask_uuid=str(org_task.uuid), + ) + + def setup_git_pull_shell_task_config( org_task: OrgTask, project_dir: str, @@ -151,13 +170,14 @@ def pipeline_with_orgtasks( cli_block: OrgPrefectBlockv1 = None, dbt_project_params: DbtProjectParams = None, start_seq: int = 0, + dbt_cloud_creds_block: OrgPrefectBlockv1 = None, ): """ Returns a list of task configs for a pipeline; This assumes the list of orgtasks is in the correct sequence """ task_configs = [] - + # This block works perfectly for dbt cli tasks and dbt cloud tasks both. 
for org_task in org_tasks: task_config = None if org_task.task.slug == TASK_AIRBYTERESET: @@ -182,6 +202,10 @@ dbt_project_params.project_dir, dbt_project_params.venv_binary, ).to_json() + elif org_task.task.slug == TASK_DBTCLOUD_JOB: + task_config = setup_dbt_cloud_task_config( + org_task, dbt_cloud_creds_block, DbtCloudJobParams(**org_task.options()) + ).to_json() else: task_config = setup_dbt_core_task_config( org_task, cli_block, dbt_project_params diff --git a/ddpui/ddpdbt/dbt_service.py b/ddpui/ddpdbt/dbt_service.py index b14d5b32..f0ed60bc 100644 --- a/ddpui/ddpdbt/dbt_service.py +++ b/ddpui/ddpdbt/dbt_service.py @@ -30,6 +30,7 @@ TASK_DBTRUN, TASK_DBTSEED, TASK_DBTDEPS, + TASK_DBTCLOUD_JOB, ) from ddpui.core.orgdbt_manager import DbtProjectManager from ddpui.utils.timezone import as_ist @@ -107,6 +108,7 @@ def task_config_params(task: Task): TASK_DBTTEST: {"flags": [], "options": ["select", "exclude"]}, TASK_DBTSEED: {"flags": [], "options": ["select"]}, TASK_DOCSGENERATE: {"flags": [], "options": []}, + TASK_DBTCLOUD_JOB: {"flags": [], "options": ["job_id"]}, } return TASK_CONIF_PARAM[task.slug] if task.slug in TASK_CONIF_PARAM else None diff --git a/ddpui/ddpdbt/schema.py b/ddpui/ddpdbt/schema.py index aea1086d..67d2bd44 100644 --- a/ddpui/ddpdbt/schema.py +++ b/ddpui/ddpdbt/schema.py @@ -1,4 +1,4 @@ -from typing import Optional, Union +from typing import Union from ninja import Schema from pathlib import Path @@ -14,3 +14,12 @@ class DbtProjectParams(Schema): target: str venv_binary: Union[str, Path] dbt_binary: Union[str, Path] + + +class DbtCloudJobParams(Schema): + """ + Schema to define all parameters required to run any dbt command using dbt Cloud. 
+ Extend this if you need to add more params while triggering a dbt cloud job + """ + + job_id: int diff --git a/ddpui/ddpprefect/__init__.py b/ddpui/ddpprefect/__init__.py index 86e0834e..eb8d3c0a 100644 --- a/ddpui/ddpprefect/__init__.py +++ b/ddpui/ddpprefect/__init__.py @@ -1,8 +1,10 @@ -# prefect block names +# prefect block names (they come from prefect's block_type table) AIRBYTESERVER = "Airbyte Server" AIRBYTECONNECTION = "Airbyte Connection" SHELLOPERATION = "Shell Operation" DBTCORE = "dbt Core Operation" +DBTCLOUDJOB = "dbt Cloud Job" +DBTCLOUDCREDS = "dbt Cloud Credentials" DBTCLIPROFILE = "dbt CLI Profile" SECRET = "Secret" diff --git a/ddpui/ddpprefect/prefect_service.py b/ddpui/ddpprefect/prefect_service.py index a55fc6ed..e6f48bc3 100644 --- a/ddpui/ddpprefect/prefect_service.py +++ b/ddpui/ddpprefect/prefect_service.py @@ -18,13 +18,15 @@ ) from ddpui.utils.custom_logger import CustomLogger from ddpui.models.tasks import DataflowOrgTask, TaskLock -from ddpui.models.org_user import OrgUser +from ddpui.models.org_user import OrgUser, Org +from ddpui.models.org import OrgPrefectBlockv1 from ddpui.models.flow_runs import PrefectFlowRun from ddpui.ddpprefect import ( DDP_WORK_QUEUE, FLOW_RUN_COMPLETED_STATE_TYPE, FLOW_RUN_CRASHED_STATE_TYPE, FLOW_RUN_FAILED_STATE_TYPE, + DBTCLOUDCREDS, ) from ddpui.utils.constants import ( FLOW_RUN_LOGS_OFFSET_LIMIT, @@ -79,6 +81,7 @@ def prefect_post(endpoint: str, json: dict, **kwargs) -> dict: **kwargs, ) except Exception as error: + logger.error(error) raise HttpError(500, "connection error") from error try: res.raise_for_status() @@ -113,6 +116,31 @@ def prefect_put(endpoint: str, json: dict, **kwargs) -> dict: return res.json() +def prefect_patch(endpoint: str, json: dict, **kwargs) -> dict: + """make a PATCH request to the proxy""" + # we send headers and timeout separately from kwargs, just to be explicit about it + headers = kwargs.pop("headers", {}) + headers["x-ddp-org"] = logger.get_slug() + timeout = 
kwargs.pop("timeout", http_timeout) + + try: + res = requests.patch( + f"{PREFECT_PROXY_API_URL}/proxy/{endpoint}", + headers=headers, + timeout=timeout, + json=json, + **kwargs, + ) + except Exception as error: + raise HttpError(500, "connection error") from error + try: + res.raise_for_status() + except Exception as error: + logger.exception(error) + raise HttpError(res.status_code, res.text) from error + return res.json() + + def prefect_delete_a_block(block_id: str, **kwargs) -> None: """makes a DELETE request to the proxy""" # we send headers and timeout separately from kwargs, just to be explicit about it @@ -677,3 +705,40 @@ def get_prefect_version(): """Fetch secret block id and block name""" response = prefect_get("prefect/version") return response + + +def upsert_dbt_cloud_creds_block(block_name: str, account_id: int, api_key: str) -> dict: + """Create a dbt cloud creds block in prefect; using patch style create or update""" + response = prefect_patch( + "blocks/dbtcloudcreds/", + {"block_name": block_name, "account_id": account_id, "api_key": api_key}, + ) + return response + + +def create_or_update_dbt_cloud_creds_block( + org: Org, + account_id: int, + api_key: str, +) -> OrgPrefectBlockv1: + """Create or update the dbt cloud credentials block for this org""" + cloud_creds_block = OrgPrefectBlockv1.objects.filter(org=org, block_type=DBTCLOUDCREDS).first() + block_name = None + + if not cloud_creds_block: + block_name = f"{org.slug}-dbtcloud-creds" + cloud_creds_block = OrgPrefectBlockv1( + org=org, + block_type=DBTCLOUDCREDS, + block_name=block_name, + ) + else: + block_name = cloud_creds_block.block_name + + result = upsert_dbt_cloud_creds_block(block_name, account_id, api_key) + + cloud_creds_block.block_id = result["block_id"] + cloud_creds_block.block_name = result["block_name"] + cloud_creds_block.save() + + return cloud_creds_block diff --git a/ddpui/ddpprefect/schema.py b/ddpui/ddpprefect/schema.py index ecacd5a5..2c29da55 100644 --- 
a/ddpui/ddpprefect/schema.py +++ b/ddpui/ddpprefect/schema.py @@ -1,7 +1,6 @@ -from typing import List, Optional +from typing import Optional from ninja import Schema -from pydantic import Field class PrefectAirbyteSync(Schema): @@ -160,6 +159,28 @@ def to_json(self): } +class PrefectDbtCloudTaskSetup(Schema): + """request payload to trigger a dbt cloud run task in prefect""" + + type: str + slug: str + dbt_cloud_job_id: int + dbt_cloud_creds_block: str + orgtask_uuid: str + seq: int = 0 + + def to_json(self): + """JSON serialization""" + return { + "type": self.type, + "slug": self.slug, + "dbt_cloud_job_id": self.dbt_cloud_job_id, + "dbt_cloud_creds_block": self.dbt_cloud_creds_block, + "orgtask_uuid": self.orgtask_uuid, + "seq": self.seq, + } + + class DbtProfile(Schema): """Docstring""" diff --git a/ddpui/management/commands/dbt_cloud_integration.py b/ddpui/management/commands/dbt_cloud_integration.py new file mode 100644 index 00000000..60a76d36 --- /dev/null +++ b/ddpui/management/commands/dbt_cloud_integration.py @@ -0,0 +1,41 @@ +from django.core.management.base import BaseCommand + +from ddpui.models.org import Org, OrgPrefectBlockv1 +from ddpui.models.org import OrgWarehouse +from ddpui.utils import secretsmanager +from ddpui.ddpprefect import DBTCLIPROFILE +from ddpui.ddpprefect import prefect_service + + +class Command(BaseCommand): + """ + This script lets us run tasks/jobs related to dbt cloud integrations in dalgo + """ + + help = "Dbt cloud related tasks" + + def add_arguments(self, parser): + parser.add_argument("org", type=str, help="Org slug") + parser.add_argument( + "--api-key", type=str, help="Api key for your dbt cloud account", required=True + ) + parser.add_argument( + "--account-id", type=int, help="Account id for your dbt cloud account", required=True + ) + + def handle(self, *args, **options): + """ + Create/update dbt cloud creds block + This should be replaced by configuration option on settings panel where users can add this + """ + 
org = Org.objects.filter(slug=options["org"]).first() + if org is None: + print(f"Org with slug {options['org']} does not exist") + return + + if options["api_key"] and options["account_id"]: + block: OrgPrefectBlockv1 = prefect_service.create_or_update_dbt_cloud_creds_block( + org, options["account_id"], options["api_key"] + ) + print("DBT Cloud credentials block created/updated %s", block.block_name) + return diff --git a/ddpui/tests/api_tests/test_orgtask_api.py b/ddpui/tests/api_tests/test_orgtask_api.py index 43138d69..9ce3b9f8 100644 --- a/ddpui/tests/api_tests/test_orgtask_api.py +++ b/ddpui/tests/api_tests/test_orgtask_api.py @@ -215,7 +215,7 @@ def test_seed_data(seed_db): def test_seed_master_tasks(seed_master_tasks_db): """a test to seed the database""" - assert Task.objects.count() == 11 + assert Task.objects.count() == 12 # ================================================================================ diff --git a/ddpui/tests/api_tests/test_pipeline_api.py b/ddpui/tests/api_tests/test_pipeline_api.py index 8966b08c..dac7f631 100644 --- a/ddpui/tests/api_tests/test_pipeline_api.py +++ b/ddpui/tests/api_tests/test_pipeline_api.py @@ -245,7 +245,7 @@ def test_seed_data(seed_db): def test_seed_master_tasks(seed_master_tasks_db): """a test to seed the database""" - assert Task.objects.count() == 11 + assert Task.objects.count() == 12 # ================================================================================ @@ -273,7 +273,10 @@ def test_post_prefect_dataflow_v1_failure2(orguser_transform_tasks): """tests the failure due to missing name of the dataflow in the payload""" connections = [PrefectFlowAirbyteConnection2(id="test-conn-id", seq=1)] payload = PrefectDataFlowCreateSchema4( - name="", connections=connections, cron="", transformTasks=[] + name="", + connections=connections, + cron="", + transformTasks=[], ) request = mock_request(orguser_transform_tasks) diff --git a/ddpui/tests/core/test_orgtaskfunctions.py 
b/ddpui/tests/core/test_orgtaskfunctions.py index 012e1dc1..47e60a14 100644 --- a/ddpui/tests/core/test_orgtaskfunctions.py +++ b/ddpui/tests/core/test_orgtaskfunctions.py @@ -34,7 +34,7 @@ def seed_master_tasks_db(django_db_setup, django_db_blocker): def test_seed_master_tasks(seed_master_tasks_db): """a test to seed the database""" - assert Task.objects.count() == 11 + assert Task.objects.count() == 12 # ================================================================================ diff --git a/ddpui/utils/constants.py b/ddpui/utils/constants.py index 9c2fbfef..56c9e39e 100644 --- a/ddpui/utils/constants.py +++ b/ddpui/utils/constants.py @@ -4,6 +4,7 @@ TASK_DBTCLEAN = "dbt-clean" TASK_DBTDEPS = "dbt-deps" TASK_GITPULL = "git-pull" +TASK_DBTCLOUD_JOB = "dbt-cloud-job" # this is task slug so it should match the seed data. TASK_DOCSGENERATE = "dbt-docs-generate" TASK_AIRBYTESYNC = "airbyte-sync" TASK_AIRBYTERESET = "airbyte-reset" @@ -22,6 +23,7 @@ TASK_DBTTEST: 6, TASK_DOCSGENERATE: 7, TASK_GENERATE_EDR: 8, + TASK_DBTCLOUD_JOB: 20, } # when a new pipeline is created; these are the transform tasks being pushed by default DEFAULT_TRANSFORM_TASKS_IN_PIPELINE = [ @@ -30,12 +32,13 @@ TASK_DBTDEPS, TASK_DBTRUN, TASK_DBTTEST, + TASK_DBTCLOUD_JOB, ] # These are tasks to be run via deployment # Adding a new task here will work for any new orgtask created # But for the current ones a script would need to be run to set them with a deployment -LONG_RUNNING_TASKS = [TASK_DBTRUN, TASK_DBTSEED, TASK_DBTTEST] +LONG_RUNNING_TASKS = [TASK_DBTRUN, TASK_DBTSEED, TASK_DBTTEST, TASK_DBTCLOUD_JOB] # airbyte sync timeout in deployment params AIRBYTE_SYNC_TIMEOUT = 15 diff --git a/seed/tasks.json b/seed/tasks.json index 134c45c0..287aeb50 100644 --- a/seed/tasks.json +++ b/seed/tasks.json @@ -119,5 +119,16 @@ "command": null, "is_system": false } + }, + { + "model": "ddpui.Task", + "pk": 12, + "fields": { + "type": "dbtcloud", + "slug": "dbt-cloud-job", + "label": "DBT Cloud job", + 
"command": null, + "is_system": true + } } ] \ No newline at end of file