Skip to content

Commit

Permalink
Change schema to be more readable by humans. Add port labels to pod c…
Browse files Browse the repository at this point in the history
…onnections.
  • Loading branch information
petergasper committed Sep 6, 2024
1 parent 5a361ea commit dcc3bfc
Showing 1 changed file with 120 additions and 22 deletions.
142 changes: 120 additions & 22 deletions kubesurveyor/main.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,119 @@
#!/usr/bin/env python3

__author__ = "Peter Gasper"
__version__ = "1.0.2"
__version__ = "2.0.0"
__license__ = "MIT"

import argparse
import sys
from typing import Any, Dict, TextIO

import yaml
from graphviz import Digraph
from kubernetes import client, config
import yaml # type: ignore
from graphviz import Digraph # type: ignore
from kubernetes import client, config # type: ignore

ns: Dict[str, Any] = {
"namespace": "",
"version": __version__, # program version
"ingress": {}, # fqdn to service mapping
"service": {}, # service to deployment - 1 to 1 mapping
# "deployment": {}, # deployment to pod - 1 to N mapping
# "statefulset": {}, # statefulset to pod - 1 to N mapping
# "daemonset": {}, # daemonset to pod - 1 to N mapping
"pod": {}, # pod to container - 1 to N mapping
}


# def get_deployments(client: Any, namespace: str) -> None:
# v1 = client.AppsV1Api()
# ret = v1.list_namespaced_deployment(namespace=namespace)
# for deployment in ret.items:
# deployment_name = deployment.metadata.name
# if "app" in deployment.spec.selector.match_labels:
# pod_name = deployment.spec.selector.match_labels["app"]
# elif "app.kubernetes.io/name" in deployment.spec.selector.match_labels:
# pod_name = deployment.spec.selector.match_labels["app.kubernetes.io/name"]
# else:
# continue
# containers = []
# for container in deployment.spec.template.spec.containers:
# # collect container name and ports used
# ports = []
# try:
# for port in container.ports:
# port = port.to_dict()
# p = {}
# # store name, and the actual port in case name is not used
# if "name" in port:
# if port["name"] is not None:
# p["name"] = port["name"]
# if "container_port" in port:
# p["port"] = port["container_port"]
# ports.append(p)
# except TypeError:
# # container does not have ports
# pass
# containers.append([container.name, ports])
# # safe to a global variable
# if deployment_name not in ns["deployment"]:
# ns["deployment"][deployment_name] = {}
# if "pods" not in ns["deployment"][deployment_name]:
# ns["deployment"][deployment_name]["pods"] = {}
# if pod_name not in ns["deployment"][deployment_name]["pods"]:
# ns["deployment"][deployment_name]["pods"][pod_name] = {}
# if "containers" not in ns["deployment"][deployment_name]["pods"][pod_name]:
# ns["deployment"][deployment_name]["pods"][pod_name]["containers"] = {}
# for container in containers:
# ns["deployment"][deployment_name]["pods"][pod_name]["containers"][
# container[0]
# ] = container[1]


# def get_statefulsets(client: Any, namespace: str) -> None:
# v1 = client.AppsV1Api()
# ret = v1.list_namespaced_stateful_set(namespace=namespace)
# for statefulset in ret.items:
# statefulset_name = statefulset.metadata.name
# if "app" in statefulset.spec.selector.match_labels:
# pod_name = statefulset.spec.selector.match_labels["app"]
# elif "app.kubernetes.io/name" in statefulset.spec.selector.match_labels:
# pod_name = statefulset.spec.selector.match_labels["app.kubernetes.io/name"]
# else:
# continue
# containers = []
# for container in statefulset.spec.template.spec.containers:
# # collect container name and ports used
# ports = []
# try:
# for port in container.ports:
# port = port.to_dict()
# p = {}
# # store name, and the actual port in case name is not used
# if "name" in port:
# if port["name"] is not None:
# p["name"] = port["name"]
# if "container_port" in port:
# p["port"] = port["container_port"]
# ports.append(p)
# except TypeError:
# # container does not have ports
# pass
# containers.append([container.name, ports])
# # safe to a global variable
# if statefulset_name not in ns["statefulset"]:
# ns["statefulset"][statefulset_name] = {}
# if "pods" not in ns["statefulset"][statefulset_name]:
# ns["statefulset"][statefulset_name]["pods"] = {}
# if pod_name not in ns["statefulset"][statefulset_name]["pods"]:
# ns["statefulset"][statefulset_name]["pods"][pod_name] = {}
# if "containers" not in ns["statefulset"][statefulset_name]["pods"][pod_name]:
# ns["statefulset"][statefulset_name]["pods"][pod_name]["containers"] = {}
# for container in containers:
# ns["statefulset"][statefulset_name]["pods"][pod_name]["containers"][
# container[0]
# ] = container[1]


