Skip to content

Commit 778f555

Browse files
authored
[SDK] Get Kubernetes Events for Job (kubeflow#1975)
* [SDK] Get Kubernetes Events for Job * Move class at the top * Add object kind to event message * Add example for events
1 parent 006dda4 commit 778f555

6 files changed

Lines changed: 191 additions & 47 deletions

File tree

.flake8

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
[flake8]
22
max-line-length = 100
3+
extend-ignore = W503

manifests/overlays/kubeflow/kubeflow-training-roles.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,14 @@ rules:
5757
- get
5858
- list
5959
- watch
60+
- apiGroups:
61+
- ""
62+
resources:
63+
- events
64+
verbs:
65+
- get
66+
- list
67+
- watch
6068

6169
---
6270
apiVersion: rbac.authorization.k8s.io/v1

sdk/python/kubeflow/training/api/training_client.py

Lines changed: 141 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import logging
1717
import time
1818
import json
19-
from typing import Optional, Callable, List, Dict, Any, Set, Union
19+
from typing import Optional, Callable, Tuple, List, Dict, Any, Set, Union
2020
import queue
2121
from kubernetes import client, config, watch
2222

@@ -855,16 +855,16 @@ def wait_for_job_conditions(
855855
{expected_conditions}"
856856
)
857857

858-
def get_job_pod_names(
858+
def get_job_pods(
859859
self,
860860
name: str,
861861
namespace: Optional[str] = None,
862862
is_master: bool = False,
863863
replica_type: Optional[str] = None,
864864
replica_index: Optional[int] = None,
865865
timeout: int = constants.DEFAULT_TIMEOUT,
866-
) -> List[str]:
867-
"""Get pod names for the Training Job.
866+
) -> List[models.V1Pod]:
867+
"""Get pods for the Training Job.
868868
869869
Args:
870870
name: Name for the Job.
@@ -889,7 +889,7 @@ def get_job_pod_names(
889889
timeout: Kubernetes API server timeout in seconds to execute the request.
890890
891891
Returns:
892-
list[str]: List of the Job pod names.
892+
list[V1Pod]: List of the Job pods.
893893
894894
Raises:
895895
ValueError: Job replica type is invalid.
@@ -933,23 +933,75 @@ def get_job_pod_names(
933933
if replica_index is not None:
934934
label_selector += f",{constants.REPLICA_INDEX_LABEL}={replica_index}"
935935

936-
# List Training Job pods.
937-
pods = []
936+
# Return list of Training Job pods.
938937
try:
939938
thread = self.core_api.list_namespaced_pod(
940939
namespace,
941940
label_selector=label_selector,
942941
async_req=True,
943942
)
944-
response = thread.get(timeout)
943+
return thread.get(timeout).items
945944
except multiprocessing.TimeoutError:
946945
raise TimeoutError(f"Timeout to list pods for Job: {namespace}/{name}")
947946
except Exception:
948947
raise RuntimeError(f"Failed to list pods for Job: {namespace}/{name}")
949948

950-
for pod in response.items:
951-
pods.append(pod.metadata.name)
952-
return pods
949+
def get_job_pod_names(
950+
self,
951+
name: str,
952+
namespace: Optional[str] = None,
953+
is_master: bool = False,
954+
replica_type: Optional[str] = None,
955+
replica_index: Optional[int] = None,
956+
timeout: int = constants.DEFAULT_TIMEOUT,
957+
) -> List[str]:
958+
"""Get pod names for the Training Job.
959+
960+
Args:
961+
name: Name for the Job.
962+
namespace: Namespace for the Job. By default namespace is taken from
963+
`TrainingClient` object.
964+
is_master: Whether to get pods only with the label
965+
`training.kubeflow.org/job-role: master`.
966+
replica_type: Type of the Job replica.
967+
For TFJob one of `Chief`, `PS`, or `worker`.
968+
969+
For PyTorchJob one of `master` or `worker`.
970+
971+
For MXJob one of `scheduler`, `server`, or `worker`.
972+
973+
For XGBoostJob one of `master` or `worker`.
974+
975+
For MPIJob one of `launcher` or `worker`.
976+
977+
For PaddleJob one of `master` or `worker`.
978+
979+
replica_index: Index for the Job replica.
980+
timeout: Kubernetes API server timeout in seconds to execute the request.
981+
982+
Returns:
983+
list[str]: List of the Job pod names.
984+
985+
Raises:
986+
ValueError: Job replica type is invalid.
987+
TimeoutError: Timeout to get Job pods.
988+
RuntimeError: Failed to get Job pods.
989+
"""
990+
991+
namespace = namespace or self.namespace
992+
993+
pods = self.get_job_pods(
994+
name=name,
995+
namespace=namespace,
996+
is_master=is_master,
997+
replica_type=replica_type,
998+
replica_index=replica_index,
999+
timeout=timeout,
1000+
)
1001+
pod_names = []
1002+
for pod in pods:
1003+
pod_names.append(pod.metadata.name)
1004+
return pod_names
9531005

9541006
def get_job_logs(
9551007
self,
@@ -961,7 +1013,8 @@ def get_job_logs(
9611013
replica_index: Optional[int] = None,
9621014
follow: bool = False,
9631015
timeout: int = constants.DEFAULT_TIMEOUT,
964-
) -> Dict[str, str]:
1016+
verbose: bool = False,
1017+
) -> Tuple[Dict[str, str], Dict[str, List[str]]]:
9651018
"""Get the logs for every Training Job pod. By default it returns logs from
9661019
the `master` pod. Logs are returned in this format: { "pod-name": "Log data" }.
9671020
@@ -990,21 +1043,35 @@ def get_job_logs(
9901043
follow: Whether to follow the log stream of the pod and print logs to StdOut.
9911044
timeout: Optional, Kubernetes API server timeout in seconds
9921045
to execute the request.
1046+
verbose: Whether to get Kubernetes events for Job and corresponding pods.
9931047
9941048
Returns:
9951049
Dict[str, str]: A dictionary in which the keys are pod names and the
9961050
values are the corresponding logs.
1051+
Dict[str, str]: A dictionary in which the keys are object kind and name, and the
1052+
values are list of the corresponding Kubernetes events with their timestamps. This
1053+
value is returned only if `verbose = True`. For example:
1054+
```json
1055+
{
1056+
"PyTorchJob train-mnist": [
1057+
"2024-01-05 22:58:20 Created pod: train-mnist-worker-0"
1058+
],
1059+
"Pod train-mnist-worker-0": [
1060+
"2024-01-05 22:58:20 Created container init-pytorch"
1061+
]
1062+
}
1063+
```
9971064
9981065
Raises:
9991066
ValueError: Job replica type is invalid.
1000-
TimeoutError: Timeout to get Job pods.
1001-
RuntimeError: Failed to get Job pods.
1067+
TimeoutError: Timeout to get Job or Job's pods
1068+
RuntimeError: Failed to get Job or Job's pods.
10021069
"""
10031070

10041071
namespace = namespace or self.namespace
10051072
job_kind = job_kind or self.job_kind
10061073

1007-
pods = self.get_job_pod_names(
1074+
pods = self.get_job_pods(
10081075
name=name,
10091076
namespace=namespace,
10101077
is_master=is_master,
@@ -1014,17 +1081,22 @@ def get_job_logs(
10141081
)
10151082

10161083
logs_dict = {}
1084+
events_dict = {}
10171085
if pods and follow:
10181086
log_streams = []
10191087
for pod in pods:
1020-
log_streams.append(
1021-
watch.Watch().stream(
1022-
self.core_api.read_namespaced_pod_log,
1023-
name=pod,
1024-
namespace=namespace,
1025-
container=constants.JOB_PARAMETERS[job_kind]["container"],
1088+
if (
1089+
pod.status is not None
1090+
and pod.status.phase != constants.POD_PHASE_PENDING
1091+
):
1092+
log_streams.append(
1093+
watch.Watch().stream(
1094+
self.core_api.read_namespaced_pod_log,
1095+
name=pod.metadata.name,
1096+
namespace=namespace,
1097+
container=constants.JOB_PARAMETERS[job_kind]["container"],
1098+
)
10261099
)
1027-
)
10281100
finished = [False for _ in log_streams]
10291101

10301102
# Create thread and queue per stream, for non-blocking iteration
@@ -1034,7 +1106,7 @@ def get_job_logs(
10341106
while True:
10351107
for index, log_queue in enumerate(log_queue_pool):
10361108
if all(finished):
1037-
return logs_dict
1109+
break
10381110
if finished[index]:
10391111
continue
10401112
# grouping the every 50 log lines of the same pod
@@ -1046,27 +1118,58 @@ def get_job_logs(
10461118
break
10471119

10481120
# Print logs to the StdOut
1049-
print(f"[Pod {pods[index]}]: {logline}")
1121+
print(f"[Pod {pods[index].metadata.name}]: {logline}")
10501122
# Add logs to the results dict.
1051-
if pods[index] not in logs_dict:
1052-
logs_dict[pods[index]] = logline
1123+
if pods[index].metadata.name not in logs_dict:
1124+
logs_dict[pods[index].metadata.name] = logline
10531125
else:
1054-
logs_dict[pods[index]] += logline
1126+
logs_dict[pods[index].metadata.name] += logline
10551127
except queue.Empty:
10561128
break
1129+
if all(finished):
1130+
break
10571131
elif pods:
10581132
for pod in pods:
1059-
try:
1060-
pod_logs = self.core_api.read_namespaced_pod_log(
1061-
pod,
1062-
namespace,
1063-
container=constants.JOB_PARAMETERS[job_kind]["container"],
1064-
)
1065-
logs_dict[pod] = pod_logs
1066-
except Exception:
1067-
raise RuntimeError(f"Failed to read logs for pod {namespace}/{pod}")
1068-
1069-
return logs_dict
1133+
if (
1134+
pod.status is not None
1135+
and pod.status.phase != constants.POD_PHASE_PENDING
1136+
):
1137+
try:
1138+
pod_logs = self.core_api.read_namespaced_pod_log(
1139+
name=pod.metadata.name,
1140+
namespace=namespace,
1141+
container=constants.JOB_PARAMETERS[job_kind]["container"],
1142+
)
1143+
logs_dict[pod.metadata.name] = pod_logs
1144+
except Exception:
1145+
raise RuntimeError(
1146+
f"Failed to read logs for pod {namespace}/{pod.metadata.name}"
1147+
)
1148+
# If verbose is set, return Kubernetes events for Job and pods.
1149+
if verbose:
1150+
job = self.get_job(name=name, namespace=namespace)
1151+
events = self.core_api.list_namespaced_event(namespace=namespace)
1152+
1153+
# Get events for the Job and Job's pods.
1154+
for event in events.items:
1155+
utils.add_event_to_dict(
1156+
events_dict=events_dict,
1157+
event=event,
1158+
object_kind=job_kind,
1159+
object_name=name,
1160+
object_creation_timestamp=job.metadata.creation_timestamp,
1161+
)
1162+
if pods:
1163+
for pod in pods:
1164+
utils.add_event_to_dict(
1165+
events_dict=events_dict,
1166+
event=event,
1167+
object_kind=constants.POD_KIND,
1168+
object_name=pod.metadata.name,
1169+
object_creation_timestamp=pod.metadata.creation_timestamp,
1170+
)
1171+
1172+
return logs_dict, events_dict
10701173

10711174
def update_job(
10721175
self,

sdk/python/kubeflow/training/constants/constants.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@
2929
VERSION = "v1"
3030
API_VERSION = f"{GROUP}/{VERSION}"
3131

32+
# Kind for pod.
33+
POD_KIND = "Pod"
34+
35+
# Pending status for pod phase.
36+
POD_PHASE_PENDING = "Pending"
37+
38+
3239
# Training Job conditions.
3340
JOB_CONDITION_CREATED = "Created"
3441
JOB_CONDITION_RUNNING = "Running"

sdk/python/kubeflow/training/utils/utils.py

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
from datetime import datetime
1516
import os
1617
import logging
1718
import textwrap
@@ -53,6 +54,15 @@ def __init__(self, obj):
5354
self.data = json.dumps(obj)
5455

5556

57+
class SetEncoder(json.JSONEncoder):
58+
def default(self, obj):
59+
if isinstance(obj, set):
60+
return list(obj)
61+
if isinstance(obj, type):
62+
return obj.__name__
63+
return json.JSONEncoder.default(self, obj)
64+
65+
5666
def is_running_in_k8s():
5767
return os.path.isdir("/var/run/secrets/kubernetes.io/")
5868

@@ -368,10 +378,27 @@ def get_pvc_spec(
368378
return pvc_spec
369379

370380

371-
class SetEncoder(json.JSONEncoder):
372-
def default(self, obj):
373-
if isinstance(obj, set):
374-
return list(obj)
375-
if isinstance(obj, type):
376-
return obj.__name__
377-
return json.JSONEncoder.default(self, obj)
381+
def add_event_to_dict(
382+
events_dict: Dict[str, List[str]],
383+
event: models.CoreV1Event,
384+
object_kind: str,
385+
object_name: str,
386+
object_creation_timestamp: datetime,
387+
):
388+
"""Add Kubernetes event to the dict with this format:
389+
```
390+
{"<Object Kind> <Object Name>": "<Event Timestamp> <Event Message>"}
391+
```
392+
"""
393+
if (
394+
event.involved_object.kind == object_kind
395+
and event.involved_object.name == object_name
396+
and event.metadata.creation_timestamp >= object_creation_timestamp
397+
):
398+
event_time = event.metadata.creation_timestamp.strftime("%Y-%m-%d %H:%M:%S")
399+
event_msg = f"{event_time} {event.message}"
400+
if object_name not in events_dict:
401+
events_dict[f"{object_kind} {object_name}"] = [event_msg]
402+
else:
403+
events_dict[f"{object_kind} {object_name}"] += [event_msg]
404+
return events_dict

sdk/python/setup.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,6 @@
4848
"Intended Audience :: Developers",
4949
"Intended Audience :: Education",
5050
"Intended Audience :: Science/Research",
51-
"Programming Language :: Python :: 3",
52-
"Programming Language :: Python :: 3 :: Only",
5351
"Programming Language :: Python :: 3.7",
5452
"Programming Language :: Python :: 3.8",
5553
"Programming Language :: Python :: 3.9",

0 commit comments

Comments
 (0)