Skip to content

Commit

Permalink
Add statefulset, daemonset, deployments collection.
Browse files Browse the repository at this point in the history
  • Loading branch information
petergasper committed Sep 6, 2024
1 parent dcc3bfc commit e7b7589
Showing 1 changed file with 141 additions and 93 deletions.
234 changes: 141 additions & 93 deletions kubesurveyor/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,101 +17,148 @@
"version": __version__, # program version
"ingress": {}, # fqdn to service mapping
"service": {}, # service to deployment - 1 to 1 mapping
# "deployment": {}, # deployment to pod - 1 to N mapping
# "statefulset": {}, # statefulset to pod - 1 to N mapping
# "daemonset": {}, # daemonset to pod - 1 to N mapping
"deployment": {}, # deployment to pod - 1 to N mapping
"statefulset": {}, # statefulset to pod - 1 to N mapping
"daemonset": {}, # daemonset to pod - 1 to N mapping
"pod": {}, # pod to container - 1 to N mapping
}


# def get_deployments(client: Any, namespace: str) -> None:
# v1 = client.AppsV1Api()
# ret = v1.list_namespaced_deployment(namespace=namespace)
# for deployment in ret.items:
# deployment_name = deployment.metadata.name
# if "app" in deployment.spec.selector.match_labels:
# pod_name = deployment.spec.selector.match_labels["app"]
# elif "app.kubernetes.io/name" in deployment.spec.selector.match_labels:
# pod_name = deployment.spec.selector.match_labels["app.kubernetes.io/name"]
# else:
# continue
# containers = []
# for container in deployment.spec.template.spec.containers:
# # collect container name and ports used
# ports = []
# try:
# for port in container.ports:
# port = port.to_dict()
# p = {}
# # store name, and the actual port in case name is not used
# if "name" in port:
# if port["name"] is not None:
# p["name"] = port["name"]
# if "container_port" in port:
# p["port"] = port["container_port"]
# ports.append(p)
# except TypeError:
# # container does not have ports
# pass
# containers.append([container.name, ports])
# # safe to a global variable
# if deployment_name not in ns["deployment"]:
# ns["deployment"][deployment_name] = {}
# if "pods" not in ns["deployment"][deployment_name]:
# ns["deployment"][deployment_name]["pods"] = {}
# if pod_name not in ns["deployment"][deployment_name]["pods"]:
# ns["deployment"][deployment_name]["pods"][pod_name] = {}
# if "containers" not in ns["deployment"][deployment_name]["pods"][pod_name]:
# ns["deployment"][deployment_name]["pods"][pod_name]["containers"] = {}
# for container in containers:
# ns["deployment"][deployment_name]["pods"][pod_name]["containers"][
# container[0]
# ] = container[1]


# def get_statefulsets(client: Any, namespace: str) -> None:
# v1 = client.AppsV1Api()
# ret = v1.list_namespaced_stateful_set(namespace=namespace)
# for statefulset in ret.items:
# statefulset_name = statefulset.metadata.name
# if "app" in statefulset.spec.selector.match_labels:
# pod_name = statefulset.spec.selector.match_labels["app"]
# elif "app.kubernetes.io/name" in statefulset.spec.selector.match_labels:
# pod_name = statefulset.spec.selector.match_labels["app.kubernetes.io/name"]
# else:
# continue
# containers = []
# for container in statefulset.spec.template.spec.containers:
# # collect container name and ports used
# ports = []
# try:
# for port in container.ports:
# port = port.to_dict()
# p = {}
# # store name, and the actual port in case name is not used
# if "name" in port:
# if port["name"] is not None:
# p["name"] = port["name"]
# if "container_port" in port:
# p["port"] = port["container_port"]
# ports.append(p)
# except TypeError:
# # container does not have ports
# pass
# containers.append([container.name, ports])
# # safe to a global variable
# if statefulset_name not in ns["statefulset"]:
# ns["statefulset"][statefulset_name] = {}
# if "pods" not in ns["statefulset"][statefulset_name]:
# ns["statefulset"][statefulset_name]["pods"] = {}
# if pod_name not in ns["statefulset"][statefulset_name]["pods"]:
# ns["statefulset"][statefulset_name]["pods"][pod_name] = {}
# if "containers" not in ns["statefulset"][statefulset_name]["pods"][pod_name]:
# ns["statefulset"][statefulset_name]["pods"][pod_name]["containers"] = {}
# for container in containers:
# ns["statefulset"][statefulset_name]["pods"][pod_name]["containers"][
# container[0]
# ] = container[1]
def get_deployments(client: Any, namespace: str) -> None:
v1 = client.AppsV1Api()
ret = v1.list_namespaced_deployment(namespace=namespace)
for deployment in ret.items:
deployment_name = deployment.metadata.name
if "app" in deployment.spec.selector.match_labels:
pod_name = deployment.spec.selector.match_labels["app"]
elif "app.kubernetes.io/name" in deployment.spec.selector.match_labels:
pod_name = deployment.spec.selector.match_labels["app.kubernetes.io/name"]
else:
continue
containers = []
for container in deployment.spec.template.spec.containers:
# collect container name and ports used
ports = []
try:
for port in container.ports:
port = port.to_dict()
p = {}
# store name, and the actual port in case name is not used
if "name" in port:
if port["name"] is not None:
p["name"] = port["name"]
if "container_port" in port:
p["port"] = port["container_port"]
ports.append(p)
except TypeError:
# container does not have ports
pass
containers.append([container.name, ports])
# safe to a global variable
if deployment_name not in ns["deployment"]:
ns["deployment"][deployment_name] = {}
if "pods" not in ns["deployment"][deployment_name]:
ns["deployment"][deployment_name]["pods"] = {}
if pod_name not in ns["deployment"][deployment_name]["pods"]:
ns["deployment"][deployment_name]["pods"][pod_name] = {}
if "containers" not in ns["deployment"][deployment_name]["pods"][pod_name]:
ns["deployment"][deployment_name]["pods"][pod_name]["containers"] = {}
for container in containers:
ns["deployment"][deployment_name]["pods"][pod_name]["containers"][
container[0]
] = container[1]


