Merge pull request #918 from DalgoT4D/add-dbtcloud-run-dbttask
Ability to call the dbt Cloud API as part of pipelines
fatchat authored Dec 18, 2024
2 parents dba8002 + 2846601 commit 079b0ef
Showing 16 changed files with 325 additions and 73 deletions.
2 changes: 1 addition & 1 deletion ddpui/api/data_api.py
@@ -25,7 +25,7 @@ def get_tasks(request):
"""Fetch master list of tasks related to transformation"""
tasks = [
model_to_dict(task, exclude=["id"])
for task in Task.objects.filter(type__in=["dbt", "git"]).all()
for task in Task.objects.filter(type__in=["dbt", "git", "dbtcloud"]).all()
]
return tasks

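
The widened filter above only surfaces Task rows that already exist, so a "dbtcloud" row has to be created somewhere (a seed or migration, presumably among the changed files not rendered in this view). A minimal, hypothetical sketch of such a seed — only the type and slug field names are taken from elsewhere in this diff; everything else is an assumption about the Task model:

# Hypothetical seed -- the real one would live in a seed/migration file not shown here.
from ddpui.models.tasks import Task

Task.objects.get_or_create(
    type="dbtcloud",          # picked up by the type__in filter above
    slug="dbt-cloud-job",     # hypothetical slug
    defaults={"label": "dbt Cloud job"},  # "label" is an assumption about the model
)
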
65 changes: 45 additions & 20 deletions ddpui/api/orgtask_api.py
@@ -11,10 +11,7 @@
from ddpui.ddpprefect import prefect_service
from ddpui.ddpairbyte import airbytehelpers

from ddpui.ddpprefect import (
DBTCLIPROFILE,
SECRET,
)
from ddpui.ddpprefect import DBTCLIPROFILE, SECRET, DBTCLOUDCREDS
from ddpui.models.org import (
Org,
OrgWarehouse,
@@ -31,7 +28,7 @@
from ddpui.ddpprefect.schema import (
PrefectSecretBlockCreate,
)
from ddpui.ddpdbt.schema import DbtProjectParams
from ddpui.ddpdbt.schema import DbtProjectParams, DbtCloudJobParams
from ddpui.schemas.org_task_schema import CreateOrgTaskPayload, TaskParameters

from ddpui.core.orgdbt_manager import DbtProjectManager
@@ -97,21 +94,47 @@ def post_orgtask(request, payload: CreateOrgTaskPayload):

dataflow = None
if task.slug in LONG_RUNNING_TASKS:
dbt_project_params: DbtProjectParams = DbtProjectManager.gather_dbt_project_params(
orguser.org, orgdbt
)
# For dbt-cli
if task.type == "dbt":
dbt_project_params: DbtProjectParams = DbtProjectManager.gather_dbt_project_params(
orguser.org, orgdbt
)

# fetch the cli profile block
cli_profile_block = OrgPrefectBlockv1.objects.filter(
org=orguser.org, block_type=DBTCLIPROFILE
).first()
# fetch the cli profile block
cli_profile_block = OrgPrefectBlockv1.objects.filter(
org=orguser.org, block_type=DBTCLIPROFILE
).first()

if cli_profile_block is None:
raise HttpError(400, "dbt cli profile block not found")
if cli_profile_block is None:
raise HttpError(400, "dbt cli profile block not found")

dataflow = create_prefect_deployment_for_dbtcore_task(
orgtask, cli_profile_block, dbt_project_params
)
dataflow = create_prefect_deployment_for_dbtcore_task(
orgtask, cli_profile_block, dbt_project_params
)

# For dbt-cloud
if task.type == "dbtcloud":
# fetch dbt cloud creds block
dbt_cloud_creds_block = OrgPrefectBlockv1.objects.filter(
org=orguser.org, block_type=DBTCLOUDCREDS
).first()

if dbt_cloud_creds_block is None:
raise HttpError(400, "dbt cloud credentials block not found")

try:
dbt_cloud_params = DbtCloudJobParams(**parameters["options"])
except Exception as error:
logger.exception(error)
raise HttpError(400, "Job id should be numeric") from error

try:
dataflow = create_prefect_deployment_for_dbtcore_task(
orgtask, dbt_cloud_creds_block, dbt_cloud_params
)
except Exception as error:
logger.exception(error)
raise HttpError(400, "failed to create dbt cloud deployment") from error

return {
**model_to_dict(orgtask, fields=["parameters"]),
@@ -208,7 +231,7 @@ def get_prefect_transformation_tasks(request):
org_tasks = (
OrgTask.objects.filter(
org=orguser.org,
task__type__in=["git", "dbt"],
task__type__in=["git", "dbt", "dbtcloud"],
)
.order_by("-generated_by")
.select_related("task")
@@ -226,7 +249,9 @@ def get_prefect_transformation_tasks(request):
for org_task in org_tasks:
# git pull : "git" + " " + "pull"
# dbt run --full-refresh : "dbt" + " " + "run --full-refresh"
command = org_task.task.type + " " + org_task.get_task_parameters()
command = None
if org_task.task.type != "dbtcloud":
command = org_task.task.type + " " + org_task.get_task_parameters()

lock = None
all_locks = [lock for lock in all_org_task_locks if lock.orgtask_id == org_task.id]
@@ -432,7 +457,7 @@ def post_delete_orgtask(request, orgtask_uuid): # pylint: disable=unused-argume
if org_task is None:
raise HttpError(400, "task not found")

if org_task.task.type not in ["dbt", "git", "edr"]:
if org_task.task.type not in ["dbt", "git", "edr", "dbtcloud"]:
raise HttpError(400, "task not supported")

if orguser.org.dbt is None:
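
DbtCloudJobParams, imported above from ddpui.ddpdbt.schema, is defined in one of the changed files that is not rendered in this view. A minimal sketch of what it might look like, assuming a django-ninja Schema (i.e. a Pydantic model) with an integer job id — which would explain why a non-numeric value surfaces as the "Job id should be numeric" error above:

# Hypothetical sketch -- the real definition is in ddpui/ddpdbt/schema.py (changed in
# this PR but not shown here); any fields beyond job_id are unknown.
from ninja import Schema

class DbtCloudJobParams(Schema):
    """Parameters for triggering a dbt Cloud job run."""

    job_id: int  # DbtCloudJobParams(**parameters["options"]) fails validation if this is not numeric

Under these assumptions, options containing {"job_id": "abc"} would raise a validation error, while {"job_id": 123} (or "123", which Pydantic coerces to an int) would pass.
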
90 changes: 61 additions & 29 deletions ddpui/api/pipeline_api.py
@@ -7,10 +7,7 @@
from ddpui.ddpprefect import prefect_service
from ddpui.ddpairbyte import airbyte_service

from ddpui.ddpprefect import (
DBTCLIPROFILE,
AIRBYTESERVER,
)
from ddpui.ddpprefect import DBTCLIPROFILE, AIRBYTESERVER, DBTCLOUDCREDS
from ddpui.models.org import OrgDataFlowv1, OrgPrefectBlockv1
from ddpui.models.org_user import OrgUser
from ddpui.models.tasks import DataflowOrgTask, OrgTask
@@ -54,7 +51,7 @@ def post_prefect_dataflow_v1(request, payload: PrefectDataFlowCreateSchema4):
if payload.name in [None, ""]:
raise HttpError(400, "must provide a name for the flow")

tasks = []
tasks = []  # the main task array, containing both airbyte and dbt tasks
map_org_tasks = []  # seq of org tasks to be mapped in pipeline / dataflow

# push connection orgtasks in pipeline
@@ -101,20 +98,16 @@ def post_prefect_dataflow_v1(request, payload: PrefectDataFlowCreateSchema4):
# push dbt pipeline orgtasks
dbt_project_params: DbtProjectParams = None
dbt_git_orgtasks = []
dbt_cloud_orgtasks = []
orgdbt = orguser.org.dbt
if payload.transformTasks and len(payload.transformTasks) > 0:
if (
payload.transformTasks and len(payload.transformTasks) > 0 ## For dbt cli & dbt cloud
): # dont modify this block as its of rlocal
logger.info("Dbt tasks being pushed to the pipeline")

# dbt params
dbt_project_params = DbtProjectManager.gather_dbt_project_params(orguser.org, orgdbt)

# dbt cli profile block
cli_block = OrgPrefectBlockv1.objects.filter(
org=orguser.org, block_type=DBTCLIPROFILE
).first()
if not cli_block:
raise HttpError(400, "dbt cli profile not found")

payload.transformTasks.sort(key=lambda task: task.seq) # sort the tasks by seq

for transform_task in payload.transformTasks:
@@ -123,22 +116,47 @@ def post_prefect_dataflow_v1(request, payload: PrefectDataFlowCreateSchema4):
logger.error(f"org task with {transform_task.uuid} not found")
continue

# map this org task to dataflow
dbt_git_orgtasks.append(org_task)
if org_task.task.type in ["dbt", "git"]:
dbt_git_orgtasks.append(org_task)
elif org_task.task.type == "dbtcloud":
dbt_cloud_orgtasks.append(org_task)

logger.info(f"{len(dbt_git_orgtasks)} Git/Dbt cli tasks being pushed to the pipeline")
logger.info(f"{len(dbt_cloud_orgtasks)} Dbt cloud tasks being pushed to the pipeline")

# dbt cli profile block
cli_block = None
if len(dbt_git_orgtasks) > 0:
cli_block = OrgPrefectBlockv1.objects.filter(
org=orguser.org, block_type=DBTCLIPROFILE
).first()
if not cli_block:
raise HttpError(400, "dbt cli profile not found")

# dbt cloud creds block
dbt_cloud_creds_block = None
if len(dbt_cloud_orgtasks) > 0:
dbt_cloud_creds_block = OrgPrefectBlockv1.objects.filter(
org=orguser.org, block_type=DBTCLOUDCREDS
).first()
if not dbt_cloud_creds_block:
raise HttpError(400, "dbt cloud creds block not found")

# get the deployment task configs
task_configs, error = pipeline_with_orgtasks(
orguser.org,
dbt_git_orgtasks,
dbt_git_orgtasks + dbt_cloud_orgtasks,
cli_block=cli_block,
dbt_project_params=dbt_project_params,
start_seq=len(tasks),
dbt_cloud_creds_block=dbt_cloud_creds_block,
)
if error:
raise HttpError(400, error)
tasks += task_configs

map_org_tasks += dbt_git_orgtasks
map_org_tasks += dbt_cloud_orgtasks

# create deployment
try:
@@ -290,7 +308,7 @@ def get_prefect_dataflow_v1(request, deployment_id):
transform_tasks = [
{"uuid": dataflow_orgtask.orgtask.uuid, "seq": dataflow_orgtask.seq}
for dataflow_orgtask in DataflowOrgTask.objects.filter(
dataflow=org_data_flow, orgtask__task__type__in=["git", "dbt"]
dataflow=org_data_flow, orgtask__task__type__in=["git", "dbt", "dbtcloud"]
)
.all()
.order_by("seq")
@@ -400,6 +418,7 @@ def put_prefect_dataflow_v1(request, deployment_id, payload: PrefectDataFlowUpda
# push dbt pipeline orgtasks
dbt_project_params = None
dbt_git_orgtasks = []
dbt_cloud_orgtasks = []
orgdbt = orguser.org.dbt
if payload.transformTasks and len(payload.transformTasks) > 0:
logger.info(f"Dbt tasks being pushed to the pipeline")
@@ -409,13 +428,6 @@ def put_prefect_dataflow_v1(request, deployment_id, payload: PrefectDataFlowUpda
orguser.org, orgdbt
)

# dbt cli profile block
cli_block = OrgPrefectBlockv1.objects.filter(
org=orguser.org, block_type=DBTCLIPROFILE
).first()
if not cli_block:
raise HttpError(400, "dbt cli profile not found")

payload.transformTasks.sort(key=lambda task: task.seq) # sort the tasks by seq

for transform_task in payload.transformTasks:
@@ -424,24 +436,44 @@ def put_prefect_dataflow_v1(request, deployment_id, payload: PrefectDataFlowUpda
logger.error(f"org task with {transform_task.uuid} not found")
continue

# map this org task to dataflow
dbt_git_orgtasks.append(org_task)
if org_task.task.type in ["dbt", "git"]:
dbt_git_orgtasks.append(org_task)
elif org_task.task.type == "dbtcloud":
dbt_cloud_orgtasks.append(org_task)

# dbt cli profile block
cli_block = None
if len(dbt_git_orgtasks) > 0:
cli_block = OrgPrefectBlockv1.objects.filter(
org=orguser.org, block_type=DBTCLIPROFILE
).first()
if not cli_block:
raise HttpError(400, "dbt cli profile not found")

# dbt cloud creds block
dbt_cloud_creds_block = None
if len(dbt_cloud_orgtasks) > 0:
dbt_cloud_creds_block = OrgPrefectBlockv1.objects.filter(
org=orguser.org, block_type=DBTCLOUDCREDS
).first()
if not dbt_cloud_creds_block:
raise HttpError(400, "dbt cloud creds block not found")

# get the deployment task configs
task_configs, error = pipeline_with_orgtasks(
orguser.org,
dbt_git_orgtasks,
dbt_git_orgtasks + dbt_cloud_orgtasks,
cli_block=cli_block,
dbt_project_params=dbt_project_params,
start_seq=len(tasks),
dbt_cloud_creds_block=dbt_cloud_creds_block,
)
logger.info("HERE")
logger.info(task_configs)
if error:
raise HttpError(400, error)
tasks += task_configs

map_org_tasks += dbt_git_orgtasks
map_org_tasks += dbt_cloud_orgtasks

# update deployment
payload.deployment_params = {"config": {"tasks": tasks, "org_slug": orguser.org.slug}}
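
For illustration: transformTasks entries carry only a uuid and a seq (as in get_prefect_dataflow_v1 above), and the create/update handlers look up each OrgTask and route it to dbt_git_orgtasks or dbt_cloud_orgtasks based on task.type, which in turn decides whether the dbt cli profile block or the dbt cloud credentials block must exist. A hypothetical request body mixing both kinds of transform tasks — the uuids are made up and the connections key is an assumption; only name and transformTasks appear in this excerpt:

# Hypothetical payload for post_prefect_dataflow_v1 (PrefectDataFlowCreateSchema4).
payload = {
    "name": "daily-transform",   # required; a missing/empty name is rejected above
    "connections": [],           # airbyte sync tasks, if any (field name assumed)
    "transformTasks": [
        {"uuid": "11111111-aaaa", "seq": 1},  # a "git" pull OrgTask     -> needs the dbt cli profile block
        {"uuid": "22222222-bbbb", "seq": 2},  # a "dbt" run OrgTask      -> needs the dbt cli profile block
        {"uuid": "33333333-cccc", "seq": 3},  # a "dbtcloud" job OrgTask -> needs the dbt cloud creds block
    ],
}
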
34 changes: 24 additions & 10 deletions ddpui/core/orgtaskfunctions.py
@@ -20,9 +20,9 @@
PrefectDataFlowCreateSchema3,
)
from ddpui.ddpprefect import MANUL_DBT_WORK_QUEUE
from ddpui.ddpdbt.schema import DbtProjectParams
from ddpui.ddpdbt.schema import DbtCloudJobParams, DbtProjectParams
from ddpui.ddpprefect import prefect_service
from ddpui.core.pipelinefunctions import setup_dbt_core_task_config
from ddpui.core.pipelinefunctions import setup_dbt_core_task_config, setup_dbt_cloud_task_config
from ddpui.utils.constants import TASK_DBTRUN, TASK_GENERATE_EDR
from ddpui.utils.helpers import generate_hash_id

@@ -109,26 +109,40 @@ def get_edr_send_report_task(org: Org, **kwargs) -> OrgTask | None:

def create_prefect_deployment_for_dbtcore_task(
org_task: OrgTask,
cli_profile_block: OrgPrefectBlockv1,
dbt_project_params: DbtProjectParams,
credentials_profile_block: OrgPrefectBlockv1,
dbt_project_params: Union[DbtProjectParams, DbtCloudJobParams],
):
"""
create a prefect deployment for a single dbt command and save the deployment id to an OrgDataFlowv1 object
- create a prefect deployment for a single dbt command or dbt cloud job
- save the deployment id to an OrgDataFlowv1 object
- for a dbt core operation, the credentials_profile_block is the cli profile block
- for a dbt cloud job, the credentials_profile_block is the dbt cloud credentials block
"""
hash_code = generate_hash_id(8)
deployment_name = f"manual-{org_task.org.slug}-{org_task.task.slug}-{hash_code}"

tasks = []
if org_task.task.type == "dbt":
tasks = [
setup_dbt_core_task_config(
org_task, credentials_profile_block, dbt_project_params
).to_json()
]
elif org_task.task.type == "dbtcloud":
tasks = [
setup_dbt_cloud_task_config(
org_task, credentials_profile_block, dbt_project_params
).to_json()
]

dataflow = prefect_service.create_dataflow_v1(
PrefectDataFlowCreateSchema3(
deployment_name=deployment_name,
flow_name=deployment_name,
orgslug=org_task.org.slug,
deployment_params={
"config": {
"tasks": [
setup_dbt_core_task_config(
org_task, cli_profile_block, dbt_project_params
).to_json()
],
"tasks": tasks,
"org_slug": org_task.org.slug,
}
},
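
setup_dbt_cloud_task_config is imported above from ddpui.core.pipelinefunctions, whose changes are not rendered in this view. A rough sketch of the shape it might take, by analogy with setup_dbt_core_task_config: only the call signature (org task, credentials block, job params) and the .to_json() usage are taken from the diff; the PrefectDbtCloudTaskSetup name and its fields are assumptions.

# Hypothetical sketch -- the real function lives in ddpui/core/pipelinefunctions.py (not shown).
from dataclasses import dataclass, asdict

from ddpui.models.org import OrgPrefectBlockv1
from ddpui.models.tasks import OrgTask
from ddpui.ddpdbt.schema import DbtCloudJobParams


@dataclass
class PrefectDbtCloudTaskSetup:  # assumed name; the real schema may live in ddpui/ddpprefect/schema.py
    seq: int
    slug: str
    type: str
    dbt_cloud_creds_block: str
    dbt_cloud_job_id: int

    def to_json(self):
        return asdict(self)


def setup_dbt_cloud_task_config(
    org_task: OrgTask,
    dbt_cloud_creds_block: OrgPrefectBlockv1,
    dbt_cloud_params: DbtCloudJobParams,
) -> PrefectDbtCloudTaskSetup:
    """Build the flow-config entry that triggers a dbt Cloud job run."""
    return PrefectDbtCloudTaskSetup(
        seq=0,                            # overridden when the task is sequenced into a pipeline
        slug=org_task.task.slug,
        type="dbtcloud",                  # assumed type key used by the Prefect flow
        dbt_cloud_creds_block=dbt_cloud_creds_block.block_name,  # block_name field is an assumption
        dbt_cloud_job_id=dbt_cloud_params.job_id,
    )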