Skip to content

Commit

Permalink
feat(python-client): Support for host aliases in driver pod (#122)
Browse files Browse the repository at this point in the history
  • Loading branch information
hussein-awala authored Jan 15, 2025
1 parent 4f00e1b commit c49e4d5
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 0 deletions.
5 changes: 5 additions & 0 deletions spark_on_k8s/airflow/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ class SparkOnK8SOperator(BaseOperator):
driver_tolerations: Tolerations for the driver pod.
driver_ephemeral_configmaps_volumes: List of ConfigMaps to mount as ephemeral volumes to the driver.
driver_init_containers: List of init containers for the driver pod.
driver_host_aliases: List of host aliases for the driver pod.
spark_on_k8s_service_url: URL of the Spark On K8S service. Defaults to None.
kubernetes_conn_id (str, optional): Kubernetes connection ID. Defaults to
"kubernetes_default".
Expand Down Expand Up @@ -170,6 +171,7 @@ def __init__(
executor_pod_template_path: str | None = None,
driver_ephemeral_configmaps_volumes: list[ConfigMap] | None = None,
driver_init_containers: list[k8s.V1Container] | None = None,
driver_host_aliases: list[k8s.V1HostAlias] | None = None,
spark_on_k8s_service_url: str | None = None,
kubernetes_conn_id: str = "kubernetes_default",
poll_interval: int = 10,
Expand Down Expand Up @@ -211,6 +213,7 @@ def __init__(
self.executor_pod_template_path = executor_pod_template_path
self.driver_ephemeral_configmaps_volumes = driver_ephemeral_configmaps_volumes
self.driver_init_containers = driver_init_containers
self.driver_host_aliases = driver_host_aliases
self.spark_on_k8s_service_url = spark_on_k8s_service_url
self.kubernetes_conn_id = kubernetes_conn_id
self.poll_interval = poll_interval
Expand Down Expand Up @@ -339,6 +342,8 @@ def _submit_new_job(self, context: Context):
submit_app_kwargs["app_id_suffix"] = lambda: self.app_id_suffix
if self.driver_init_containers is not None:
submit_app_kwargs["driver_init_containers"] = self.driver_init_containers
if self.driver_host_aliases is not None:
submit_app_kwargs["driver_host_aliases"] = self.driver_host_aliases
self._driver_pod_name = spark_client.submit_app(
image=self.image,
app_path=self.app_path,
Expand Down
5 changes: 5 additions & 0 deletions spark_on_k8s/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ def submit_app(
executor_pod_template_path: str | ArgNotSet = NOTSET,
startup_timeout: int | ArgNotSet = NOTSET,
driver_init_containers: list[k8s.V1Container] | ArgNotSet = NOTSET,
driver_host_aliases: list[k8s.V1HostAlias] | ArgNotSet = NOTSET,
) -> str:
"""Submit a Spark app to Kubernetes
Expand Down Expand Up @@ -192,6 +193,7 @@ def submit_app(
executor_pod_template_path: Path to the executor pod template file
startup_timeout: Timeout in seconds to wait for the application to start
driver_init_containers: List of init containers to run before the driver starts
driver_host_aliases: List of host aliases for the driver
Returns:
Name of the Spark application pod
Expand Down Expand Up @@ -301,6 +303,8 @@ def submit_app(
startup_timeout = Configuration.SPARK_ON_K8S_STARTUP_TIMEOUT
if driver_init_containers is NOTSET:
driver_init_containers = []
if driver_host_aliases is NOTSET:
driver_host_aliases = []

spark_conf = spark_conf or {}
main_class_parameters = app_arguments or []
Expand Down Expand Up @@ -422,6 +426,7 @@ def submit_app(
node_selector=driver_node_selector,
tolerations=driver_tolerations,
init_containers=driver_init_containers,
host_aliases=driver_host_aliases,
)
with self.k8s_client_manager.client() as client:
api = k8s.CoreV1Api(client)
Expand Down
3 changes: 3 additions & 0 deletions spark_on_k8s/utils/app_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ def create_spark_pod_spec(
node_selector: dict[str, str] | None = None,
tolerations: list[k8s.V1Toleration] | None = None,
init_containers: list[k8s.V1Container] | None = None,
host_aliases: list[k8s.V1HostAlias] | None = None,
) -> k8s.V1PodTemplateSpec:
"""Create a pod spec for a Spark application
Expand All @@ -351,6 +352,7 @@ def create_spark_pod_spec(
node_selector: Node selector to use for the pod
tolerations: List of tolerations to use for the pod
init_containers: List of init containers to run before the main container
host_aliases: List of host aliases to add to the pod
Returns:
Pod template spec for the Spark application
Expand Down Expand Up @@ -384,6 +386,7 @@ def create_spark_pod_spec(
node_selector=node_selector,
tolerations=tolerations,
init_containers=init_containers,
host_aliases=host_aliases,
)
template = k8s.V1PodTemplateSpec(
metadata=pod_metadata,
Expand Down
9 changes: 9 additions & 0 deletions tests/test_spark_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,12 @@ def test_submit_app(self, mock_create_namespaced_service, mock_create_namespaced
args=["init-arg"],
)
],
driver_host_aliases=[
k8s.V1HostAlias(
hostnames=["foo.local", "bar.local"],
ip="127.0.0.1",
)
],
)

expected_app_name = "pyspark-job-example"
Expand All @@ -216,6 +222,9 @@ def test_submit_app(self, mock_create_namespaced_service, mock_create_namespaced
assert created_pod.spec.init_containers[0].image == "init-container-image"
assert created_pod.spec.init_containers[0].command == ["init-command"]
assert created_pod.spec.init_containers[0].args == ["init-arg"]
assert len(created_pod.spec.host_aliases) == 1
assert created_pod.spec.host_aliases[0].hostnames == ["foo.local", "bar.local"]
assert created_pod.spec.host_aliases[0].ip == "127.0.0.1"
assert created_pod.spec.containers[0].args == [
"driver",
"--master",
Expand Down

0 comments on commit c49e4d5

Please sign in to comment.