diff --git a/releases/notes/1.7.6.md b/releases/notes/1.7.6.md new file mode 100644 index 000000000..11ce72e91 --- /dev/null +++ b/releases/notes/1.7.6.md @@ -0,0 +1,4 @@ + +#### Prepare Deployment via ArgoCD +- Introduces `Dagster Template` step, to enable offloading the deployment of dagster user-code helm charts to other tools, such as ArgoCD, this step still takes care of writing the helm chart's values.yaml file and modifying the ConfigMap +- Introduces the `Dagster Helm Template` step that, additionally to the functionality described above, enables offloading the modification of dagster server's ConfigMap to other tools, such as GithubActions or ArgoCD, this step only takes care of writing the helm chart's values.yaml file \ No newline at end of file diff --git a/src/mpyl/steps/deploy/dagster.py b/src/mpyl/steps/deploy/dagster.py index a69db2423..01e0e7b18 100644 --- a/src/mpyl/steps/deploy/dagster.py +++ b/src/mpyl/steps/deploy/dagster.py @@ -4,10 +4,11 @@ from functools import reduce from logging import Logger from pathlib import Path -from typing import List +from typing import List, Tuple, Optional import yaml from kubernetes import config, client +from kubernetes.client import CoreV1Api from . import STAGE_NAME from .k8s import ( @@ -23,18 +24,144 @@ from .k8s.helm import write_chart from .k8s.resources.dagster import to_user_code_values, to_grpc_server_entry, Constants from .. import Step, Meta, ArtifactType, Input, Output +from ..models import RunProperties from ...utilities.dagster import DagsterConfig from ...utilities.docker import DockerConfig from ...utilities.helm import convert_to_helm_release_name, get_name_suffix -class DeployDagster(Step): +class DagsterBase: + def combine_outputs(self, results: List[Output]) -> Output: + return ( + reduce( + self.__flatten_output_messages, + results[1:], + results[0], + ) + if len(results) > 1 + else results[0] + ) + + @staticmethod + def __flatten_output_messages(acc: Output, curr: Output) -> Output: + return Output( + success=acc.success and curr.success, + message=f"{acc.message}\n{curr.message}", + ) + + @staticmethod + def write_user_code_manifest( + step_input: Input, + properties: RunProperties, + global_service_account_override: Optional[str], + ) -> Tuple[dict, Path]: + builder = ChartBuilder(step_input) + + name_suffix = get_name_suffix(properties) + release_name = convert_to_helm_release_name( + step_input.project_execution.name, name_suffix + ) + + user_code_deployment = to_user_code_values( + builder=builder, + release_name=release_name, + name_suffix=name_suffix, + run_properties=properties, + service_account_override=global_service_account_override, + docker_config=DockerConfig.from_dict(properties.config), + ) + + values_path = Path(step_input.project_execution.project.target_path) + + write_chart( + chart={}, + chart_path=values_path, + chart_metadata="", + values=user_code_deployment, + ) + return user_code_deployment, values_path + + @staticmethod + def add_server_to_server_list( + core_api: CoreV1Api, user_code_deployment: dict, dagster_config: DagsterConfig + ) -> Output: + config_map = get_config_map( + core_api, + dagster_config.base_namespace, + dagster_config.workspace_config_map, + ) + dagster_workspace = yaml.safe_load( + config_map.data[dagster_config.workspace_file_key] + ) + + server_names = [ + w["grpc_server"]["location_name"] for w in dagster_workspace["load_from"] + ] + + # If the server new (not in existing workspace.yml), we append it + user_code_name_to_deploy = user_code_deployment["deployments"][0]["name"] + if user_code_name_to_deploy not in server_names: + dagster_workspace["load_from"].append( + to_grpc_server_entry( + host=user_code_name_to_deploy, + port=user_code_deployment["deployments"][0]["port"], + location_name=user_code_name_to_deploy, + ) + ) + updated_config_map = update_config_map_field( + config_map=config_map, + field=dagster_config.workspace_file_key, + data=dagster_workspace, + ) + return replace_config_map( + core_api, + dagster_config.base_namespace, + dagster_config.workspace_config_map, + updated_config_map, + ) + return Output( + success=True, + message="Server name already exists in list, no addition needed", + produced_artifact=None, + ) + + @staticmethod + def restart_dagster_instances( + logger, apps_api, dagster_config: DagsterConfig + ) -> List[Output]: + # restarting ui and daemon + rollout_restart_daemon_output = rollout_restart_deployment( + logger, + apps_api, + dagster_config.base_namespace, + dagster_config.daemon, + ) + + if rollout_restart_daemon_output.success: + logger.info(rollout_restart_daemon_output.message) + rollout_restart_server_output = rollout_restart_deployment( + logger, + apps_api, + dagster_config.base_namespace, + dagster_config.webserver, + ) + logger.info(rollout_restart_server_output.message) + return [rollout_restart_daemon_output, rollout_restart_server_output] + return [rollout_restart_daemon_output] + + +class HelmTemplateDagster(Step, DagsterBase): + """ + This step only creates a dagster user code helm chart manifest but doesn't use helm to deploy the manifest, + and doesn't write an entry to the dagster server's configmap + """ + def __init__(self, logger: Logger) -> None: super().__init__( logger, Meta( - name="Dagster Deploy", - description="Deploy a dagster user code repository to k8s", + name="Dagster Helm Template", + description="Creates a dagster user code helm chart", version="0.0.1", stage=STAGE_NAME, ), @@ -42,22 +169,110 @@ def __init__(self, logger: Logger) -> None: required_artifact=ArtifactType.DOCKER_IMAGE, ) - def __evaluate_results(self, results: List[Output]): - return ( - reduce( - self.__flatten_result_messages, - results[1:], - results[0], - ) - if len(results) > 1 - else results[0] + # pylint: disable=R0914 + def execute(self, step_input: Input) -> Output: + """ + Creates the dagster user-code helm chart manifest + """ + properties = step_input.run_properties + context = get_cluster_config_for_project( + step_input.run_properties, step_input.project_execution.project + ).context + dagster_config: DagsterConfig = DagsterConfig.from_dict(properties.config) + + config.load_kube_config(context=context) + + user_code_deployment, values_path = self.write_user_code_manifest( + step_input, properties, dagster_config.global_service_account_override ) + self._logger.debug( + f"Written user code manifest with values: {user_code_deployment}" + ) + self._logger.info(f"Writing Helm values to {values_path}") - @staticmethod - def __flatten_result_messages(acc: Output, curr: Output) -> Output: return Output( - success=acc.success and curr.success, - message=f"{acc.message}\n{curr.message}", + True, + f"Successfully written helm chart manifest to {values_path}", + produced_artifact=step_input.required_artifact, + ) + + +class TemplateDagster(Step, DagsterBase): + """ + This step creates a dagster user code helm chart manifest and writes an entry to the dagster server's configmap + but doesn't use helm to deploy the manifest. + """ + + def __init__(self, logger: Logger) -> None: + super().__init__( + logger, + Meta( + name="Dagster Template", + description="Creates a dagster user code helm chart and adds an entry to dagster's K8s ConfigMap", + version="0.0.1", + stage=STAGE_NAME, + ), + produced_artifact=ArtifactType.NONE, + required_artifact=ArtifactType.DOCKER_IMAGE, + ) + + # pylint: disable=R0914 + def execute(self, step_input: Input) -> Output: + """ + Creates the dagster user-code helm chart manifest and adds an server entry to dagster server's ConfigMap + """ + properties = step_input.run_properties + context = get_cluster_config_for_project( + step_input.run_properties, step_input.project_execution.project + ).context + dagster_config: DagsterConfig = DagsterConfig.from_dict(properties.config) + + user_code_deployment, values_path = self.write_user_code_manifest( + step_input, properties, dagster_config.global_service_account_override + ) + self._logger.debug( + f"Written user code manifest with values: {user_code_deployment}" + ) + self._logger.info(f"Writing Helm values to {values_path}") + + # early exit + if step_input.dry_run: + return Output( + success=True, + message="Successfully written Helm Chart in dry-run, not deploying.", + produced_artifact=step_input.required_artifact, + ) + + dagster_template_results = [] + + config.load_kube_config(context=context) + core_api = client.CoreV1Api() + apps_api = client.AppsV1Api() + + add_server_list_output = self.add_server_to_server_list( + core_api, user_code_deployment, dagster_config + ) + dagster_template_results.append(add_server_list_output) + if add_server_list_output.success: + restart_outputs = self.restart_dagster_instances( + self._logger, apps_api, dagster_config + ) + dagster_template_results.extend(restart_outputs) + return self.combine_outputs(dagster_template_results) + + +class DeployDagster(Step, DagsterBase): + def __init__(self, logger: Logger) -> None: + super().__init__( + logger, + Meta( + name="Dagster Deploy", + description="Deploy a dagster user code repository to k8s", + version="0.0.1", + stage=STAGE_NAME, + ), + produced_artifact=ArtifactType.NONE, + required_artifact=ArtifactType.DOCKER_IMAGE, ) # pylint: disable=R0914 @@ -84,51 +299,33 @@ def execute(self, step_input: Input) -> Output: ) self._logger.info(f"Dagster Version: {dagster_version}") - result = helm.add_repo( + add_repo_ouput = helm.add_repo( self._logger, dagster_config.base_namespace, Constants.HELM_CHART_REPO ) - dagster_deploy_results.append(result) - if not result.success: - return self.__evaluate_results(dagster_deploy_results) + dagster_deploy_results.append(add_repo_ouput) + if not add_repo_ouput.success: + return self.combine_outputs(dagster_deploy_results) - result = helm.update_repo(self._logger) - dagster_deploy_results.append(result) - if not result.success: - return self.__evaluate_results(dagster_deploy_results) + update_repo_ouput = helm.update_repo(self._logger) + dagster_deploy_results.append(update_repo_ouput) + if not update_repo_ouput.success: + return self.combine_outputs(dagster_deploy_results) - name_suffix = get_name_suffix(properties) - release_name = convert_to_helm_release_name( - step_input.project_execution.name, name_suffix + user_code_deployment, values_path = self.write_user_code_manifest( + step_input, properties, dagster_config.global_service_account_override ) - - builder = ChartBuilder(step_input) - - user_code_deployment = to_user_code_values( - builder=builder, - release_name=release_name, - name_suffix=name_suffix, - run_properties=properties, - service_account_override=dagster_config.global_service_account_override, - docker_config=DockerConfig.from_dict(properties.config), + self._logger.debug( + f"Written user code manifest with values: {user_code_deployment}" ) - - self._logger.debug(f"Deploying user code with values: {user_code_deployment}") - - values_path = Path(step_input.project_execution.project.target_path) self._logger.info(f"Writing Helm values to {values_path}") - write_chart( - chart={}, - chart_path=values_path, - chart_metadata="", - values=user_code_deployment, - ) - helm_install_result = helm.install_chart_with_values( logger=self._logger, dry_run=step_input.dry_run, values_path=values_path / Path("values.yaml"), - release_name=release_name, + release_name=convert_to_helm_release_name( + step_input.project_execution.name, get_name_suffix(properties) + ), chart_version=dagster_version, chart_name=Constants.CHART_NAME, namespace=dagster_config.base_namespace, @@ -137,68 +334,15 @@ def execute(self, step_input: Input) -> Output: dagster_deploy_results.append(helm_install_result) if helm_install_result.success and not step_input.dry_run: - config_map = get_config_map( - core_api, - dagster_config.base_namespace, - dagster_config.workspace_config_map, + add_to_server_list_output = self.add_server_to_server_list( + core_api, user_code_deployment, dagster_config ) - dagster_workspace = yaml.safe_load( - config_map.data[dagster_config.workspace_file_key] - ) - - server_names = [ - w["grpc_server"]["location_name"] - for w in dagster_workspace["load_from"] - ] + dagster_deploy_results.append(add_to_server_list_output) - # If the server new (not in existing workspace.yml), we append it - user_code_name_to_deploy = user_code_deployment["deployments"][0]["name"] - if user_code_name_to_deploy not in server_names: - self._logger.info( - f"Adding new server {user_code_name_to_deploy} to dagster's workspace.yaml" - ) - dagster_workspace["load_from"].append( - to_grpc_server_entry( - host=user_code_name_to_deploy, - port=user_code_deployment["deployments"][0]["port"], - location_name=user_code_name_to_deploy, - ) - ) - updated_config_map = update_config_map_field( - config_map=config_map, - field=dagster_config.workspace_file_key, - data=dagster_workspace, - ) - config_map_update_result = replace_config_map( - core_api, - dagster_config.base_namespace, - dagster_config.workspace_config_map, - updated_config_map, + if add_to_server_list_output.success: + restart_outputs = self.restart_dagster_instances( + self._logger, apps_api, dagster_config ) + dagster_deploy_results.extend(restart_outputs) - dagster_deploy_results.append(config_map_update_result) - if config_map_update_result.success: - self._logger.info( - f"Successfully added {user_code_name_to_deploy} to dagster's workspace.yaml" - ) - - # restarting ui and daemon - rollout_restart_output = rollout_restart_deployment( - self._logger, - apps_api, - dagster_config.base_namespace, - dagster_config.daemon, - ) - - dagster_deploy_results.append(rollout_restart_output) - if rollout_restart_output.success: - self._logger.info(rollout_restart_output.message) - rollout_restart_output = rollout_restart_deployment( - self._logger, - apps_api, - dagster_config.base_namespace, - dagster_config.webserver, - ) - dagster_deploy_results.append(rollout_restart_output) - self._logger.info(rollout_restart_output.message) - return self.__evaluate_results(dagster_deploy_results) + return self.combine_outputs(dagster_deploy_results) diff --git a/src/mpyl/steps/deploy/k8s/resources/dagster.py b/src/mpyl/steps/deploy/k8s/resources/dagster.py index 96a512540..73c34a142 100644 --- a/src/mpyl/steps/deploy/k8s/resources/dagster.py +++ b/src/mpyl/steps/deploy/k8s/resources/dagster.py @@ -59,6 +59,9 @@ def to_user_code_values( "serviceAccount": {"create": create_local_service_account}, # ucd, short for user-code-deployment "fullnameOverride": f"ucd-{shorten_name(project.name)}{name_suffix}", + "imagePullSecrets": [ + {"name": "aws-ecr"}, + ], "deployments": [ { "dagsterApiGrpcArgs": [ @@ -75,7 +78,6 @@ def to_user_code_values( "envSecrets": [{"name": s.name} for s in project.dagster.secrets], "image": { "pullPolicy": "Always", - "imagePullSecrets": [{"name": "bigdataregistry"}], "tag": run_properties.versioning.identifier, "repository": f"{docker_registry.host_name}/{project.name}", }, diff --git a/tests/steps/deploy/dagster/dagster-user-deployments/values_with_extra_manifest.yaml b/tests/steps/deploy/dagster/dagster-user-deployments/values_with_extra_manifest.yaml index e2e87790d..302f183db 100644 --- a/tests/steps/deploy/dagster/dagster-user-deployments/values_with_extra_manifest.yaml +++ b/tests/steps/deploy/dagster/dagster-user-deployments/values_with_extra_manifest.yaml @@ -1,6 +1,8 @@ serviceAccount: create: true fullnameOverride: ucd-educ-pr-1234 +imagePullSecrets: +- name: aws-ecr deployments: - dagsterApiGrpcArgs: - --python-file @@ -20,8 +22,6 @@ deployments: - name: some-normal-secret image: pullPolicy: Always - imagePullSecrets: - - name: bigdataregistry tag: pr-1234 repository: docker_host/example-dagster-user-code labels: diff --git a/tests/steps/deploy/dagster/dagster-user-deployments/values_with_global_service_account.yaml b/tests/steps/deploy/dagster/dagster-user-deployments/values_with_global_service_account.yaml index 8c7164aa9..6edc60f4c 100644 --- a/tests/steps/deploy/dagster/dagster-user-deployments/values_with_global_service_account.yaml +++ b/tests/steps/deploy/dagster/dagster-user-deployments/values_with_global_service_account.yaml @@ -3,6 +3,8 @@ global: serviceAccount: create: false fullnameOverride: ucd-educ-pr-1234 +imagePullSecrets: +- name: aws-ecr deployments: - dagsterApiGrpcArgs: - --python-file @@ -16,8 +18,6 @@ deployments: - name: some-normal-secret image: pullPolicy: Always - imagePullSecrets: - - name: bigdataregistry tag: pr-1234 repository: docker_host/example-dagster-user-code labels: diff --git a/tests/steps/deploy/dagster/dagster-user-deployments/values_with_target_prod.yaml b/tests/steps/deploy/dagster/dagster-user-deployments/values_with_target_prod.yaml index ba22f3f50..ec23fcffd 100644 --- a/tests/steps/deploy/dagster/dagster-user-deployments/values_with_target_prod.yaml +++ b/tests/steps/deploy/dagster/dagster-user-deployments/values_with_target_prod.yaml @@ -3,6 +3,8 @@ global: serviceAccount: create: false fullnameOverride: ucd-educ +imagePullSecrets: +- name: aws-ecr deployments: - dagsterApiGrpcArgs: - --python-file @@ -16,8 +18,6 @@ deployments: - name: some-normal-secret image: pullPolicy: Always - imagePullSecrets: - - name: bigdataregistry tag: 20230829-1234 repository: docker_host/example-dagster-user-code labels: diff --git a/tests/steps/deploy/dagster/dagster-user-deployments/values_without_global_service_account.yaml b/tests/steps/deploy/dagster/dagster-user-deployments/values_without_global_service_account.yaml index d28807cae..5f8e58ef0 100644 --- a/tests/steps/deploy/dagster/dagster-user-deployments/values_without_global_service_account.yaml +++ b/tests/steps/deploy/dagster/dagster-user-deployments/values_without_global_service_account.yaml @@ -1,6 +1,8 @@ serviceAccount: create: true fullnameOverride: ucd-educ-pr-1234 +imagePullSecrets: +- name: aws-ecr deployments: - dagsterApiGrpcArgs: - --python-file @@ -14,8 +16,6 @@ deployments: - name: some-normal-secret image: pullPolicy: Always - imagePullSecrets: - - name: bigdataregistry tag: pr-1234 repository: docker_host/example-dagster-user-code labels: