Skip to content

Commit

Permalink
Fix bugs in K8s system class introduced during the adoption of Pydantic
Browse files Browse the repository at this point in the history
  • Loading branch information
TaekyungHeo committed Sep 17, 2024
1 parent 11c5592 commit 30e31b6
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 14 deletions.
1 change: 1 addition & 0 deletions conf/common/system/kubernetes_cluster.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ install_path = "./install"
output_path = "./results"
default_image = "ubuntu:22.04"
default_namespace = "default"
monitor_interval = 1

[global_env_vars]
NCCL_IB_GID_INDEX = "3"
Expand Down
45 changes: 32 additions & 13 deletions src/cloudai/systems/kubernetes/kubernetes_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,18 @@ class KubernetesSystem(BaseModel, System):
Represents a Kubernetes system.
Attributes
name (str): The name of the Kubernetes system.
install_path (Path): Path to the installation directory.
output_path (Path): Path to the output directory.
kube_config_path (Path): Path to the Kubernetes config file.
default_namespace (str): The default Kubernetes namespace for jobs.
default_image (str): Default Docker image to be used for jobs.
scheduler (str): The scheduler type, default is "kubernetes".
global_env_vars (Dict[str, Any]): Global environment variables to be passed to jobs.
monitor_interval (int): Time interval to monitor jobs, in seconds.
_core_v1 (client.CoreV1Api): Kubernetes Core V1 API client instance.
_batch_v1 (client.BatchV1Api): Kubernetes Batch V1 API client instance.
_custom_objects_api (CustomObjectsApi): Kubernetes Custom Objects API client instance.
"""

model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True)
Expand All @@ -47,51 +56,61 @@ class KubernetesSystem(BaseModel, System):
default_image: str
scheduler: str = "kubernetes"
global_env_vars: Dict[str, Any] = {}
monitor_interval: int = 1
_core_v1: client.CoreV1Api
_batch_v1: client.BatchV1Api
_custom_objects_api: CustomObjectsApi

def __post_init__(self) -> None:
def __init__(self, **data):
"""Initialize the KubernetesSystem instance."""
# Load the Kubernetes configuration
if not self.kube_config_path.exists():
super().__init__(**data)

kube_config_path = self.kube_config_path
if not kube_config_path.is_file():
home_directory = Path.home()
kube_config_path = home_directory / ".kube" / "config"
else:
kube_config_path = kube_config_path.resolve()

if not kube_config_path.exists():
error_message = (
f"Kube config file '{self.kube_config_path}' not found. This file is required to configure the "
f"Kube config file '{kube_config_path}' not found. This file is required to configure the "
f"Kubernetes environment. Please verify that the file exists at the specified path."
)
logging.error(error_message)
raise FileNotFoundError(error_message)

# Instantiate Kubernetes APIs
logging.debug(f"Loading kube config from: {self.kube_config_path}")
config.load_kube_config(config_file=str(self.kube_config_path))
logging.debug(f"Loading kube config from: {kube_config_path}")
config.load_kube_config(config_file=str(kube_config_path))

self._core_v1 = client.CoreV1Api()
self._batch_v1 = client.BatchV1Api()
self._custom_objects_api = CustomObjectsApi()

logging.debug(f"{self.__class__.__name__} initialized")

@property
def core_v1(self) -> client.CoreV1Api:
if self._core_v1 is None:
self._core_v1 = client.CoreV1Api()
"""Returns the Kubernetes Core V1 API client."""
return self._core_v1

@property
def batch_v1(self) -> client.BatchV1Api:
if self._batch_v1 is None:
self._batch_v1 = client.BatchV1Api()
"""Returns the Kubernetes Batch V1 API client."""
return self._batch_v1

@property
def custom_objects_api(self) -> CustomObjectsApi:
if self._custom_objects_api is None:
self._custom_objects_api = CustomObjectsApi()
"""Returns the Kubernetes Custom Objects API client."""
return self._custom_objects_api

def __repr__(self) -> str:
"""
Provide a structured string representation of the system.
Returns
str: A string that contains the system name and scheduler type.
str: A string that contains the system name, scheduler type, kube config path, namespace, and image.
"""
return (
f"System Name: {self.name}\n"
Expand Down
5 changes: 4 additions & 1 deletion tests/test_toml_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# limitations under the License.

from pathlib import Path
from unittest.mock import patch

import pytest
import toml
Expand All @@ -39,12 +40,14 @@ def test_toml_files(toml_file: Path):


@pytest.mark.parametrize("system_file", ALL_SYSTEMS, ids=lambda x: str(x))
def test_systems(system_file: Path):
@patch("kubernetes.config.load_kube_config")
def test_systems(mock_load_kube_config, system_file: Path):
"""
Validate the syntax of a system configuration file.
Args:
system_file (Path): The path to the system configuration file to validate.
"""
mock_load_kube_config.return_value = None
system = Parser(system_file, Path("conf/test_template")).parse_system(system_file)
assert system is not None

0 comments on commit 30e31b6

Please sign in to comment.