Skip to content

Commit

Permalink
feat(python-client): Support for init containers in driver pod (#121)
Browse files Browse the repository at this point in the history
  • Loading branch information
hussein-awala authored Jan 15, 2025
1 parent f064825 commit 4f00e1b
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 0 deletions.
5 changes: 5 additions & 0 deletions spark_on_k8s/airflow/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class SparkOnK8SOperator(BaseOperator):
executor_node_selector: Node selector for the executor pods.
driver_tolerations: Tolerations for the driver pod.
driver_ephemeral_configmaps_volumes: List of ConfigMaps to mount as ephemeral volumes to the driver.
driver_init_containers: List of init containers for the driver pod.
spark_on_k8s_service_url: URL of the Spark On K8S service. Defaults to None.
kubernetes_conn_id (str, optional): Kubernetes connection ID. Defaults to
"kubernetes_default".
Expand Down Expand Up @@ -168,6 +169,7 @@ def __init__(
driver_tolerations: list[k8s.V1Toleration] | None = None,
executor_pod_template_path: str | None = None,
driver_ephemeral_configmaps_volumes: list[ConfigMap] | None = None,
driver_init_containers: list[k8s.V1Container] | None = None,
spark_on_k8s_service_url: str | None = None,
kubernetes_conn_id: str = "kubernetes_default",
poll_interval: int = 10,
Expand Down Expand Up @@ -208,6 +210,7 @@ def __init__(
self.driver_tolerations = driver_tolerations
self.executor_pod_template_path = executor_pod_template_path
self.driver_ephemeral_configmaps_volumes = driver_ephemeral_configmaps_volumes
self.driver_init_containers = driver_init_containers
self.spark_on_k8s_service_url = spark_on_k8s_service_url
self.kubernetes_conn_id = kubernetes_conn_id
self.poll_interval = poll_interval
Expand Down Expand Up @@ -334,6 +337,8 @@ def _submit_new_job(self, context: Context):
submit_app_kwargs = {}
if self.app_id_suffix:
submit_app_kwargs["app_id_suffix"] = lambda: self.app_id_suffix
if self.driver_init_containers is not None:
submit_app_kwargs["driver_init_containers"] = self.driver_init_containers
self._driver_pod_name = spark_client.submit_app(
image=self.image,
app_path=self.app_path,
Expand Down
5 changes: 5 additions & 0 deletions spark_on_k8s/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ def submit_app(
driver_tolerations: list[k8s.V1Toleration] | ArgNotSet = NOTSET,
executor_pod_template_path: str | ArgNotSet = NOTSET,
startup_timeout: int | ArgNotSet = NOTSET,
driver_init_containers: list[k8s.V1Container] | ArgNotSet = NOTSET,
) -> str:
"""Submit a Spark app to Kubernetes
Expand Down Expand Up @@ -190,6 +191,7 @@ def submit_app(
driver_tolerations: List of tolerations for the driver
executor_pod_template_path: Path to the executor pod template file
startup_timeout: Timeout in seconds to wait for the application to start
driver_init_containers: List of init containers to run before the driver starts
Returns:
Name of the Spark application pod
Expand Down Expand Up @@ -297,6 +299,8 @@ def submit_app(
executor_pod_template_path = Configuration.SPARK_ON_K8S_EXECUTOR_POD_TEMPLATE_PATH
if startup_timeout is NOTSET:
startup_timeout = Configuration.SPARK_ON_K8S_STARTUP_TIMEOUT
if driver_init_containers is NOTSET:
driver_init_containers = []

spark_conf = spark_conf or {}
main_class_parameters = app_arguments or []
Expand Down Expand Up @@ -417,6 +421,7 @@ def submit_app(
volume_mounts=driver_volume_mounts,
node_selector=driver_node_selector,
tolerations=driver_tolerations,
init_containers=driver_init_containers,
)
with self.k8s_client_manager.client() as client:
api = k8s.CoreV1Api(client)
Expand Down
3 changes: 3 additions & 0 deletions spark_on_k8s/utils/app_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ def create_spark_pod_spec(
volume_mounts: list[k8s.V1VolumeMount] | None = None,
node_selector: dict[str, str] | None = None,
tolerations: list[k8s.V1Toleration] | None = None,
init_containers: list[k8s.V1Container] | None = None,
) -> k8s.V1PodTemplateSpec:
"""Create a pod spec for a Spark application
Expand All @@ -349,6 +350,7 @@ def create_spark_pod_spec(
volume_mounts: List of volume mounts to mount in the container
node_selector: Node selector to use for the pod
tolerations: List of tolerations to use for the pod
init_containers: List of init containers to run before the main container
Returns:
Pod template spec for the Spark application
Expand Down Expand Up @@ -381,6 +383,7 @@ def create_spark_pod_spec(
volumes=volumes,
node_selector=node_selector,
tolerations=tolerations,
init_containers=init_containers,
)
template = k8s.V1PodTemplateSpec(
metadata=pod_metadata,
Expand Down
13 changes: 13 additions & 0 deletions tests/test_spark_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,14 @@ def test_submit_app(self, mock_create_namespaced_service, mock_create_namespaced
ui_reverse_proxy=True,
driver_resources=PodResources(cpu=1, memory=2048, memory_overhead=1024),
executor_instances=ExecutorInstances(min=2, max=5, initial=5),
driver_init_containers=[
k8s.V1Container(
name="init-container",
image="init-container-image",
command=["init-command"],
args=["init-arg"],
)
],
)

expected_app_name = "pyspark-job-example"
Expand All @@ -203,6 +211,11 @@ def test_submit_app(self, mock_create_namespaced_service, mock_create_namespaced
assert created_pod.metadata.labels["spark-role"] == "driver"
assert created_pod.spec.containers[0].image == "pyspark-job"
assert created_pod.spec.service_account_name == "spark"
assert len(created_pod.spec.init_containers) == 1
assert created_pod.spec.init_containers[0].name == "init-container"
assert created_pod.spec.init_containers[0].image == "init-container-image"
assert created_pod.spec.init_containers[0].command == ["init-command"]
assert created_pod.spec.init_containers[0].args == ["init-arg"]
assert created_pod.spec.containers[0].args == [
"driver",
"--master",
Expand Down

0 comments on commit 4f00e1b

Please sign in to comment.