def get_pods(client: Any, namespace: str) -> None:
# name, ports
v1 = client.CoreV1Api()
Expand All @@ -34,12 +127,14 @@ def get_pods(client: Any, namespace: str) -> None:
try:
for port in container.ports:
port = port.to_dict()
p = {}
# store name, and the actual port in case name is not used
if "name" in port:
if port["name"] is not None:
ports.append(port["name"])
p["name"] = port["name"]
if "container_port" in port:
ports.append(port["container_port"])
p["port"] = port["container_port"]
ports.append(p)
except TypeError:
# container does not have ports
pass
Expand All @@ -59,9 +154,11 @@ def get_pods(client: Any, namespace: str) -> None:
# safe to a global variable
if pod_name not in ns["pod"]:
ns["pod"][pod_name] = {}
if "containers" not in ns["pod"][pod_name]:
ns["pod"][pod_name]["containers"] = {}
for container in containers:
# { "<pod>": {"<container>": ["web", "8080", "<other port>"]}}
ns["pod"][pod_name][container[0]] = container[1]
ns["pod"][pod_name]["containers"][container[0]] = container[1]


def get_services(client: Any, namespace: str) -> None:
Expand Down Expand Up @@ -175,12 +272,12 @@ def visualize() -> Digraph:
)
existing_pods[pod] = str(index)
# get only container names, not the ports
container_names = list(containers.keys())
container_names = list(containers["containers"].keys())
# does any service refer this pod or its containers ?
for (
container_name,
container_ports,
) in containers.items():
) in containers["containers"].items():
for service_name in ns["service"].keys():
# we are using strict=true so duplicates does not concern us that much
dot_service.node(service_name)
Expand All @@ -193,13 +290,17 @@ def visualize() -> Digraph:
container_name + existing_pods[pod],
container_name,
)
container_port_vals = [
value for d in container_ports for value in d.values()
]
# FIXME, we are checking just the first service port
if ports[0] in container_ports:
if ports[0] in container_port_vals:
# lets create an edge between the service and a container
# do not forget to use unique pod index for the edge
dot_root.edge(
service_name,
container_name + existing_pods[pod],
label=str(ports[0]),
)
dot_pods.subgraph(dot_pod)
# push the subgraphs to the dot_root
Expand All @@ -220,13 +321,6 @@ def yaml_to_ns(input_file: TextIO) -> None:
ns = yaml.safe_load(input_file)


def get_all(client: Any, namespace: str) -> None:
# TODO check if namespace is available with the current context
get_services(client, namespace)
get_pods(client, namespace)
get_ingresses(client, namespace)


def main(args: Any) -> None:
# https://github.com/kubernetes-client/python/issues/1131#issuecomment-749452174
if args.context:
Expand Down Expand Up @@ -258,7 +352,11 @@ def main(args: Any) -> None:
# decide if we are loading from an actual cluster or existing yaml file
if not args.load:
# we are not loading from a yaml file
get_all(client, namespace)
get_ingresses(client, namespace)
get_services(client, namespace)
# get_deployments(client, namespace)
# get_statefulsets(client, namespace)
get_pods(client, namespace)
else:
# load yaml file loaded via stdin to a `ns` global var
input_file = sys.stdin
Expand All @@ -279,7 +377,7 @@ def main(args: Any) -> None:
viz_dot.render(filename=namespace)


def parse_args() -> None:
def parse_args() -> Any:
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description="""\
Expand Down Expand Up @@ -357,9 +455,9 @@ def parse_args() -> None:
help="Do not verify cluster SSL certificate.",
)

args = parser.parse_args()
main(args)
return parser.parse_args()


if __name__ == "__main__":
parse_args()
args = parse_args()
main(args)

0 comments on commit dcc3bfc

Please sign in to comment.