From e7b7589ed8bebfe6a3546822fcffed3997153bf5 Mon Sep 17 00:00:00 2001 From: Peter Gasper <110403019+petergasper@users.noreply.github.com> Date: Fri, 6 Sep 2024 14:37:54 +0200 Subject: [PATCH] Add statefulset, daemonset, deployments collection. --- kubesurveyor/main.py | 234 ++++++++++++++++++++++++++----------------- 1 file changed, 141 insertions(+), 93 deletions(-) diff --git a/kubesurveyor/main.py b/kubesurveyor/main.py index e46e669..ff6bd7e 100644 --- a/kubesurveyor/main.py +++ b/kubesurveyor/main.py @@ -17,101 +17,148 @@ "version": __version__, # program version "ingress": {}, # fqdn to service mapping "service": {}, # service to deployment - 1 to 1 mapping - # "deployment": {}, # deployment to pod - 1 to N mapping - # "statefulset": {}, # statefulset to pod - 1 to N mapping - # "daemonset": {}, # daemonset to pod - 1 to N mapping + "deployment": {}, # deployment to pod - 1 to N mapping + "statefulset": {}, # statefulset to pod - 1 to N mapping + "daemonset": {}, # daemonset to pod - 1 to N mapping "pod": {}, # pod to container - 1 to N mapping } -# def get_deployments(client: Any, namespace: str) -> None: -# v1 = client.AppsV1Api() -# ret = v1.list_namespaced_deployment(namespace=namespace) -# for deployment in ret.items: -# deployment_name = deployment.metadata.name -# if "app" in deployment.spec.selector.match_labels: -# pod_name = deployment.spec.selector.match_labels["app"] -# elif "app.kubernetes.io/name" in deployment.spec.selector.match_labels: -# pod_name = deployment.spec.selector.match_labels["app.kubernetes.io/name"] -# else: -# continue -# containers = [] -# for container in deployment.spec.template.spec.containers: -# # collect container name and ports used -# ports = [] -# try: -# for port in container.ports: -# port = port.to_dict() -# p = {} -# # store name, and the actual port in case name is not used -# if "name" in port: -# if port["name"] is not None: -# p["name"] = port["name"] -# if "container_port" in port: -# p["port"] = port["container_port"] -# ports.append(p) -# except TypeError: -# # container does not have ports -# pass -# containers.append([container.name, ports]) -# # safe to a global variable -# if deployment_name not in ns["deployment"]: -# ns["deployment"][deployment_name] = {} -# if "pods" not in ns["deployment"][deployment_name]: -# ns["deployment"][deployment_name]["pods"] = {} -# if pod_name not in ns["deployment"][deployment_name]["pods"]: -# ns["deployment"][deployment_name]["pods"][pod_name] = {} -# if "containers" not in ns["deployment"][deployment_name]["pods"][pod_name]: -# ns["deployment"][deployment_name]["pods"][pod_name]["containers"] = {} -# for container in containers: -# ns["deployment"][deployment_name]["pods"][pod_name]["containers"][ -# container[0] -# ] = container[1] - - -# def get_statefulsets(client: Any, namespace: str) -> None: -# v1 = client.AppsV1Api() -# ret = v1.list_namespaced_stateful_set(namespace=namespace) -# for statefulset in ret.items: -# statefulset_name = statefulset.metadata.name -# if "app" in statefulset.spec.selector.match_labels: -# pod_name = statefulset.spec.selector.match_labels["app"] -# elif "app.kubernetes.io/name" in statefulset.spec.selector.match_labels: -# pod_name = statefulset.spec.selector.match_labels["app.kubernetes.io/name"] -# else: -# continue -# containers = [] -# for container in statefulset.spec.template.spec.containers: -# # collect container name and ports used -# ports = [] -# try: -# for port in container.ports: -# port = port.to_dict() -# p = {} -# # store name, and the actual port in case name is not used -# if "name" in port: -# if port["name"] is not None: -# p["name"] = port["name"] -# if "container_port" in port: -# p["port"] = port["container_port"] -# ports.append(p) -# except TypeError: -# # container does not have ports -# pass -# containers.append([container.name, ports]) -# # safe to a global variable -# if statefulset_name not in ns["statefulset"]: -# ns["statefulset"][statefulset_name] = {} -# if "pods" not in ns["statefulset"][statefulset_name]: -# ns["statefulset"][statefulset_name]["pods"] = {} -# if pod_name not in ns["statefulset"][statefulset_name]["pods"]: -# ns["statefulset"][statefulset_name]["pods"][pod_name] = {} -# if "containers" not in ns["statefulset"][statefulset_name]["pods"][pod_name]: -# ns["statefulset"][statefulset_name]["pods"][pod_name]["containers"] = {} -# for container in containers: -# ns["statefulset"][statefulset_name]["pods"][pod_name]["containers"][ -# container[0] -# ] = container[1] +def get_deployments(client: Any, namespace: str) -> None: + v1 = client.AppsV1Api() + ret = v1.list_namespaced_deployment(namespace=namespace) + for deployment in ret.items: + deployment_name = deployment.metadata.name + if "app" in deployment.spec.selector.match_labels: + pod_name = deployment.spec.selector.match_labels["app"] + elif "app.kubernetes.io/name" in deployment.spec.selector.match_labels: + pod_name = deployment.spec.selector.match_labels["app.kubernetes.io/name"] + else: + continue + containers = [] + for container in deployment.spec.template.spec.containers: + # collect container name and ports used + ports = [] + try: + for port in container.ports: + port = port.to_dict() + p = {} + # store name, and the actual port in case name is not used + if "name" in port: + if port["name"] is not None: + p["name"] = port["name"] + if "container_port" in port: + p["port"] = port["container_port"] + ports.append(p) + except TypeError: + # container does not have ports + pass + containers.append([container.name, ports]) + # safe to a global variable + if deployment_name not in ns["deployment"]: + ns["deployment"][deployment_name] = {} + if "pods" not in ns["deployment"][deployment_name]: + ns["deployment"][deployment_name]["pods"] = {} + if pod_name not in ns["deployment"][deployment_name]["pods"]: + ns["deployment"][deployment_name]["pods"][pod_name] = {} + if "containers" not in ns["deployment"][deployment_name]["pods"][pod_name]: + ns["deployment"][deployment_name]["pods"][pod_name]["containers"] = {} + for container in containers: + ns["deployment"][deployment_name]["pods"][pod_name]["containers"][ + container[0] + ] = container[1] + + +def get_statefulsets(client: Any, namespace: str) -> None: + v1 = client.AppsV1Api() + ret = v1.list_namespaced_stateful_set(namespace=namespace) + for statefulset in ret.items: + statefulset_name = statefulset.metadata.name + if "app" in statefulset.spec.selector.match_labels: + pod_name = statefulset.spec.selector.match_labels["app"] + elif "app.kubernetes.io/name" in statefulset.spec.selector.match_labels: + pod_name = statefulset.spec.selector.match_labels["app.kubernetes.io/name"] + else: + continue + containers = [] + for container in statefulset.spec.template.spec.containers: + # collect container name and ports used + ports = [] + try: + for port in container.ports: + port = port.to_dict() + p = {} + # store name, and the actual port in case name is not used + if "name" in port: + if port["name"] is not None: + p["name"] = port["name"] + if "container_port" in port: + p["port"] = port["container_port"] + ports.append(p) + except TypeError: + # container does not have ports + pass + containers.append([container.name, ports]) + # safe to a global variable + if statefulset_name not in ns["statefulset"]: + ns["statefulset"][statefulset_name] = {} + if "pods" not in ns["statefulset"][statefulset_name]: + ns["statefulset"][statefulset_name]["pods"] = {} + if pod_name not in ns["statefulset"][statefulset_name]["pods"]: + ns["statefulset"][statefulset_name]["pods"][pod_name] = {} + if "containers" not in ns["statefulset"][statefulset_name]["pods"][pod_name]: + ns["statefulset"][statefulset_name]["pods"][pod_name]["containers"] = {} + for container in containers: + ns["statefulset"][statefulset_name]["pods"][pod_name]["containers"][ + container[0] + ] = container[1] + + +def get_daemonsets(client: Any, namespace: str) -> None: + v1 = client.AppsV1Api() + ret = v1.list_namespaced_daemon_set(namespace=namespace) + for daemonset in ret.items: + daemonset_name = daemonset.metadata.name + if "name" in daemonset.spec.selector.match_labels: + pod_name = daemonset.spec.selector.match_labels["name"] + elif "app" in daemonset.spec.selector.match_labels: + pod_name = daemonset.spec.selector.match_labels["app"] + elif "app.kubernetes.io/name" in daemonset.spec.selector.match_labels: + pod_name = daemonset.spec.selector.match_labels["app.kubernetes.io/name"] + else: + continue + containers = [] + for container in daemonset.spec.template.spec.containers: + # collect container name and ports used + ports = [] + try: + for port in container.ports: + port = port.to_dict() + p = {} + # store name, and the actual port in case name is not used + if "name" in port: + if port["name"] is not None: + p["name"] = port["name"] + if "container_port" in port: + p["port"] = port["container_port"] + ports.append(p) + except TypeError: + # container does not have ports + pass + containers.append([container.name, ports]) + # safe to a global variable + if daemonset_name not in ns["daemonset"]: + ns["daemonset"][daemonset_name] = {} + if "pods" not in ns["daemonset"][daemonset_name]: + ns["daemonset"][daemonset_name]["pods"] = {} + if pod_name not in ns["daemonset"][daemonset_name]["pods"]: + ns["daemonset"][daemonset_name]["pods"][pod_name] = {} + if "containers" not in ns["daemonset"][daemonset_name]["pods"][pod_name]: + ns["daemonset"][daemonset_name]["pods"][pod_name]["containers"] = {} + for container in containers: + ns["daemonset"][daemonset_name]["pods"][pod_name]["containers"][ + container[0] + ] = container[1] def get_pods(client: Any, namespace: str) -> None: @@ -354,8 +401,9 @@ def main(args: Any) -> None: # we are not loading from a yaml file get_ingresses(client, namespace) get_services(client, namespace) - # get_deployments(client, namespace) - # get_statefulsets(client, namespace) + get_deployments(client, namespace) + get_statefulsets(client, namespace) + get_daemonsets(client, namespace) get_pods(client, namespace) else: # load yaml file loaded via stdin to a `ns` global var