def get_statefulsets(client: Any, namespace: str) -> None:
v1 = client.AppsV1Api()
ret = v1.list_namespaced_stateful_set(namespace=namespace)
for statefulset in ret.items:
statefulset_name = statefulset.metadata.name
if "app" in statefulset.spec.selector.match_labels:
pod_name = statefulset.spec.selector.match_labels["app"]
elif "app.kubernetes.io/name" in statefulset.spec.selector.match_labels:
pod_name = statefulset.spec.selector.match_labels["app.kubernetes.io/name"]
else:
continue
containers = []
for container in statefulset.spec.template.spec.containers:
# collect container name and ports used
ports = []
try:
for port in container.ports:
port = port.to_dict()
p = {}
# store name, and the actual port in case name is not used
if "name" in port:
if port["name"] is not None:
p["name"] = port["name"]
if "container_port" in port:
p["port"] = port["container_port"]
ports.append(p)
except TypeError:
# container does not have ports
pass
containers.append([container.name, ports])
# safe to a global variable
if statefulset_name not in ns["statefulset"]:
ns["statefulset"][statefulset_name] = {}
if "pods" not in ns["statefulset"][statefulset_name]:
ns["statefulset"][statefulset_name]["pods"] = {}
if pod_name not in ns["statefulset"][statefulset_name]["pods"]:
ns["statefulset"][statefulset_name]["pods"][pod_name] = {}
if "containers" not in ns["statefulset"][statefulset_name]["pods"][pod_name]:
ns["statefulset"][statefulset_name]["pods"][pod_name]["containers"] = {}
for container in containers:
ns["statefulset"][statefulset_name]["pods"][pod_name]["containers"][
container[0]
] = container[1]


def get_daemonsets(client: Any, namespace: str) -> None:
v1 = client.AppsV1Api()
ret = v1.list_namespaced_daemon_set(namespace=namespace)
for daemonset in ret.items:
daemonset_name = daemonset.metadata.name
if "name" in daemonset.spec.selector.match_labels:
pod_name = daemonset.spec.selector.match_labels["name"]
elif "app" in daemonset.spec.selector.match_labels:
pod_name = daemonset.spec.selector.match_labels["app"]
elif "app.kubernetes.io/name" in daemonset.spec.selector.match_labels:
pod_name = daemonset.spec.selector.match_labels["app.kubernetes.io/name"]
else:
continue
containers = []
for container in daemonset.spec.template.spec.containers:
# collect container name and ports used
ports = []
try:
for port in container.ports:
port = port.to_dict()
p = {}
# store name, and the actual port in case name is not used
if "name" in port:
if port["name"] is not None:
p["name"] = port["name"]
if "container_port" in port:
p["port"] = port["container_port"]
ports.append(p)
except TypeError:
# container does not have ports
pass
containers.append([container.name, ports])
# safe to a global variable
if daemonset_name not in ns["daemonset"]:
ns["daemonset"][daemonset_name] = {}
if "pods" not in ns["daemonset"][daemonset_name]:
ns["daemonset"][daemonset_name]["pods"] = {}
if pod_name not in ns["daemonset"][daemonset_name]["pods"]:
ns["daemonset"][daemonset_name]["pods"][pod_name] = {}
if "containers" not in ns["daemonset"][daemonset_name]["pods"][pod_name]:
ns["daemonset"][daemonset_name]["pods"][pod_name]["containers"] = {}
for container in containers:
ns["daemonset"][daemonset_name]["pods"][pod_name]["containers"][
container[0]
] = container[1]


def get_pods(client: Any, namespace: str) -> None:
Expand Down Expand Up @@ -354,8 +401,9 @@ def main(args: Any) -> None:
# we are not loading from a yaml file
get_ingresses(client, namespace)
get_services(client, namespace)
# get_deployments(client, namespace)
# get_statefulsets(client, namespace)
get_deployments(client, namespace)
get_statefulsets(client, namespace)
get_daemonsets(client, namespace)
get_pods(client, namespace)
else:
# load yaml file loaded via stdin to a `ns` global var
Expand Down

0 comments on commit e7b7589

Please sign in to comment.