Skip to content
This repository was archived by the owner on Sep 29, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions releases/notes/1.7.6.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

#### Prepare Deployment via ArgoCD
- Introduces `Dagster Template` step, to enable offloading the deployment of dagster user-code helm charts to other tools, such as ArgoCD, this step still takes care of writing the helm chart's values.yaml file and modifying the ConfigMap
- Introduces the `Dagster Helm Template` step that, additionally to the functionality described above, enables offloading the modification of dagster server's ConfigMap to other tools, such as GithubActions or ArgoCD, this step only takes care of writing the helm chart's values.yaml file
218 changes: 218 additions & 0 deletions src/mpyl/steps/deploy/dagster.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,224 @@
from ...utilities.helm import convert_to_helm_release_name, get_name_suffix


class HelmTemplateDagster(Step):
"""
This step only creates a dagster user code helm chart manifest but doesn't use helm to deloy the manifest,
Comment thread
kgrunert marked this conversation as resolved.
Outdated
and doesn't write an entry to the dagster server's configmap
"""

def __init__(self, logger: Logger) -> None:
super().__init__(
logger,
Meta(
name="Dagster Helm Template",
description="Creates a dagster user code helm chart",
version="0.0.1",
stage=STAGE_NAME,
),
produced_artifact=ArtifactType.NONE,
required_artifact=ArtifactType.DOCKER_IMAGE,
)

# pylint: disable=R0914
def execute(self, step_input: Input) -> Output:
"""
Creates the dagster user-code helm chart manifest and adds an server entry to dagster server's ConfigMap
Comment thread
kgrunert marked this conversation as resolved.
Outdated
"""
properties = step_input.run_properties
context = get_cluster_config_for_project(
step_input.run_properties, step_input.project_execution.project
).context
dagster_config: DagsterConfig = DagsterConfig.from_dict(properties.config)

config.load_kube_config(context=context)

name_suffix = get_name_suffix(properties)
release_name = convert_to_helm_release_name(
step_input.project_execution.name, name_suffix
)

builder = ChartBuilder(step_input)

user_code_deployment = to_user_code_values(
builder=builder,
release_name=release_name,
name_suffix=name_suffix,
run_properties=properties,
service_account_override=dagster_config.global_service_account_override,
docker_config=DockerConfig.from_dict(properties.config),
)

self._logger.debug(
f"Writing user code manifest with values: {user_code_deployment}"
)

values_path = Path(step_input.project_execution.project.target_path)
self._logger.info(f"Writing Helm values to {values_path}")

write_chart(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like there is quite some overlap with the k8s deploy step, but just too much difference to align them 😅

chart={},
chart_path=values_path,
chart_metadata="",
values=user_code_deployment,
)

return Output(
True, f"Successfully written helm chart manifest to {values_path}"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might also want to add a artifact=input_to_artifact, Output is pretty important for logging and debugging

)


class TemplateDagster(Step):
"""
This step creates a dagster user code helm chart manifest and writes an entry to the dagster server's configmap
but doesn't use helm to deloy the manifest.
Comment thread
kgrunert marked this conversation as resolved.
Outdated
"""

def __init__(self, logger: Logger) -> None:
super().__init__(
logger,
Meta(
name="Dagster Template",
description="Creates a dagster user code helm chart and adds an entry to dagster's K8s ConfigMap",
version="0.0.1",
stage=STAGE_NAME,
),
produced_artifact=ArtifactType.NONE,
required_artifact=ArtifactType.DOCKER_IMAGE,
)

def __evaluate_results(self, results: List[Output]):
Comment thread
kgrunert marked this conversation as resolved.
Outdated
return (
reduce(
self.__flatten_result_messages,
results[1:],
results[0],
)
if len(results) > 1
else results[0]
)

@staticmethod
def __flatten_result_messages(acc: Output, curr: Output) -> Output:
Comment thread
kgrunert marked this conversation as resolved.
Outdated
return Output(
success=acc.success and curr.success,
message=f"{acc.message}\n{curr.message}",
)

# pylint: disable=R0914
def execute(self, step_input: Input) -> Output:
"""
Creates the dagster user-code helm chart manifest and adds an server entry to dagster server's ConfigMap
"""
properties = step_input.run_properties
context = get_cluster_config_for_project(
step_input.run_properties, step_input.project_execution.project
).context
dagster_config: DagsterConfig = DagsterConfig.from_dict(properties.config)
dagster_template_results = []

config.load_kube_config(context=context)
core_api = client.CoreV1Api()
apps_api = client.AppsV1Api()

name_suffix = get_name_suffix(properties)
release_name = convert_to_helm_release_name(
step_input.project_execution.name, name_suffix
)

builder = ChartBuilder(step_input)

user_code_deployment = to_user_code_values(
builder=builder,
release_name=release_name,
name_suffix=name_suffix,
run_properties=properties,
service_account_override=dagster_config.global_service_account_override,
docker_config=DockerConfig.from_dict(properties.config),
)

self._logger.debug(
f"Writing user code manifest with values: {user_code_deployment}"
)

values_path = Path(step_input.project_execution.project.target_path)
self._logger.info(f"Writing Helm values to {values_path}")

write_chart(
chart={},
chart_path=values_path,
chart_metadata="",
values=user_code_deployment,
)

if not step_input.dry_run:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything above from this is a copy of the Dagster Helm Template step right? Maybe optimise that a bit to reduce the amount of copied code? Might also make it more clear which parts are shared

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried extracting big duplications to DagsterBase, wasnt too sure about the rollout_restart functions, but in theory, can do that too..

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

c788197

heck, why not

config_map = get_config_map(
core_api,
dagster_config.base_namespace,
dagster_config.workspace_config_map,
)
dagster_workspace = yaml.safe_load(
config_map.data[dagster_config.workspace_file_key]
)

server_names = [
w["grpc_server"]["location_name"]
for w in dagster_workspace["load_from"]
]

# If the server new (not in existing workspace.yml), we append it
user_code_name_to_deploy = user_code_deployment["deployments"][0]["name"]
if user_code_name_to_deploy not in server_names:
self._logger.info(
f"Adding new server {user_code_name_to_deploy} to dagster's workspace.yaml"
)
dagster_workspace["load_from"].append(
to_grpc_server_entry(
host=user_code_name_to_deploy,
port=user_code_deployment["deployments"][0]["port"],
location_name=user_code_name_to_deploy,
)
)
updated_config_map = update_config_map_field(
config_map=config_map,
field=dagster_config.workspace_file_key,
data=dagster_workspace,
)
config_map_update_result = replace_config_map(
core_api,
dagster_config.base_namespace,
dagster_config.workspace_config_map,
updated_config_map,
)

dagster_template_results.append(config_map_update_result)
if config_map_update_result.success:
self._logger.info(
f"Successfully added {user_code_name_to_deploy} to dagster's workspace.yaml"
)

# restarting ui and daemon
rollout_restart_output = rollout_restart_deployment(
self._logger,
apps_api,
dagster_config.base_namespace,
dagster_config.daemon,
)

dagster_template_results.append(rollout_restart_output)
if rollout_restart_output.success:
self._logger.info(rollout_restart_output.message)
rollout_restart_output = rollout_restart_deployment(
self._logger,
apps_api,
dagster_config.base_namespace,
dagster_config.webserver,
)
dagster_template_results.append(rollout_restart_output)
self._logger.info(rollout_restart_output.message)
return self.__evaluate_results(dagster_template_results)


class DeployDagster(Step):
def __init__(self, logger: Logger) -> None:
super().__init__(
Expand Down