diff --git a/network/benchmarks/netperf/.gitignore b/network/benchmarks/netperf/.gitignore
index 62c19394ab..b067f9b902 100644
--- a/network/benchmarks/netperf/.gitignore
+++ b/network/benchmarks/netperf/.gitignore
@@ -2,6 +2,7 @@ Dockerbuild/*
 Dockerbuildclient/*
 nptests
 netperf-w?.yaml
+result*/*.json
 *.csv
 *.jpg
 *.png
@@ -11,4 +12,6 @@ netperf-w?.yaml
 *.pyc
 .vscode
 data-*
-
+kubeConfig
+launch
+AzurePublicCloud-*.json
diff --git a/network/benchmarks/netperf/Makefile b/network/benchmarks/netperf/Makefile
index 8ac8bd4c5f..7e1456dd0f 100644
--- a/network/benchmarks/netperf/Makefile
+++ b/network/benchmarks/netperf/Makefile
@@ -14,18 +14,17 @@ all: docker push launch runtests
 
-DOCKERREPO := girishkalele/netperf-latest
+repo_owner := $(shell echo $(REPO_OWNER) | tr '[:upper:]' '[:lower:]')
+dockerrepo := $(if $(repo_owner),ghcr.io/$(repo_owner)/nptest,girishkalele/netperf-latest)
+image_tag := $(or $(IMAGE_TAG), latest)
 
-docker: launch
-	mkdir -p Dockerbuild/nptest && \
-	cp -f Dockerfile Dockerbuild/ && \
-	cp -f nptest/nptest.go Dockerbuild/nptest/ && \
-	cp -f go.mod Dockerbuild/ && \
-	cp -f go.sum Dockerbuild/ && \
-	docker build -t $(DOCKERREPO) Dockerbuild/
+docker: test
+	mkdir -p Dockerbuild && \
+	cp -rf nptest/* Dockerbuild/ && \
+	docker build -t $(dockerrepo):$(image_tag) Dockerbuild/
 
 push: docker
-	gcloud docker push $(DOCKERREPO)
+	docker push $(dockerrepo):$(image_tag)
 
 clean:
 	@rm -f Dockerbuild/*
@@ -36,6 +35,10 @@ clean:
 launch: launch.go
 	go build -o launch launch.go
 
+test:
+	go test ./...
+	cd nptest && go test ./...
+
 # 'runtests' is the test runner target
 runtests: launch
 	@echo Launching network performance tests
@@ -44,4 +47,6 @@ runtests: launch
 	cp netperf-latest.csv plotperf && cd plotperf; make plot && mv *png .. && mv *svg ..
 	@echo Results file netperf-latest.csv and SVG/PNG graphs generated successfully
 
+localtest: push
+	go run launch.go -image=$(dockerrepo):$(image_tag) -json -kubeConfig ./kubeConfig
diff --git a/network/benchmarks/netperf/go.mod b/network/benchmarks/netperf/go.mod
index 68f86338b5..6db92a7020 100644
--- a/network/benchmarks/netperf/go.mod
+++ b/network/benchmarks/netperf/go.mod
@@ -1,4 +1,4 @@
-module k8s.io/perf-tests/network
+module k8s.io/perf-tests/network/benchmarks/netperf
 
 go 1.23.4
 
diff --git a/network/benchmarks/netperf/launch.go b/network/benchmarks/netperf/launch.go
index 1d71654941..971abbd358 100644
--- a/network/benchmarks/netperf/launch.go
+++ b/network/benchmarks/netperf/launch.go
@@ -1,5 +1,5 @@
 /*
-Copyright 2016 The Kubernetes Authors.
+Copyright 2025 The Kubernetes Authors.
 
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -27,54 +27,43 @@ limitations under the License.
 package main
 
 import (
-    "context"
     "flag"
    "fmt"
    "os"
-    "strings"
-    "time"
 
-    api "k8s.io/api/core/v1"
-    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-    "k8s.io/apimachinery/pkg/util/intstr"
-    "k8s.io/client-go/kubernetes"
-    "k8s.io/client-go/tools/clientcmd"
+    "k8s.io/perf-tests/network/benchmarks/netperf/lib"
 )
 
 const (
-    csvDataMarker    = "GENERATING CSV OUTPUT"
-    csvEndDataMarker = "END CSV DATA"
-    runUUID          = "latest"
-    orchestratorPort = 5202
-    iperf3Port       = 5201
-    qperf19766       = 19766
-    qperf19765       = 19765
-    netperfPort      = 12865
+    csvDataMarker     = "GENERATING CSV OUTPUT"
+    csvEndDataMarker  = "END CSV DATA"
+    jsonDataMarker    = "GENERATING JSON OUTPUT"
+    jsonEndDataMarker = "END JSON OUTPUT"
+    runUUID           = "latest"
+    orchestratorPort  = 5202
+    iperf3Port        = 5201
+    qperf19766        = 19766
+    qperf19765        = 19765
+    netperfPort       = 12865
 )
 
 var (
-    iterations     int
-    hostnetworking bool
-    tag            string
-    kubeConfig     string
-    testNamespace  string
-    netperfImage   string
-    cleanupOnly    bool
-
-    everythingSelector metav1.ListOptions = metav1.ListOptions{}
-
-    primaryNode   api.Node
-    secondaryNode api.Node
+    iterations    int
+    tag           string
+    kubeConfig    string
+    testNamespace string
+    netperfImage  string
+    cleanupOnly   bool
 
     testFrom, testTo int
+
+    jsonOutput bool
 )
 
 func init() {
-    flag.BoolVar(&hostnetworking, "hostnetworking", false,
-        "(boolean) Enable Host Networking Mode for PODs")
     flag.IntVar(&iterations, "iterations", 1,
        "Number of iterations to run")
-    flag.StringVar(&tag, "tag", runUUID, "CSV file suffix")
+    flag.StringVar(&tag, "tag", runUUID, "Result file suffix")
     flag.StringVar(&netperfImage, "image", "sirot/netperf-latest",
        "Docker image used to run the network tests")
     flag.StringVar(&testNamespace, "namespace", "netperf", "Test namespace to run netperf pods")
     defaultKubeConfig := fmt.Sprintf("%s/.kube/config", os.Getenv("HOME"))
@@ -84,361 +73,7 @@ func init() {
         "(boolean) Run the cleanup resources phase only (use this flag to clean up orphaned resources from a test run)")
     flag.IntVar(&testFrom, "testFrom", 0, "start from test number testFrom")
     flag.IntVar(&testTo, "testTo", 5, "end at test number testTo")
-}
-
-func setupClient() *kubernetes.Clientset {
-    config, err := clientcmd.BuildConfigFromFlags("", kubeConfig)
-    if err != nil {
-        panic(err)
-    }
-    clientset, err := kubernetes.NewForConfig(config)
-    if err != nil {
-        panic(err)
-    }
-
-    return clientset
-}
-
-// getMinions : Only return schedulable/worker nodes
-func getMinionNodes(ctx context.Context, c *kubernetes.Clientset) *api.NodeList {
-    nodes, err := c.CoreV1().Nodes().List(ctx,
-        metav1.ListOptions{
-            FieldSelector: "spec.unschedulable=false",
-        })
-    if err != nil {
-        fmt.Println("Failed to fetch nodes", err)
-        return nil
-    }
-    return nodes
-}
-
-func cleanup(ctx context.Context, c *kubernetes.Clientset) {
-    // Cleanup existing rcs, pods and services in our namespace
-    rcs, err := c.CoreV1().ReplicationControllers(testNamespace).List(ctx, everythingSelector)
-    if err != nil {
-        fmt.Println("Failed to get replication controllers", err)
-        return
-    }
-    for _, rc := range rcs.Items {
-        fmt.Println("Deleting rc", rc.GetName())
-        if err := c.CoreV1().ReplicationControllers(testNamespace).Delete(
-            ctx, rc.GetName(), metav1.DeleteOptions{}); err != nil {
-            fmt.Println("Failed to delete rc", rc.GetName(), err)
-        }
-    }
-    pods, err := c.CoreV1().Pods(testNamespace).List(ctx, everythingSelector)
-    if err != nil {
-        fmt.Println("Failed to get pods", err)
-        return
-    }
-    for _, pod := range pods.Items {
-        fmt.Println("Deleting pod", pod.GetName())
-        if err := c.CoreV1().Pods(testNamespace).Delete(ctx, pod.GetName(), metav1.DeleteOptions{GracePeriodSeconds: new(int64)}); err != nil {
-            fmt.Println("Failed to delete pod", pod.GetName(), err)
-        }
-    }
-    svcs, err := c.CoreV1().Services(testNamespace).List(ctx, everythingSelector)
-    if err != nil {
-        fmt.Println("Failed to get services", err)
-        return
-    }
-    for _, svc := range svcs.Items {
-        fmt.Println("Deleting svc", svc.GetName())
-        err := c.CoreV1().Services(testNamespace).Delete(
-            ctx, svc.GetName(), metav1.DeleteOptions{})
-        if err != nil {
-            fmt.Println("Failed to get service", err)
-        }
-    }
-}
-
-// createServices: Long-winded function to programmatically create our two services
-func createServices(ctx context.Context, c *kubernetes.Clientset) bool {
-    // Create our namespace if not present
-    if _, err := c.CoreV1().Namespaces().Get(ctx, testNamespace, metav1.GetOptions{}); err != nil {
-        _, err := c.CoreV1().Namespaces().Create(ctx, &api.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testNamespace}}, metav1.CreateOptions{})
-        if err != nil {
-            fmt.Println("Failed to create service", err)
-        }
-    }
-
-    // Create the orchestrator service that points to the coordinator pod
-    orchLabels := map[string]string{"app": "netperf-orch"}
-    orchService := &api.Service{
-        ObjectMeta: metav1.ObjectMeta{
-            Name: "netperf-orch",
-        },
-        Spec: api.ServiceSpec{
-            Selector: orchLabels,
-            Ports: []api.ServicePort{{
-                Name:       "netperf-orch",
-                Protocol:   api.ProtocolTCP,
-                Port:       orchestratorPort,
-                TargetPort: intstr.FromInt(orchestratorPort),
-            }},
-            Type: api.ServiceTypeClusterIP,
-        },
-    }
-    if _, err := c.CoreV1().Services(testNamespace).Create(ctx, orchService, metav1.CreateOptions{}); err != nil {
-        fmt.Println("Failed to create orchestrator service", err)
-        return false
-    }
-    fmt.Println("Created orchestrator service")
-
-    // Create the netperf-w2 service that points a clusterIP at the worker 2 pod
-    netperfW2Labels := map[string]string{"app": "netperf-w2"}
-    netperfW2Service := &api.Service{
-        ObjectMeta: metav1.ObjectMeta{
-            Name: "netperf-w2",
-        },
-        Spec: api.ServiceSpec{
-            Selector: netperfW2Labels,
-            Ports: []api.ServicePort{
-                {
-                    Name:       "netperf-w2",
-                    Protocol:   api.ProtocolTCP,
-                    Port:       iperf3Port,
-                    TargetPort: intstr.FromInt(iperf3Port),
-                },
-                {
-                    Name:       "netperf-w2-qperf19766",
-                    Protocol:   api.ProtocolTCP,
-                    Port:       qperf19766,
-                    TargetPort: intstr.FromInt(qperf19766),
-                },
-                {
-                    Name:       "netperf-w2-qperf19765",
-                    Protocol:   api.ProtocolTCP,
-                    Port:       qperf19765,
-                    TargetPort: intstr.FromInt(qperf19765),
-                },
-                {
-                    Name:       "netperf-w2-sctp",
-                    Protocol:   api.ProtocolSCTP,
-                    Port:       iperf3Port,
-                    TargetPort: intstr.FromInt(iperf3Port),
-                },
-                {
-                    Name:       "netperf-w2-udp",
-                    Protocol:   api.ProtocolUDP,
-                    Port:       iperf3Port,
-                    TargetPort: intstr.FromInt(iperf3Port),
-                },
-                {
-                    Name:       "netperf-w2-netperf",
-                    Protocol:   api.ProtocolTCP,
-                    Port:       netperfPort,
-                    TargetPort: intstr.FromInt(netperfPort),
-                },
-            },
-            Type: api.ServiceTypeClusterIP,
-        },
-    }
-    if _, err := c.CoreV1().Services(testNamespace).Create(ctx, netperfW2Service, metav1.CreateOptions{}); err != nil {
-        fmt.Println("Failed to create netperf-w2 service", err)
-        return false
-    }
-    fmt.Println("Created netperf-w2 service")
-    return true
-}
-
-// createRCs - Create replication controllers for all workers and the orchestrator
-func createRCs(ctx context.Context, c *kubernetes.Clientset) bool {
-    // Create the orchestrator RC
-    name := "netperf-orch"
-    fmt.Println("Creating replication controller", name)
-    replicas := int32(1)
-
-    _, err := c.CoreV1().ReplicationControllers(testNamespace).Create(ctx, &api.ReplicationController{
-        ObjectMeta: metav1.ObjectMeta{Name: name},
-        Spec: api.ReplicationControllerSpec{
-            Replicas: &replicas,
-            Selector: map[string]string{"app": name},
-            Template: &api.PodTemplateSpec{
-                ObjectMeta: metav1.ObjectMeta{
-                    Labels: map[string]string{"app": name},
-                },
-                Spec: api.PodSpec{
-                    Containers: []api.Container{
-                        {
-                            Name:  name,
-                            Image: netperfImage,
-                            Ports: []api.ContainerPort{{ContainerPort: orchestratorPort}},
-                            Args: []string{
-                                "--mode=orchestrator",
-                                fmt.Sprintf("--testFrom=%d", testFrom),
-                                fmt.Sprintf("--testTo=%d", testTo),
-                            },
-                            ImagePullPolicy: "Always",
-                        },
-                    },
-                    TerminationGracePeriodSeconds: new(int64),
-                },
-            },
-        },
-    }, metav1.CreateOptions{})
-    if err != nil {
-        fmt.Println("Error creating orchestrator replication controller", err)
-        return false
-    }
-    fmt.Println("Created orchestrator replication controller")
-    for i := 1; i <= 3; i++ {
-        // Bring up pods slowly
-        time.Sleep(3 * time.Second)
-        kubeNode := primaryNode.GetName()
-        if i == 3 {
-            kubeNode = secondaryNode.GetName()
-        }
-        name = fmt.Sprintf("netperf-w%d", i)
-        fmt.Println("Creating replication controller", name)
-        portSpec := []api.ContainerPort{}
-        if i > 1 {
-            // Worker W1 is a client-only pod - no ports are exposed
-            portSpec = append(portSpec, api.ContainerPort{ContainerPort: iperf3Port, Protocol: api.ProtocolTCP})
-            portSpec = append(portSpec, api.ContainerPort{ContainerPort: iperf3Port, Protocol: api.ProtocolSCTP})
-        }
-
-        workerEnv := []api.EnvVar{
-            {Name: "worker", Value: name},
-            {Name: "kubeNode", Value: kubeNode},
-            {Name: "podname", Value: name},
-        }
-
-        replicas := int32(1)
-
-        _, err := c.CoreV1().ReplicationControllers(testNamespace).Create(ctx, &api.ReplicationController{
-            ObjectMeta: metav1.ObjectMeta{Name: name},
-            Spec: api.ReplicationControllerSpec{
-                Replicas: &replicas,
-                Selector: map[string]string{"app": name},
-                Template: &api.PodTemplateSpec{
-                    ObjectMeta: metav1.ObjectMeta{
-                        Labels: map[string]string{"app": name},
-                    },
-                    Spec: api.PodSpec{
-                        NodeName: kubeNode,
-                        Containers: []api.Container{
-                            {
-                                Name:            name,
-                                Image:           netperfImage,
-                                Ports:           portSpec,
-                                Args:            []string{"--mode=worker"},
-                                Env:             workerEnv,
-                                ImagePullPolicy: "Always",
-                            },
-                        },
-                        TerminationGracePeriodSeconds: new(int64),
-                    },
-                },
-            },
-        }, metav1.CreateOptions{})
-        if err != nil {
-            fmt.Println("Error creating orchestrator replication controller", name, ":", err)
-            return false
-        }
-    }
-
-    return true
-}
-
-func getOrchestratorPodName(pods *api.PodList) string {
-    for _, pod := range pods.Items {
-        if strings.Contains(pod.GetName(), "netperf-orch-") {
-            return pod.GetName()
-        }
-    }
-    return ""
-}
-
-// Retrieve the logs for the pod/container and check if csv data has been generated
-func getCsvResultsFromPod(ctx context.Context, c *kubernetes.Clientset, podName string) *string {
-    body, err := c.CoreV1().Pods(testNamespace).GetLogs(podName, &api.PodLogOptions{Timestamps: false}).DoRaw(ctx)
-    if err != nil {
-        fmt.Printf("Error (%s) reading logs from pod %s", err, podName)
-        return nil
-    }
-    logData := string(body)
-    index := strings.Index(logData, csvDataMarker)
-    endIndex := strings.Index(logData, csvEndDataMarker)
-    if index == -1 || endIndex == -1 {
-        return nil
-    }
-    csvData := string(body[index+len(csvDataMarker)+1 : endIndex])
-    return &csvData
-}
-
-// processCsvData : Process the CSV datafile and generate line and bar graphs
-func processCsvData(csvData *string) bool {
-    t := time.Now().UTC()
-    outputFileDirectory := fmt.Sprintf("results_%s-%s", testNamespace, tag)
-    outputFilePrefix := fmt.Sprintf("%s-%s_%s.", testNamespace, tag, t.Format("20060102150405"))
-    fmt.Printf("Test concluded - CSV raw data written to %s/%scsv\n", outputFileDirectory, outputFilePrefix)
-    if _, err := os.Stat(outputFileDirectory); os.IsNotExist(err) {
-        err := os.Mkdir(outputFileDirectory, 0766)
-        if err != nil {
-            fmt.Println("Error creating directory", err)
-            return false
-        }
-
-    }
-    fd, err := os.OpenFile(fmt.Sprintf("%s/%scsv", outputFileDirectory, outputFilePrefix), os.O_RDWR|os.O_CREATE, 0666)
-    if err != nil {
-        fmt.Println("ERROR writing output CSV datafile", err)
-        return false
-    }
-    _, err = fd.WriteString(*csvData)
-    if err != nil {
-        fmt.Println("Error writing string", err)
-        return false
-    }
-    fd.Close()
-    return true
-}
-
-func executeTests(ctx context.Context, c *kubernetes.Clientset) bool {
-    for i := 0; i < iterations; i++ {
-        cleanup(ctx, c)
-        if !createServices(ctx, c) {
-            fmt.Println("Failed to create services - aborting test")
-            return false
-        }
-        time.Sleep(3 * time.Second)
-        if !createRCs(ctx, c) {
-            fmt.Println("Failed to create replication controllers - aborting test")
-            return false
-        }
-        fmt.Println("Waiting for netperf pods to start up")
-
-        var orchestratorPodName string
-        for len(orchestratorPodName) == 0 {
-            fmt.Println("Waiting for orchestrator pod creation")
-            time.Sleep(60 * time.Second)
-            var pods *api.PodList
-            var err error
-            if pods, err = c.CoreV1().Pods(testNamespace).List(ctx, everythingSelector); err != nil {
-                fmt.Println("Failed to fetch pods - waiting for pod creation", err)
-                continue
-            }
-            orchestratorPodName = getOrchestratorPodName(pods)
-        }
-        fmt.Println("Orchestrator Pod is", orchestratorPodName)
-
-        // The pods orchestrate themselves, we just wait for the results file to show up in the orchestrator container
-        for {
-            // Monitor the orchestrator pod for the CSV results file
-            csvdata := getCsvResultsFromPod(ctx, c, orchestratorPodName)
-            if csvdata == nil {
-                fmt.Println("Scanned orchestrator pod filesystem - no results file found yet...waiting for orchestrator to write CSV file...")
-                time.Sleep(60 * time.Second)
-                continue
-            }
-            if processCsvData(csvdata) {
-                break
-            }
-        }
-        fmt.Printf("TEST RUN (Iteration %d) FINISHED - cleaning up services and pods\n", i)
-    }
-    return false
+    flag.BoolVar(&jsonOutput, "json", false, "Output JSON data along with CSV data")
 }
 
 func main() {
@@ -446,33 +81,29 @@
     fmt.Println("Network Performance Test")
     fmt.Println("Parameters :")
     fmt.Println("Iterations : ", iterations)
-    fmt.Println("Host Networking : ", hostnetworking)
     fmt.Println("Test Namespace : ", testNamespace)
     fmt.Println("Docker image : ", netperfImage)
     fmt.Println("------------------------------------------------------------")
 
-    ctx := context.Background()
-
-    var c *kubernetes.Clientset
-    if c = setupClient(); c == nil {
-        fmt.Println("Failed to setup REST client to Kubernetes cluster")
-        return
-    }
-    if cleanupOnly {
-        cleanup(ctx, c)
-        return
-    }
-    nodes := getMinionNodes(ctx, c)
-    if nodes == nil {
-        return
+    testParams := lib.TestParams{
+        Iterations:    iterations,
+        Tag:           tag,
+        TestNamespace: testNamespace,
+        Image:         netperfImage,
+        CleanupOnly:   cleanupOnly,
+        TestFrom:      testFrom,
+        TestTo:        testTo,
+        JSONOutput:    jsonOutput,
+        KubeConfig:    kubeConfig,
    }
-    if len(nodes.Items) < 2 {
-        fmt.Println("Insufficient number of nodes for test (need minimum 2 nodes)")
-        return
+    results, err := lib.PerformTests(testParams)
+    if err != nil {
+        fmt.Println(err)
+        os.Exit(1)
    }
-    primaryNode = nodes.Items[0]
-    secondaryNode = nodes.Items[1]
-    fmt.Printf("Selected primary,secondary nodes = (%s, %s)\n", primaryNode.GetName(), secondaryNode.GetName())
-    executeTests(ctx, c)
-    cleanup(ctx, c)
+    fmt.Println("Results :")
+    for _, result := range results {
+        fmt.Println("CSV Result File : ", result.CsvResultFile)
+        fmt.Println("JSON Result File : ", result.JSONResultFile)
+    }
 }
diff --git a/network/benchmarks/netperf/lib/outputlib.go b/network/benchmarks/netperf/lib/outputlib.go
new file mode 100644
index 0000000000..d613ae2b5e
--- /dev/null
+++ b/network/benchmarks/netperf/lib/outputlib.go
@@ -0,0 +1,98 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package lib
+
+import (
+    "context"
+    "fmt"
+    "os"
+    "strings"
+    "time"
+
+    api "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/util/wait"
+    "k8s.io/client-go/kubernetes"
+    "k8s.io/client-go/util/retry"
+)
+
+func getLogsFromPod(c *kubernetes.Clientset, podName, testNamespace string) (*string, error) {
+    var logData *string
+
+    // Retry to get logs from the pod, as we are polling at intervals
+    // and there might be intermittent network issues, a long retry time
+    // is acceptable.
+    err := retry.OnError(wait.Backoff{
+        Steps:    5,
+        Duration: 2 * time.Second,
+        Factor:   2.0,
+        Jitter:   100,
+    }, func(_ error) bool {
+        return true
+    }, func() error {
+        body, err := c.CoreV1().Pods(testNamespace).GetLogs(podName, &api.PodLogOptions{}).DoRaw(context.Background())
+        if err != nil {
+            return err
+        }
+        data := string(body)
+        logData = &data
+        return nil
+    })
+
+    if err != nil {
+        return nil, fmt.Errorf("error reading logs from pod %s: %v", podName, err)
+    }
+
+    return logData, nil
+}
+
+func getDataFromPod(c *kubernetes.Clientset, podName, startMarker, endMarker, testNamespace string) (*string, error) {
+    logData, err := getLogsFromPod(c, podName, testNamespace)
+    if err != nil {
+        return nil, err
+    }
+    index := strings.Index(*logData, startMarker)
+    endIndex := strings.Index(*logData, endMarker)
+    if index == -1 || endIndex == -1 {
+        return nil, nil
+    }
+    data := string((*logData)[index+len(startMarker)+1 : endIndex])
+    return &data, nil
+}
+
+func processRawData(rawData *string, testNamespace, tag, fileExtension string) (string, error) {
+    t := time.Now().UTC()
+    outputFileDirectory := fmt.Sprintf("results_%s-%s", testNamespace, tag)
+    outputFilePrefix := fmt.Sprintf("%s-%s_%s.", testNamespace, tag, t.Format("20060102150405"))
+    outputFilePath := fmt.Sprintf("%s/%s%s", outputFileDirectory, outputFilePrefix, fileExtension)
+    fmt.Printf("Test concluded - Raw data written to %s\n", outputFilePath)
+    if _, err := os.Stat(outputFileDirectory); os.IsNotExist(err) {
+        err := os.Mkdir(outputFileDirectory, 0766)
+        if err != nil {
+            return "", err
+        }
+    }
+    fd, err := os.OpenFile(outputFilePath, os.O_RDWR|os.O_CREATE, 0666)
+    if err != nil {
+        return "", fmt.Errorf("ERROR writing output datafile: %s", err)
+    }
+    defer fd.Close()
+    _, err = fd.WriteString(*rawData)
+    if err != nil {
+        return "", fmt.Errorf("error writing string: %s", err)
+    }
+    return outputFilePath, nil
+}
diff --git a/network/benchmarks/netperf/lib/testlib.go b/network/benchmarks/netperf/lib/testlib.go
new file mode 100644
index 0000000000..2ded0afbb9
--- /dev/null
+++ b/network/benchmarks/netperf/lib/testlib.go
@@ -0,0 +1,89 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package lib
+
+import (
+    "fmt"
+)
+
+const (
+    csvDataMarker     = "GENERATING CSV OUTPUT"
+    csvEndDataMarker  = "END CSV DATA"
+    jsonDataMarker    = "GENERATING JSON OUTPUT"
+    jsonEndDataMarker = "END JSON OUTPUT"
+    runUUID           = "latest"
+    orchestratorPort  = 5202
+    iperf3Port        = 5201
+    qperf19766        = 19766
+    qperf19765        = 19765
+    netperfPort       = 12865
+)
+
+type TestParams struct {
+    Iterations    int
+    Tag           string
+    TestNamespace string
+    Image         string
+    CleanupOnly   bool
+    TestFrom      int
+    TestTo        int
+    JSONOutput    bool
+    KubeConfig    string
+}
+
+type Result struct {
+    JSONResultFile string
+    CsvResultFile  string
+}
+
+func PerformTests(testParams TestParams) ([]Result, error) {
+    c, err := setupClient(testParams.KubeConfig)
+    if err != nil {
+        return nil, fmt.Errorf("failed to create clientset: %v", err)
+    }
+    nodes, err := getMinionNodes(c)
+    if err != nil {
+        return nil, fmt.Errorf("failed to get nodes: %v", err)
+    }
+    if len(nodes.Items) < 2 {
+        return nil, fmt.Errorf("at least 2 nodes are required to run the tests")
+    }
+    primaryNode := nodes.Items[0]
+    secondaryNode := nodes.Items[1]
+
+    fmt.Println("Primary Node : ", primaryNode.Name)
+    fmt.Println("Secondary Node : ", secondaryNode.Name)
+
+    if testParams.CleanupOnly {
+        cleanup(c, testParams.TestNamespace)
+        return nil, nil
+    }
+
+    fmt.Println("Network Performance Test")
+    fmt.Println("Parameters :")
+    fmt.Println("Iterations : ", testParams.Iterations)
+    fmt.Println("Test Namespace : ", testParams.TestNamespace)
+    fmt.Println("Docker image : ", testParams.Image)
+    fmt.Println("------------------------------------------------------------")
+
+    results, err := executeTests(c, testParams, primaryNode, secondaryNode)
+    if err != nil {
+        return nil, fmt.Errorf("failed to execute tests: %v", err)
+    }
+    cleanup(c, testParams.TestNamespace)
+    return results, nil
+}
diff --git a/network/benchmarks/netperf/lib/utilslib.go b/network/benchmarks/netperf/lib/utilslib.go
new file mode 100644
index 0000000000..9969d0c8c5
--- /dev/null
+++ b/network/benchmarks/netperf/lib/utilslib.go
@@ -0,0 +1,394 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package lib
+
+import (
+    "context"
+    "fmt"
+    "time"
+
+    api "k8s.io/api/core/v1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/util/intstr"
+    "k8s.io/client-go/kubernetes"
+    "k8s.io/client-go/tools/clientcmd"
+)
+
+var everythingSelector metav1.ListOptions = metav1.ListOptions{}
+
+func setupClient(kubeConfig string) (*kubernetes.Clientset, error) {
+    config, err := clientcmd.BuildConfigFromFlags("", kubeConfig)
+    if err != nil {
+        return nil, fmt.Errorf("failed to create config: %v", err)
+    }
+    clientset, err := kubernetes.NewForConfig(config)
+    if err != nil {
+        return nil, fmt.Errorf("failed to create clientset: %v", err)
+    }
+
+    return clientset, nil
+}
+
+func getMinionNodes(c *kubernetes.Clientset) (*api.NodeList, error) {
+    nodes, err := c.CoreV1().Nodes().List(
+        context.Background(),
+        metav1.ListOptions{
+            FieldSelector: "spec.unschedulable=false",
+            // for now the tests can only run on linux/amd64 nodes
+            LabelSelector: "kubernetes.io/os=linux,kubernetes.io/arch=amd64",
+        })
+    if err != nil {
+        return nil, fmt.Errorf("failed to get nodes: %v", err)
+    }
+    return nodes, nil
+}
+
+func createServices(c *kubernetes.Clientset, testNamespace string) error {
+    if _, err := c.CoreV1().Namespaces().Get(context.Background(), testNamespace, metav1.GetOptions{}); err != nil {
+        _, err := c.CoreV1().Namespaces().Create(context.Background(), &api.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testNamespace}}, metav1.CreateOptions{})
+        if err != nil {
+            return fmt.Errorf("failed to create namespace %s: %v", testNamespace, err)
+        }
+    }
+
+    // Create the orchestrator service that points to the coordinator pod
+    orchLabels := map[string]string{"app": "netperf-orch"}
+    orchService := &api.Service{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: "netperf-orch",
+        },
+        Spec: api.ServiceSpec{
+            Selector: orchLabels,
+            Ports: []api.ServicePort{{
+                Name:       "netperf-orch",
+                Protocol:   api.ProtocolTCP,
+                Port:       orchestratorPort,
+                TargetPort: intstr.FromInt(orchestratorPort),
+            }},
+            Type: api.ServiceTypeClusterIP,
+        },
+    }
+    if _, err := c.CoreV1().Services(testNamespace).Create(context.Background(), orchService, metav1.CreateOptions{}); err != nil {
+        return fmt.Errorf("failed to create orchestrator service: %v", err)
+    }
+    fmt.Println("Created orchestrator service")
+
+    // Create the netperf-w2 service that points a clusterIP at the worker 2 pod
+    netperfW2Labels := map[string]string{"app": "netperf-w2"}
+    netperfW2Service := &api.Service{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: "netperf-w2",
+        },
+        Spec: api.ServiceSpec{
+            Selector: netperfW2Labels,
+            Ports: []api.ServicePort{
+                {
+                    Name:       "netperf-w2",
+                    Protocol:   api.ProtocolTCP,
+                    Port:       iperf3Port,
+                    TargetPort: intstr.FromInt(iperf3Port),
+                },
+                {
+                    Name:       "netperf-w2-qperf19766",
+                    Protocol:   api.ProtocolTCP,
+                    Port:       qperf19766,
+                    TargetPort: intstr.FromInt(qperf19766),
+                },
+                {
+                    Name:       "netperf-w2-qperf19765",
+                    Protocol:   api.ProtocolTCP,
+                    Port:       qperf19765,
+                    TargetPort: intstr.FromInt(qperf19765),
+                },
+                {
+                    Name:       "netperf-w2-sctp",
+                    Protocol:   api.ProtocolSCTP,
+                    Port:       iperf3Port,
+                    TargetPort: intstr.FromInt(iperf3Port),
+                },
+                {
+                    Name:       "netperf-w2-udp",
+                    Protocol:   api.ProtocolUDP,
+                    Port:       iperf3Port,
+                    TargetPort: intstr.FromInt(iperf3Port),
+                },
+                {
+                    Name:       "netperf-w2-netperf",
+                    Protocol:   api.ProtocolTCP,
+                    Port:       netperfPort,
+                    TargetPort: intstr.FromInt(netperfPort),
+                },
+            },
+            Type: api.ServiceTypeClusterIP,
+        },
+    }
+    if _, err := c.CoreV1().Services(testNamespace).Create(context.Background(), netperfW2Service, metav1.CreateOptions{}); err != nil {
+        return fmt.Errorf("failed to create netperf-w2 service: %v", err)
+    }
+    fmt.Println("Created netperf-w2 service")
+    return nil
+}
+
+func createRCs(c *kubernetes.Clientset, testParams TestParams, primaryNode, secondaryNode api.Node) error {
+    // Create the orchestrator RC
+    name := "netperf-orch"
+    fmt.Println("Creating replication controller", name)
+    replicas := int32(1)
+
+    _, err := c.CoreV1().ReplicationControllers(testParams.TestNamespace).Create(context.Background(), &api.ReplicationController{
+        ObjectMeta: metav1.ObjectMeta{Name: name},
+        Spec: api.ReplicationControllerSpec{
+            Replicas: &replicas,
+            Selector: map[string]string{"app": name},
+            Template: &api.PodTemplateSpec{
+                ObjectMeta: metav1.ObjectMeta{
+                    Labels: map[string]string{"app": name},
+                },
+                Spec: api.PodSpec{
+                    NodeSelector: map[string]string{"kubernetes.io/os": "linux", "kubernetes.io/arch": "amd64"},
+                    Containers: []api.Container{
+                        {
+                            Name:  name,
+                            Image: testParams.Image,
+                            Ports: []api.ContainerPort{{ContainerPort: orchestratorPort}},
+                            Args: []string{
+                                "--mode=orchestrator",
+                                fmt.Sprintf("--testFrom=%d", testParams.TestFrom),
+                                fmt.Sprintf("--testTo=%d", testParams.TestTo),
+                            },
+                            ImagePullPolicy: "Always",
+                        },
+                    },
+                    TerminationGracePeriodSeconds: new(int64),
+                },
+            },
+        },
+    }, metav1.CreateOptions{})
+    if err != nil {
+        return fmt.Errorf("error creating orchestrator replication controller %s: %v", name, err)
+    }
+    fmt.Println("Created orchestrator replication controller")
+    for i := 1; i <= 3; i++ {
+        // Bring up pods slowly
+        time.Sleep(3 * time.Second)
+        kubeNode := primaryNode.GetName()
+        if i == 3 {
+            kubeNode = secondaryNode.GetName()
+        }
+        name = fmt.Sprintf("netperf-w%d", i)
+        fmt.Println("Creating replication controller", name)
+        portSpec := []api.ContainerPort{}
+        if i > 1 {
+            // Worker W1 is a client-only pod - no ports are exposed
+            portSpec = append(portSpec, api.ContainerPort{ContainerPort: iperf3Port, Protocol: api.ProtocolTCP})
+            portSpec = append(portSpec, api.ContainerPort{ContainerPort: iperf3Port, Protocol: api.ProtocolSCTP})
+        }
+
+        workerEnv := []api.EnvVar{
+            {Name: "worker", Value: name},
+            {Name: "kubeNode", Value: kubeNode},
+            {Name: "podname", Value: name},
+        }
+
+        replicas := int32(1)
+
+        _, err := c.CoreV1().ReplicationControllers(testParams.TestNamespace).Create(context.Background(), &api.ReplicationController{
+            ObjectMeta: metav1.ObjectMeta{Name: name},
+            Spec: api.ReplicationControllerSpec{
+                Replicas: &replicas,
+                Selector: map[string]string{"app": name},
+                Template: &api.PodTemplateSpec{
+                    ObjectMeta: metav1.ObjectMeta{
+                        Labels: map[string]string{"app": name},
+                    },
+                    Spec: api.PodSpec{
+                        NodeName: kubeNode,
+                        Containers: []api.Container{
+                            {
+                                Name:  name,
+                                Image: testParams.Image,
+                                Ports: portSpec,
+                                Args: []string{
+                                    "--mode=worker",
+                                    fmt.Sprintf("--testFrom=%d", testParams.TestFrom),
+                                    fmt.Sprintf("--testTo=%d", testParams.TestTo),
+                                },
+                                Env:             workerEnv,
+                                ImagePullPolicy: "Always",
+                            },
+                        },
+                        TerminationGracePeriodSeconds: new(int64),
+                    },
+                },
+            },
+        }, metav1.CreateOptions{})
+        if err != nil {
+            return fmt.Errorf("error creating worker replication controller %s: %v", name, err)
+        }
+    }
+
+    return nil
+}
+
+func executeTests(c *kubernetes.Clientset, testParams TestParams, primaryNode, secondaryNode api.Node) ([]Result, error) {
+    results := make([]Result, testParams.Iterations)
+    for i := 0; i < testParams.Iterations; i++ {
+        cleanup(c, testParams.TestNamespace)
+        if err := createServices(c, testParams.TestNamespace); err != nil {
+            return nil, fmt.Errorf("failed to create services: %v", err)
+        }
+        time.Sleep(3 * time.Second)
+        if err := createRCs(c, testParams, primaryNode, secondaryNode); err != nil {
+            return nil, fmt.Errorf("failed to create replication controllers: %v", err)
+        }
+        fmt.Println("Waiting for netperf pods to start up")
+
+        orchestratorPodName, err := getOrchestratorPodName(c, testParams.TestNamespace, 3*time.Minute)
+        if err != nil {
+            return nil, fmt.Errorf("failed to get orchestrator pod name: %v", err)
+        }
+        fmt.Println("Orchestrator Pod is", orchestratorPodName)
+
+        var jsonFilePath string
+        var csvFilePath string
+
+        // The pods orchestrate themselves, we just wait for the results file to show up in the orchestrator container
+        for {
+            // Monitor the orchestrator pod for the CSV results file
+            csvdata, err := getDataFromPod(c, orchestratorPodName, csvDataMarker, csvEndDataMarker, testParams.TestNamespace)
+            if err != nil {
+                return nil, fmt.Errorf("error getting CSV data from orchestrator pod: %v", err)
+            }
+            if csvdata == nil {
+                fmt.Println("Scanned orchestrator pod filesystem - no results file found yet...")
+                time.Sleep(60 * time.Second)
+                continue
+            }
+
+            if testParams.JSONOutput {
+                jsondata, err := getDataFromPod(c, orchestratorPodName, jsonDataMarker, jsonEndDataMarker, testParams.TestNamespace)
+                if err != nil {
+                    return nil, fmt.Errorf("error getting JSON data from orchestrator pod: %v", err)
+                }
+                if jsondata == nil {
+                    fmt.Println("Scanned orchestrator pod filesystem - no json data found yet...")
+                    time.Sleep(60 * time.Second)
+                    continue
+                }
+                jsonFilePath, err = processRawData(jsondata, testParams.TestNamespace, testParams.Tag, "json")
+                if err != nil {
+                    return nil, fmt.Errorf("error processing JSON data: %v", err)
+                }
+            }
+
+            csvFilePath, err = processRawData(csvdata, testParams.TestNamespace, testParams.Tag, "csv")
+            if err != nil {
+                return nil, fmt.Errorf("error processing CSV data: %v", err)
+            }
+
+            break
+        }
+        fmt.Printf("TEST RUN (Iteration %d) FINISHED - cleaning up services and pods\n", i)
+        results[i] = Result{JSONResultFile: jsonFilePath, CsvResultFile: csvFilePath}
+    }
+    return results, nil
+}
+
+func getOrchestratorPodName(c *kubernetes.Clientset, testNamespace string, timeout time.Duration) (string, error) {
+    timeoutCh := time.After(timeout)
+    ticker := time.NewTicker(5 * time.Second)
+    defer ticker.Stop()
+
+    for {
+        select {
+        case <-ticker.C:
+            fmt.Println("Waiting for orchestrator pod creation")
+            pods, err := c.CoreV1().Pods(testNamespace).List(context.Background(), metav1.ListOptions{
+                LabelSelector: "app=netperf-orch",
+            })
+            if err != nil {
+                fmt.Println("Failed to fetch pods - waiting for pod creation", err)
+                continue
+            }
+            if len(pods.Items) == 0 {
+                fmt.Println("No orchestrator pods found yet")
+                continue
+            }
+
+            pod := pods.Items[0]
+            podStatus := pod.Status
+
+            if podStatus.Phase == api.PodRunning {
+                return pod.GetName(), nil
+            }
+
+            for _, containerStatus := range podStatus.ContainerStatuses {
+                if waiting := containerStatus.State.Waiting; waiting != nil {
+                    switch waiting.Reason {
+                    case "ErrImagePull", "CrashLoopBackOff", "ImagePullBackOff":
+                        return "", fmt.Errorf("orchestrator pod error: %s - %v", waiting.Reason, waiting.Message)
+                    }
+                }
+            }
+            fmt.Println("Orchestrator pod is not running yet")
+        case <-timeoutCh:
+            return "", fmt.Errorf("timed out waiting for orchestrator pod to be created")
+        }
+    }
+}
+
+func cleanup(c *kubernetes.Clientset, testNamespace string) {
+    syncCtx := context.Background()
+    // Cleanup existing rcs, pods and services in our namespace
+    rcs, err := c.CoreV1().ReplicationControllers(testNamespace).List(syncCtx, everythingSelector)
+    if err != nil {
+        fmt.Println("Failed to get replication controllers", err)
+        return
+    }
+    for _, rc := range rcs.Items {
+        fmt.Println("Deleting rc", rc.GetName())
+        if err := c.CoreV1().ReplicationControllers(testNamespace).Delete(
+            context.Background(),
+            rc.GetName(), metav1.DeleteOptions{}); err != nil {
+            fmt.Println("Failed to delete rc", rc.GetName(), err)
+        }
+    }
+    pods, err := c.CoreV1().Pods(testNamespace).List(syncCtx, everythingSelector)
+    if err != nil {
+        fmt.Println("Failed to get pods", err)
+        return
+    }
+    for _, pod := range pods.Items {
+        fmt.Println("Deleting pod", pod.GetName())
+        if err := c.CoreV1().Pods(testNamespace).Delete(context.Background(), pod.GetName(), metav1.DeleteOptions{GracePeriodSeconds: new(int64)}); err != nil {
+            fmt.Println("Failed to delete pod", pod.GetName(), err)
+        }
+    }
+    svcs, err := c.CoreV1().Services(testNamespace).List(syncCtx, everythingSelector)
+    if err != nil {
+        fmt.Println("Failed to get services", err)
+        return
+    }
+    for _, svc := range svcs.Items {
+        fmt.Println("Deleting svc", svc.GetName())
+        err := c.CoreV1().Services(testNamespace).Delete(
+            context.Background(), svc.GetName(), metav1.DeleteOptions{})
+        if err != nil {
+            fmt.Println("Failed to delete service", err)
+        }
+    }
+}
diff --git a/network/benchmarks/netperf/Dockerfile b/network/benchmarks/netperf/nptest/Dockerfile
similarity index 87%
rename from network/benchmarks/netperf/Dockerfile
rename to network/benchmarks/netperf/nptest/Dockerfile
index 37777b3bd1..f9b539fdb5 100644
--- a/network/benchmarks/netperf/Dockerfile
+++ b/network/benchmarks/netperf/nptest/Dockerfile
@@ -21,20 +21,18 @@
 #
 # Args: --mode=worker --host= --port=5202
 #
-ARG GOLANG_VERSION=1.18
-FROM golang:${GOLANG_VERSION} as builder
+FROM golang:bullseye AS builder
 
 WORKDIR /workspace
-COPY nptest/nptest.go nptest.go
-COPY go.sum go.sum
-COPY go.mod go.mod
+COPY . .
 
 RUN go build -o nptests
 
 FROM debian:bullseye
 
 ENV LD_LIBRARY_PATH=/usr/local/lib
-MAINTAINER Girish Kalele
+LABEL org.opencontainers.image.description "Network performance tests in k8s engine"
+
 # install binary and remove cache
 RUN apt-get update \
     && apt-get install -y curl wget net-tools gcc make libsctp-dev git autotools-dev automake \
@@ -56,6 +54,12 @@ RUN cd iperf-3.1 && ./configure --prefix=/usr/local --bindir /usr/local/bin && m
 RUN curl -LO https://github.com/HewlettPackard/netperf/archive/netperf-2.7.0.tar.gz && tar -xzf netperf-2.7.0.tar.gz && mv netperf-netperf-2.7.0/ netperf-2.7.0
 RUN cd netperf-2.7.0 && ./configure --prefix=/usr/local --bindir /usr/local/bin && make CFLAGS=-fcommon && make install
 
+# Validate the installation of qperf, iperf3 and netperf
+RUN usr/local/bin/qperf --help
+RUN usr/local/bin/iperf3 -v
+RUN usr/local/bin/netperf -V
+RUN usr/local/bin/netserver -V
+
 COPY --from=builder /workspace/nptests /usr/bin/
 
 ENTRYPOINT ["nptests"]
diff --git a/network/benchmarks/netperf/nptest/go.mod b/network/benchmarks/netperf/nptest/go.mod
new file mode 100644
index 0000000000..eddeeef3ab
--- /dev/null
+++ b/network/benchmarks/netperf/nptest/go.mod
@@ -0,0 +1,3 @@
+module k8s.io/perf-tests/network/nptest
+
+go 1.23
diff --git a/network/benchmarks/netperf/nptest/nptest.go b/network/benchmarks/netperf/nptest/nptest.go
index 47b8f2cd67..30bba0df59 100644
--- a/network/benchmarks/netperf/nptest/nptest.go
+++ b/network/benchmarks/netperf/nptest/nptest.go
@@ -1,5 +1,5 @@
 /*
-Copyright 2016 The Kubernetes Authors.
+Copyright 2025 The Kubernetes Authors.
 
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@ package main
 // Imports only base Golang packages
 import (
     "bytes"
+    "encoding/json"
     "flag"
     "fmt"
     "log"
@@ -34,9 +35,7 @@ import (
     "net/rpc"
     "os"
     "os/exec"
-    "regexp"
     "strconv"
-    "strings"
     "sync"
     "time"
 )
@@ -57,15 +56,24 @@
 var testFrom, testTo int
 
 var workerStateMap map[string]*workerState
 
-var iperfTCPOutputRegexp *regexp.Regexp
-var iperfSCTPOutputRegexp *regexp.Regexp
-var iperfUDPOutputRegexp *regexp.Regexp
-var netperfOutputRegexp *regexp.Regexp
-var iperfCPUOutputRegexp *regexp.Regexp
-
 var dataPoints map[string][]point
 var dataPointKeys []string
 var datapointsFlushed bool
+var activeTests []*TestCase
+
+type Result struct {
+    Label  string          `json:"label"`
+    Result json.RawMessage `json:"result"`
+}
+
+var results []Result
+
+func addResult(label, resultJSON string) {
+    results = append(results, Result{
+        Label:  label,
+        Result: json.RawMessage(resultJSON),
+    })
+}
 
 var globalLock sync.Mutex
@@ -77,6 +85,8 @@ const (
     netperfPath       = "/usr/local/bin/netperf"
     netperfServerPath = "/usr/local/bin/netserver"
     outputCaptureFile = "/tmp/output.txt"
+    jsonDataMarker    = "GENERATING JSON OUTPUT"
+    jsonEndDataMarker = "END JSON OUTPUT"
     mssMin            = 96
     mssMax            = 1460
     mssStepSize       = 64
@@ -84,17 +94,10 @@
     msgSizeMin           = 1
     parallelStreams      = "8"
     rpcServicePort       = "5202"
+    iperf3SctpPort       = "5004"
     localhostIPv4Address = "127.0.0.1"
 )
 
-const (
-    iperfTCPTest = iota
-    qperfTCPTest
-    iperfUDPTest
-    iperfSctpTest
-    netperfTest
-)
-
 // NetPerfRPC service that exposes RegisterClient and ReceiveOutput for clients
 type NetPerfRPC int
@@ -107,27 +110,26 @@ type ClientRegistrationData struct {
 }
 
 // IperfClientWorkItem represents a single task for an Iperf client
-type IperfClientWorkItem struct {
-    Host    string
-    Port    string
-    MSS     int
-    MsgSize int
-    Type    int
+type ClientWorkItem struct {
+    Host   string
+    Port   string
+    Params TestParams
 }
 
 // IperfServerWorkItem represents a single task for an Iperf server
-type IperfServerWorkItem struct {
+type ServerWorkItem struct {
     ListenPort string
     Timeout    int
 }
 
 // WorkItem represents a single task for a worker
 type WorkItem struct {
-    IsClientItem bool
-    IsServerItem bool
-    IsIdle       bool
-    ClientItem   IperfClientWorkItem
-    ServerItem   IperfServerWorkItem
+    IsClientItem  bool
+    IsServerItem  bool
+    IsIdle        bool
+    TestCaseIndex int
+    ClientItem    ClientWorkItem
+    ServerItem    ServerWorkItem
 }
 
 type workerState struct {
@@ -139,70 +141,21 @@ type workerState struct {
 
 // WorkerOutput stores the results from a single worker
 type WorkerOutput struct {
-    Output string
-    Code   int
-    Worker string
-    Type   int
-}
-
-type testcase struct {
-    SourceNode      string
-    DestinationNode string
-    Label           string
-    ClusterIP       bool
-    Finished        bool
-    MSS             int
-    MsgSize         int
-    Type            int
+    TestCaseIndex int
+    Output        string
+    Code          int
+    Worker        string
+    Type          TestType
 }
 
-var testcases []*testcase
-var currentJobIndex int
-
 func init() {
     flag.StringVar(&mode, "mode", "worker", "Mode for the daemon (worker | orchestrator)")
     flag.StringVar(&port, "port", rpcServicePort, "Port to listen on (defaults to 5202)")
-    flag.StringVar(&host, "host", "", "IP address to bind to (defaults to 0.0.0.0)")
     flag.IntVar(&testFrom, "testFrom", 0, "start from test number testFrom")
     flag.IntVar(&testTo, "testTo", 5, "end at test number testTo")
 
     workerStateMap = make(map[string]*workerState)
-    testcases = []*testcase{
-        {SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "1 qperf TCP. Same VM using Pod IP", Type: qperfTCPTest, ClusterIP: false, MsgSize: msgSizeMin},
-        {SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "2 qperf TCP. Same VM using Virtual IP", Type: qperfTCPTest, ClusterIP: true, MsgSize: msgSizeMin},
-        {SourceNode: "netperf-w1", DestinationNode: "netperf-w3", Label: "3 qperf TCP. Remote VM using Pod IP", Type: qperfTCPTest, ClusterIP: false, MsgSize: msgSizeMin},
-        {SourceNode: "netperf-w3", DestinationNode: "netperf-w2", Label: "4 qperf TCP. Remote VM using Virtual IP", Type: qperfTCPTest, ClusterIP: true, MsgSize: msgSizeMin},
-        {SourceNode: "netperf-w2", DestinationNode: "netperf-w2", Label: "5 qperf TCP. Hairpin Pod to own Virtual IP", Type: qperfTCPTest, ClusterIP: true, MsgSize: msgSizeMin},
-
-        {SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "1 iperf TCP. Same VM using Pod IP", Type: iperfTCPTest, ClusterIP: false, MSS: mssMin},
-        {SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "2 iperf TCP. Same VM using Virtual IP", Type: iperfTCPTest, ClusterIP: true, MSS: mssMin},
-        {SourceNode: "netperf-w1", DestinationNode: "netperf-w3", Label: "3 iperf TCP. Remote VM using Pod IP", Type: iperfTCPTest, ClusterIP: false, MSS: mssMin},
-        {SourceNode: "netperf-w3", DestinationNode: "netperf-w2", Label: "4 iperf TCP. Remote VM using Virtual IP", Type: iperfTCPTest, ClusterIP: true, MSS: mssMin},
-        {SourceNode: "netperf-w2", DestinationNode: "netperf-w2", Label: "5 iperf TCP. Hairpin Pod to own Virtual IP", Type: iperfTCPTest, ClusterIP: true, MSS: mssMin},
-
-        {SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "6 iperf SCTP. Same VM using Pod IP", Type: iperfSctpTest, ClusterIP: false, MSS: mssMin},
-        {SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "7 iperf SCTP. Same VM using Virtual IP", Type: iperfSctpTest, ClusterIP: true, MSS: mssMin},
-        {SourceNode: "netperf-w1", DestinationNode: "netperf-w3", Label: "8 iperf SCTP. Remote VM using Pod IP", Type: iperfSctpTest, ClusterIP: false, MSS: mssMin},
-        {SourceNode: "netperf-w3", DestinationNode: "netperf-w2", Label: "9 iperf SCTP. Remote VM using Virtual IP", Type: iperfSctpTest, ClusterIP: true, MSS: mssMin},
-        {SourceNode: "netperf-w2", DestinationNode: "netperf-w2", Label: "10 iperf SCTP. Hairpin Pod to own Virtual IP", Type: iperfSctpTest, ClusterIP: true, MSS: mssMin},
-        {SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "11 iperf UDP. Same VM using Pod IP", Type: iperfUDPTest, ClusterIP: false, MSS: mssMax},
-        {SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "12 iperf UDP. Same VM using Virtual IP", Type: iperfUDPTest, ClusterIP: true, MSS: mssMax},
-        {SourceNode: "netperf-w1", DestinationNode: "netperf-w3", Label: "13 iperf UDP. Remote VM using Pod IP", Type: iperfUDPTest, ClusterIP: false, MSS: mssMax},
-        {SourceNode: "netperf-w3", DestinationNode: "netperf-w2", Label: "14 iperf UDP. Remote VM using Virtual IP", Type: iperfUDPTest, ClusterIP: true, MSS: mssMax},
-        {SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "15 netperf. Same VM using Pod IP", Type: netperfTest, ClusterIP: false},
-        {SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "16 netperf. Same VM using Virtual IP", Type: netperfTest, ClusterIP: true},
-        {SourceNode: "netperf-w1", DestinationNode: "netperf-w3", Label: "17 netperf. Remote VM using Pod IP", Type: netperfTest, ClusterIP: false},
-        {SourceNode: "netperf-w3", DestinationNode: "netperf-w2", Label: "18 netperf. Remote VM using Virtual IP", Type: netperfTest, ClusterIP: true},
-    }
-
-    currentJobIndex = 0
-
-    // Regexes to parse the Mbits/sec out of iperf TCP, SCTP, UDP and netperf output
-    iperfTCPOutputRegexp = regexp.MustCompile("SUM.*\\s+(\\d+)\\sMbits/sec\\s+receiver")
-    iperfSCTPOutputRegexp = regexp.MustCompile("SUM.*\\s+(\\d+)\\sMbits/sec\\s+receiver")
-    iperfUDPOutputRegexp = regexp.MustCompile("\\s+(\\S+)\\sMbits/sec\\s+\\S+\\s+ms\\s+")
-    netperfOutputRegexp = regexp.MustCompile("\\s+\\d+\\s+\\d+\\s+\\d+\\s+\\S+\\s+(\\S+)\\s+")
-    iperfCPUOutputRegexp = regexp.MustCompile(`local/sender\s(\d+\.\d+)%\s\((\d+\.\d+)%\w/(\d+\.\d+)%\w\),\sremote/receiver\s(\d+\.\d+)%\s\((\d+\.\d+)%\w/(\d+\.\d+)%\w\)`)
+    results = make([]Result, 0)
 
     dataPoints = make(map[string][]point)
 }
@@ -222,10 +175,10 @@
     if !validateParams() {
         fmt.Println("Failed to parse cmdline args - fatal error - bailing out")
         os.Exit(1)
-    }
-    grabEnv()
-    testcases = testcases[testFrom:testTo]
+    }
+
+    grabEnv()
+    activeTests = make([]*TestCase, 0, testTo-testFrom)
+    activeTests = append(activeTests, testcases[testFrom:testTo]...)
fmt.Println("Running as", mode, "...") if mode == orchestratorMode { orchestrate() @@ -241,8 +194,7 @@ func grabEnv() { podname = os.Getenv("HOSTNAME") } -func validateParams() (rv bool) { - rv = true +func validateParams() bool { if mode != workerMode && mode != orchestratorMode { fmt.Println("Invalid mode", mode) return false @@ -253,14 +205,10 @@ func validateParams() (rv bool) { return false } - if (len(host)) == 0 { - if mode == orchestratorMode { - host = os.Getenv("NETPERF_ORCH_SERVICE_HOST") - } else { - host = os.Getenv("NETPERF_ORCH_SERVICE_HOST") - } + if len(host) == 0 { + host = os.Getenv("NETPERF_ORCH_SERVICE_HOST") } - return + return true } func allWorkersIdle() bool { @@ -272,104 +220,6 @@ func allWorkersIdle() bool { return true } -func getWorkerPodIP(worker string) string { - return workerStateMap[worker].IP -} - -func allocateWorkToClient(workerState *workerState, workItem *WorkItem) { - if !allWorkersIdle() { - workItem.IsIdle = true - return - } - - // System is all idle - pick up next work item to allocate to client - for n, v := range testcases { - if v.Finished { - continue - } - if v.SourceNode != workerState.worker { - workItem.IsIdle = true - return - } - if _, ok := workerStateMap[v.DestinationNode]; !ok { - workItem.IsIdle = true - return - } - fmt.Printf("Requesting jobrun '%s' from %s to %s for MSS %d for MsgSize %d\n", v.Label, v.SourceNode, v.DestinationNode, v.MSS, v.MsgSize) - workItem.ClientItem.Type = v.Type - workItem.IsClientItem = true - workerState.idle = false - currentJobIndex = n - - if !v.ClusterIP { - workItem.ClientItem.Host = getWorkerPodIP(v.DestinationNode) - } else { - workItem.ClientItem.Host = os.Getenv("NETPERF_W2_SERVICE_HOST") - } - - switch { - case v.Type == iperfTCPTest || v.Type == iperfUDPTest || v.Type == iperfSctpTest: - workItem.ClientItem.Port = "5201" - workItem.ClientItem.MSS = v.MSS - - v.MSS = v.MSS + mssStepSize - if v.MSS > mssMax { - v.Finished = true - } - return - case v.Type == qperfTCPTest: - workItem.ClientItem.MsgSize = v.MsgSize - v.MsgSize <<= 1 - if v.MsgSize > msgSizeMax { - v.Finished = true - } - return - case v.Type == netperfTest: - workItem.ClientItem.Port = "12865" - return - } - } - - for _, v := range testcases { - if !v.Finished { - return - } - } - - if !datapointsFlushed { - fmt.Println("ALL TESTCASES AND MSS RANGES COMPLETE - GENERATING CSV OUTPUT") - flushDataPointsToCsv() - datapointsFlushed = true - } - - workItem.IsIdle = true -} - -// RegisterClient registers a single and assign a work item to it -func (t *NetPerfRPC) RegisterClient(data *ClientRegistrationData, workItem *WorkItem) error { - globalLock.Lock() - defer globalLock.Unlock() - - state, ok := workerStateMap[data.Worker] - - if !ok { - // For new clients, trigger an iperf server start immediately - state = &workerState{sentServerItem: true, idle: true, IP: data.IP, worker: data.Worker} - workerStateMap[data.Worker] = state - workItem.IsServerItem = true - workItem.ServerItem.ListenPort = "5201" - workItem.ServerItem.Timeout = 3600 - return nil - } - - // Worker defaults to idle unless the allocateWork routine below assigns an item - state.idle = true - - // Give the worker a new work item or let it idle loop another 5 seconds - allocateWorkToClient(state, workItem) - return nil -} - func writeOutputFile(filename, data string) { fd, err := os.OpenFile(filename, os.O_APPEND|os.O_WRONLY, 0666) if err != nil { @@ -427,138 +277,16 @@ func flushDataPointsToCsv() { fmt.Println("END CSV DATA") } -func parseIperfTCPBandwidth(output string) 
string { - // Parses the output of iperf3 and grabs the group Mbits/sec from the output - match := iperfTCPOutputRegexp.FindStringSubmatch(output) - if match != nil && len(match) > 1 { - return match[1] - } - return "0" -} - -func parseQperfTCPLatency(output string) string { - squeeze := func(s string) string { - return strings.Join(strings.Fields(s), " ") - } - - var bw, lat string - lines := strings.Split(output, "\n") - for i, line := range lines { - line = strings.TrimSpace(line) - if line == "tcp_bw:" { - bw = squeeze(lines[i+1]) - } else if line == "tcp_lat:" { - lat = squeeze(lines[i+1]) - } - } - - return fmt.Sprintf("(%s; %s)", bw, lat) -} - -func parseIperfSctpBandwidth(output string) string { - // Parses the output of iperf3 and grabs the group Mbits/sec from the output - match := iperfSCTPOutputRegexp.FindStringSubmatch(output) - if match != nil && len(match) > 1 { - return match[1] - } - return "0" -} - -func parseIperfUDPBandwidth(output string) string { - // Parses the output of iperf3 (UDP mode) and grabs the Mbits/sec from the output - match := iperfUDPOutputRegexp.FindStringSubmatch(output) - if match != nil && len(match) > 1 { - return match[1] - } - return "0" -} - -func parseIperfCPUUsage(output string) (string, string) { - // Parses the output of iperf and grabs the CPU usage on sender and receiver side from the output - match := iperfCPUOutputRegexp.FindStringSubmatch(output) - if match != nil && len(match) > 1 { - return match[1], match[4] - } - return "0", "0" -} - -func parseNetperfBandwidth(output string) string { - // Parses the output of netperf and grabs the Bbits/sec from the output - match := netperfOutputRegexp.FindStringSubmatch(output) - if match != nil && len(match) > 1 { - return match[1] - } - return "0" -} - -// ReceiveOutput processes a data received from a single client -func (t *NetPerfRPC) ReceiveOutput(data *WorkerOutput, _ *int) error { - globalLock.Lock() - defer globalLock.Unlock() - - testcase := testcases[currentJobIndex] - - var outputLog string - var bw string - var cpuSender string - var cpuReceiver string - - switch data.Type { - case iperfTCPTest: - mss := testcases[currentJobIndex].MSS - mssStepSize - outputLog = outputLog + fmt.Sprintln("Received TCP output from worker", data.Worker, "for test", testcase.Label, - "from", testcase.SourceNode, "to", testcase.DestinationNode, "MSS:", mss) + data.Output - writeOutputFile(outputCaptureFile, outputLog) - bw = parseIperfTCPBandwidth(data.Output) - cpuSender, cpuReceiver = parseIperfCPUUsage(data.Output) - registerDataPoint(testcase.Label, mss, bw, currentJobIndex) - - case qperfTCPTest: - msgSize := testcases[currentJobIndex].MsgSize / 2 - outputLog = outputLog + fmt.Sprintln("Received TCP output from worker", data.Worker, "for test", testcase.Label, - "from", testcase.SourceNode, "to", testcase.DestinationNode, "MsgSize:", msgSize) + data.Output - writeOutputFile(outputCaptureFile, outputLog) - bw = parseQperfTCPLatency(data.Output) - cpuSender, cpuReceiver = "na", "na" - registerDataPoint(testcase.Label, msgSize, bw, currentJobIndex) - - case iperfSctpTest: - mss := testcases[currentJobIndex].MSS - mssStepSize - outputLog = outputLog + fmt.Sprintln("Received SCTP output from worker", data.Worker, "for test", testcase.Label, - "from", testcase.SourceNode, "to", testcase.DestinationNode, "MSS:", mss) + data.Output - writeOutputFile(outputCaptureFile, outputLog) - bw = parseIperfSctpBandwidth(data.Output) - cpuSender, cpuReceiver = parseIperfCPUUsage(data.Output) - 
registerDataPoint(testcase.Label, mss, bw, currentJobIndex) - - case iperfUDPTest: - mss := testcases[currentJobIndex].MSS - mssStepSize - outputLog = outputLog + fmt.Sprintln("Received UDP output from worker", data.Worker, "for test", testcase.Label, - "from", testcase.SourceNode, "to", testcase.DestinationNode, "MSS:", mss) + data.Output - writeOutputFile(outputCaptureFile, outputLog) - bw = parseIperfUDPBandwidth(data.Output) - registerDataPoint(testcase.Label, mss, bw, currentJobIndex) - - case netperfTest: - outputLog = outputLog + fmt.Sprintln("Received netperf output from worker", data.Worker, "for test", testcase.Label, - "from", testcase.SourceNode, "to", testcase.DestinationNode) + data.Output - writeOutputFile(outputCaptureFile, outputLog) - bw = parseNetperfBandwidth(data.Output) - registerDataPoint(testcase.Label, 0, bw, currentJobIndex) - testcases[currentJobIndex].Finished = true - - } - - switch data.Type { - case iperfTCPTest, iperfSctpTest: - fmt.Println("Jobdone from worker", data.Worker, "Bandwidth was", bw, "Mbits/sec. CPU usage sender was", cpuSender, "%. CPU usage receiver was", cpuReceiver, "%.") - case qperfTCPTest: - fmt.Println("Jobdone from worker QPERF", data.Worker, "Bandwidth, Latency was", bw, "CPU usage sender was", cpuSender, "%. CPU usage receiver was", cpuReceiver, "%.") - default: - fmt.Println("Jobdone from worker", data.Worker, "Bandwidth was", bw, "Mbits/sec") +func flushResultJSONData() { + jsonData, err := json.MarshalIndent(results, "", " ") + if err != nil { + fmt.Println("Error generating JSON:", err) + return } - return nil + fmt.Println(jsonDataMarker) + fmt.Println(string(jsonData)) + fmt.Println(jsonEndDataMarker) } func serveRPCRequests(port string) { @@ -610,31 +338,13 @@ func getMyIP() string { } func handleClientWorkItem(client *rpc.Client, workItem *WorkItem) { - fmt.Println("Orchestrator requests worker run item Type:", workItem.ClientItem.Type) - switch { - case workItem.ClientItem.Type == iperfTCPTest || workItem.ClientItem.Type == iperfUDPTest || workItem.ClientItem.Type == iperfSctpTest: - outputString := iperfClient(workItem.ClientItem.Host, workItem.ClientItem.MSS, workItem.ClientItem.Type) - var reply int - err := client.Call("NetPerfRPC.ReceiveOutput", WorkerOutput{Output: outputString, Worker: worker, Type: workItem.ClientItem.Type}, &reply) - if err != nil { - log.Fatal("failed to call client", err) - } - case workItem.ClientItem.Type == qperfTCPTest: - outputString := qperfClient(workItem.ClientItem.Host, workItem.ClientItem.Type, workItem.ClientItem.MsgSize) - var reply int - err := client.Call("NetPerfRPC.ReceiveOutput", WorkerOutput{Output: outputString, Worker: worker, Type: workItem.ClientItem.Type}, &reply) - if err != nil { - log.Fatal("failed to call client", err) - } - case workItem.ClientItem.Type == netperfTest: - outputString := netperfClient(workItem.ClientItem.Host) - var reply int - err := client.Call("NetPerfRPC.ReceiveOutput", WorkerOutput{Output: outputString, Worker: worker, Type: workItem.ClientItem.Type}, &reply) - if err != nil { - log.Fatal("failed to call client", err) - } + testCase := activeTests[workItem.TestCaseIndex] + outputString := testCase.TestRunner(workItem.ClientItem) + var reply int + err := client.Call("NetPerfRPC.ReceiveOutput", WorkerOutput{Output: outputString, Worker: worker, Type: testCase.Type, TestCaseIndex: workItem.TestCaseIndex}, &reply) + if err != nil { + log.Fatal("failed to call client", err) } - // Client COOLDOWN period before asking for next work item to replenish burst 
@@ -610,31 +338,13 @@ func getMyIP() string {
 }
 
 func handleClientWorkItem(client *rpc.Client, workItem *WorkItem) {
-	fmt.Println("Orchestrator requests worker run item Type:", workItem.ClientItem.Type)
-	switch {
-	case workItem.ClientItem.Type == iperfTCPTest || workItem.ClientItem.Type == iperfUDPTest || workItem.ClientItem.Type == iperfSctpTest:
-		outputString := iperfClient(workItem.ClientItem.Host, workItem.ClientItem.MSS, workItem.ClientItem.Type)
-		var reply int
-		err := client.Call("NetPerfRPC.ReceiveOutput", WorkerOutput{Output: outputString, Worker: worker, Type: workItem.ClientItem.Type}, &reply)
-		if err != nil {
-			log.Fatal("failed to call client", err)
-		}
-	case workItem.ClientItem.Type == qperfTCPTest:
-		outputString := qperfClient(workItem.ClientItem.Host, workItem.ClientItem.Type, workItem.ClientItem.MsgSize)
-		var reply int
-		err := client.Call("NetPerfRPC.ReceiveOutput", WorkerOutput{Output: outputString, Worker: worker, Type: workItem.ClientItem.Type}, &reply)
-		if err != nil {
-			log.Fatal("failed to call client", err)
-		}
-	case workItem.ClientItem.Type == netperfTest:
-		outputString := netperfClient(workItem.ClientItem.Host)
-		var reply int
-		err := client.Call("NetPerfRPC.ReceiveOutput", WorkerOutput{Output: outputString, Worker: worker, Type: workItem.ClientItem.Type}, &reply)
-		if err != nil {
-			log.Fatal("failed to call client", err)
-		}
+	testCase := activeTests[workItem.TestCaseIndex]
+	outputString := testCase.TestRunner(workItem.ClientItem)
+	var reply int
+	err := client.Call("NetPerfRPC.ReceiveOutput", WorkerOutput{Output: outputString, Worker: worker, Type: testCase.Type, TestCaseIndex: workItem.TestCaseIndex}, &reply)
+	if err != nil {
+		log.Fatal("failed to call client", err)
 	}
-	// Client COOLDOWN period before asking for next work item to replenish burst
-	// allowance policers etc
 	time.Sleep(10 * time.Second)
 }
 
@@ -646,29 +356,29 @@ func isIPv6(address string) bool {
 
 // startWork : Entry point to the worker infinite loop
 func startWork() {
-	for true {
-		var timeout time.Duration
+	for {
 		var client *rpc.Client
 		var err error
 
+		// Address received via command line
 		address := host
 		if isIPv6(address) {
 			address = "[" + address + "]"
 		}
 
-		timeout = 5
-		for true {
+		for {
 			fmt.Println("Attempting to connect to orchestrator at", host)
 			client, err = rpc.DialHTTP("tcp", address+":"+port)
 			if err == nil {
 				break
 			}
 			fmt.Println("RPC connection to ", host, " failed:", err)
-			time.Sleep(timeout * time.Second)
+			time.Sleep(5 * time.Second)
 		}
 
-		for true {
+		for {
 			clientData := ClientRegistrationData{Host: podname, KubeNode: kubenode, Worker: worker, IP: getMyIP()}
+
 			var workItem WorkItem
 
 			if err := client.Call("NetPerfRPC.RegisterClient", clientData, &workItem); err != nil {
@@ -678,18 +388,18 @@ func startWork() {
 			}
 
 			switch {
-			case workItem.IsIdle == true:
+			case workItem.IsIdle:
 				time.Sleep(5 * time.Second)
 				continue
 
-			case workItem.IsServerItem == true:
+			case workItem.IsServerItem:
 				fmt.Println("Orchestrator requests worker run iperf and netperf servers")
 				go iperfServer()
 				go qperfServer()
 				go netperfServer()
 				time.Sleep(1 * time.Second)
 
-			case workItem.IsClientItem == true:
+			case workItem.IsClientItem:
 				handleClientWorkItem(client, &workItem)
 			}
 		}
@@ -698,10 +408,8 @@ func startWork() {
 
 // Invoke and indefinitely run an iperf server
 func iperfServer() {
-	output, success := cmdExec(iperf3Path, []string{iperf3Path, "-s", host, "-J", "-i", "60"}, 15)
-	if success {
-		fmt.Println(output)
-	}
+	output, _ := cmdExec(iperf3Path, []string{iperf3Path, "-s", host, "-J", "-i", "60", "-D"}, 15)
+	fmt.Println(output)
 }
 
 // Invoke and indefinitely run an qperf server
@@ -720,77 +428,136 @@ func netperfServer() {
 	}
 }
 
-// Invoke and run an iperf client and return the output if successful.
-func iperfClient(serverHost string, mss int, workItemType int) (rv string) {
-	switch {
-	case workItemType == iperfTCPTest:
-		output, success := cmdExec(iperf3Path, []string{iperf3Path, "-c", serverHost, "-V", "-N", "-i", "30", "-t", "10", "-f", "m", "-w", "512M", "-Z", "-P", parallelStreams, "-M", strconv.Itoa(mss)}, 15)
-		if success {
-			rv = output
-		}
-
-	case workItemType == iperfSctpTest:
-		output, success := cmdExec(iperf3Path, []string{iperf3Path, "-c", serverHost, "-V", "-N", "-i", "30", "-t", "10", "-f", "m", "-w", "512M", "-Z", "-P", parallelStreams, "-M", strconv.Itoa(mss), "--sctp"}, 15)
-		if success {
-			rv = output
-		}
+func cmdExec(command string, args []string, _ int32) (rv string, rc bool) {
+	cmd := exec.Cmd{Path: command, Args: args}
 
-	case workItemType == iperfUDPTest:
-		output, success := cmdExec(iperf3Path, []string{iperf3Path, "-c", serverHost, "-i", "30", "-t", "10", "-f", "m", "-b", "0", "-u"}, 15)
-		if success {
-			rv = output
-		}
+	var stdoutput bytes.Buffer
+	var stderror bytes.Buffer
+	cmd.Stdout = &stdoutput
+	cmd.Stderr = &stderror
+	if err := cmd.Run(); err != nil {
+		outputstr := stdoutput.String()
+		errstr := stderror.String()
+		fmt.Println("Failed to run", outputstr, "error:", errstr, err)
+		return
 	}
+
+	rv = stdoutput.String()
+	rc = true
 	return
 }
 
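Note that the relocated cmdExec still accepts a timeout argument but discards it (the parameter is now explicitly blanked as `_ int32`), and constructing exec.Cmd with an explicit Path bypasses $PATH lookup, so callers must pass an absolute path and repeat it as args[0]. If enforcing the timeout ever becomes necessary, a context-based variant could look like the following sketch; this is an illustration, not code from this PR:

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"os/exec"
	"time"
)

// cmdExecWithTimeout is a hypothetical variant of cmdExec that enforces a
// deadline: exec.CommandContext kills the process once the context expires.
// Unlike exec.Cmd{Path: ...}, it resolves the binary via $PATH, so args must
// NOT repeat the program name.
func cmdExecWithTimeout(command string, args []string, timeout time.Duration) (string, bool) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	var stdout, stderr bytes.Buffer
	cmd := exec.CommandContext(ctx, command, args...)
	cmd.Stdout = &stdout
	cmd.Stderr = &stderr
	if err := cmd.Run(); err != nil {
		fmt.Println("Failed to run", stdout.String(), "error:", stderr.String(), err)
		return "", false
	}
	return stdout.String(), true
}

func main() {
	out, ok := cmdExecWithTimeout("echo", []string{"hello"}, 2*time.Second)
	fmt.Println(out, ok)
}
```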
-// Invoke and run an qperf client and return the output if successful.
-func qperfClient(serverHost string, workItemType, msgSize int) (rv string) {
+func (t *NetPerfRPC) ReceiveOutput(data *WorkerOutput, _ *int) error {
+	globalLock.Lock()
+	defer globalLock.Unlock()
 
-	str := fmt.Sprint
+	fmt.Println("ReceiveOutput WorkItem TestCaseIndex: ", data.TestCaseIndex)
+	testcase := activeTests[data.TestCaseIndex]
 
-	switch {
-	case workItemType == qperfTCPTest:
-		output, success := cmdExec(qperfPath, []string{
-			qperfPath, "-ip", "19766", "-m", str(msgSize), serverHost, "tcp_bw", "tcp_lat",
-		}, 15)
-		if success {
-			rv = output
-		}
-	default:
-		fmt.Println("unknown work item type: ", workItemType)
+	outputLog := fmt.Sprintln("Received output from worker", data.Worker, "for test", testcase.Label,
+		"from", testcase.SourceNode, "to", testcase.DestinationNode) + data.Output
+	writeOutputFile(outputCaptureFile, outputLog)
+
+	if testcase.BandwidthParser != nil {
+		bw, mss := testcase.BandwidthParser(data.Output)
+		registerDataPoint(testcase.Label, mss, fmt.Sprintf("%f", bw), data.TestCaseIndex)
+		fmt.Println("Jobdone from worker", data.Worker, "Bandwidth was", bw, "Mbits/sec")
 	}
-	return
+
+	if testcase.JSONParser != nil {
+		addResult(
+			fmt.Sprintf("%s with MSS: %d", testcase.Label, testcase.MSS-mssStepSize),
+			testcase.JSONParser(data.Output),
+		)
+		fmt.Println("Jobdone from worker", data.Worker, "JSON output generated")
+	}
+
+	return nil
 }
 
-// Invoke and run a netperf client and return the output if successful.
-func netperfClient(serverHost string) (rv string) {
-	output, success := cmdExec(netperfPath, []string{netperfPath, "-H", serverHost}, 15)
-	if success {
-		fmt.Println(output)
-		rv = output
-	} else {
-		fmt.Println("Error running netperf client", output)
+func (t *NetPerfRPC) RegisterClient(data ClientRegistrationData, workItem *WorkItem) error {
+	globalLock.Lock()
+	defer globalLock.Unlock()
+
+	state, ok := workerStateMap[data.Worker]
+
+	if !ok {
+		// For new clients, trigger an iperf server start immediately
+		state = &workerState{sentServerItem: true, idle: true, IP: data.IP, worker: data.Worker}
+		workerStateMap[data.Worker] = state
+		workItem.IsServerItem = true
+		workItem.ServerItem.ListenPort = "5201"
+		workItem.ServerItem.Timeout = 3600
+		return nil
 	}
-	return
+
+	// Worker defaults to idle unless the allocateWork routine below assigns an item
+	state.idle = true
+
+	// Give the worker a new work item or let it idle loop another 5 seconds
+	allocateWorkToClient(state, workItem)
+	return nil
 }
 
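ReceiveOutput and RegisterClient, together with allocateWorkToClient below, drive everything off the activeTests table. The TestCase type itself is defined elsewhere in this PR; from the field and method references in this file alone, its shape must be roughly the following reconstruction, where every name and type is inferred from usage rather than copied from the real definition:

```go
package sketch

// ClientWorkItemStub stands in for the PR's ClientWorkItem type, which is
// defined elsewhere in this diff.
type ClientWorkItemStub struct {
	Host   string
	Port   string
	Params string
}

// TestCaseSketch is reconstructed purely from how nptest.go reads and writes
// the entries of activeTests; the real TestCase may differ.
type TestCaseSketch struct {
	Label           string
	Type            int    // e.g. iperfTCPTest, netperfTest
	SourceNode      string // worker that must run the client side
	DestinationNode string // worker whose server end is targeted
	ClusterIP       bool   // target the service VIP instead of the pod IP
	MSS             int    // stepped by mssStepSize up to mssMax across runs
	MsgSize         int
	TestParams      string // copied into workItem.ClientItem.Params
	Finished        bool

	TestRunner      func(ClientWorkItemStub) string    // client command for this test type
	BandwidthParser func(string) (bw float64, mss int) // optional: feeds the CSV data points
	JSONParser      func(string) string                // optional: feeds the JSON results
}
```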
-func cmdExec(command string, args []string, _ int32) (rv string, rc bool) {
-	cmd := exec.Cmd{Path: command, Args: args}
+func allocateWorkToClient(workerState *workerState, workItem *WorkItem) {
+	if !allWorkersIdle() {
+		workItem.IsIdle = true
+		return
+	}
+
+	// System is all idle - pick up next work item to allocate to client
+	for n, v := range activeTests {
+		if v.Finished {
+			continue
+		}
+		if v.SourceNode != workerState.worker {
+			workItem.IsIdle = true
+			return
+		}
+		if _, ok := workerStateMap[v.DestinationNode]; !ok {
+			workItem.IsIdle = true
+			return
+		}
+		fmt.Printf("Requesting jobrun '%s' from %s to %s for MSS %d for MsgSize %d\n", v.Label, v.SourceNode, v.DestinationNode, v.MSS, v.MsgSize)
+		workItem.IsClientItem = true
+		workItem.TestCaseIndex = n
+		workerState.idle = false
+
+		if !v.ClusterIP {
+			workItem.ClientItem.Host = workerStateMap[v.DestinationNode].IP
+		} else {
+			workItem.ClientItem.Host = os.Getenv("NETPERF_W2_SERVICE_HOST")
+		}
+
+		workItem.ClientItem.Params = v.TestParams
+
+		if v.MSS != 0 && v.MSS < mssMax {
+			v.MSS += mssStepSize
+		} else {
+			v.Finished = true
+		}
+
+		if v.Type == netperfTest {
+			workItem.ClientItem.Port = "12865"
+		} else {
+			workItem.ClientItem.Port = "5201"
+		}
 
-	var stdoutput bytes.Buffer
-	var stderror bytes.Buffer
-	cmd.Stdout = &stdoutput
-	cmd.Stderr = &stderror
-	if err := cmd.Run(); err != nil {
-		outputstr := stdoutput.String()
-		errstr := stderror.String()
-		fmt.Println("Failed to run", outputstr, "error:", errstr, err)
 		return
 	}
 
-	rv = stdoutput.String()
-	rc = true
-	return
+	for _, v := range activeTests {
+		if !v.Finished {
+			return
+		}
+	}
+
+	if !datapointsFlushed {
+		fmt.Println("ALL TESTCASES AND MSS RANGES COMPLETE - GENERATING CSV OUTPUT")
+		flushDataPointsToCsv()
+		flushResultJSONData()
+		datapointsFlushed = true
+	}
+
+	workItem.IsIdle = true
 }
diff --git a/network/benchmarks/netperf/nptest/nptest_test.go b/network/benchmarks/netperf/nptest/nptest_test.go
deleted file mode 100644
index c651feb814..0000000000
--- a/network/benchmarks/netperf/nptest/nptest_test.go
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
-Copyright 2016 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package main
-
-import "testing"
-
-func TestParseQperfTCPLatency(t *testing.T) {
-	input := `
-tcp_bw:
-    bw = 5.07 GB/sec
-tcp_lat:
-    latency = 15.6 us
-`
-
-	expected := "(bw = 5.07 GB/sec; latency = 15.6 us)"
-	output := parseQperfTCPLatency(input)
-
-	if output != expected {
-		t.Fatalf("Expected: %s, Got: %s", expected, output)
-	}
-
-}
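Deleting nptest_test.go removes the qperf parser's only unit test at the same time the Makefile starts running `go test ./...` inside nptest, so the new parsers package is the natural home for replacements. One possible test for ParseIperfTCPBandwidth, assuming the package's Iperf structs map to iperf3's usual `start.tcp_mss_default` and `end.sum_sent.bits_per_second` JSON fields (a sketch under that assumption, not part of this diff):

```go
package parsers

import "testing"

func TestParseIperfTCPBandwidth(t *testing.T) {
	// Field names assume iperf3's standard JSON layout; adjust to match the
	// struct tags actually defined in this package.
	input := `{"start":{"tcp_mss_default":1448},"end":{"sum_sent":{"bits_per_second":9000000.0}}}`

	bw, mss := ParseIperfTCPBandwidth(input)
	if bw != 9.0 { // 9e6 bits/sec == 9 Mbits/sec
		t.Fatalf("expected 9.0 Mbits/sec, got %f", bw)
	}
	if mss != 1448 {
		t.Fatalf("expected MSS 1448, got %d", mss)
	}
}

// Malformed payloads must fail soft: the parser returns zeros instead of
// propagating an error to the orchestrator.
func TestParseIperfTCPBandwidthBadJSON(t *testing.T) {
	if bw, mss := ParseIperfTCPBandwidth("not json"); bw != 0 || mss != 0 {
		t.Fatalf("expected zeros for malformed input, got %f/%d", bw, mss)
	}
}
```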
diff --git a/network/benchmarks/netperf/nptest/parsers/bandwidth_parsers.go b/network/benchmarks/netperf/nptest/parsers/bandwidth_parsers.go
new file mode 100644
index 0000000000..266f2d7b70
--- /dev/null
+++ b/network/benchmarks/netperf/nptest/parsers/bandwidth_parsers.go
@@ -0,0 +1,59 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package parsers
+
+import (
+	"encoding/json"
+	"regexp"
+	"strconv"
+)
+
+func ParseIperfTCPBandwidth(output string) (bw float64, mss int) {
+	var iperfTCPoutput IperfTCPCommandOutput
+
+	err := json.Unmarshal([]byte(output), &iperfTCPoutput)
+	if err != nil {
+		return 0, 0
+	}
+
+	bw = iperfTCPoutput.End.SumSent.BitsPerSecond / 1e6
+	mss = iperfTCPoutput.Start.TCPMss
+
+	return bw, mss
+}
+
+func ParseIperfUDPBandwidth(output string) (bw float64, mss int) {
+	var iperfUDPOutput IperfUDPCommandOutput
+
+	err := json.Unmarshal([]byte(output), &iperfUDPOutput)
+	if err != nil {
+		return 0, 0
+	}
+
+	return iperfUDPOutput.End.Sum.BitsPerSecond / 1e6, 0
+}
+
+func ParseNetperfBandwidth(output string) (bw float64, mss int) {
+	// Parses the output of netperf and grabs the Mbits/sec from the output
+	netperfOutputRegexp := regexp.MustCompile(`\s+\d+\s+\d+\s+\d+\s+\S+\s+(\S+)\s+`)
+	match := netperfOutputRegexp.FindStringSubmatch(output)
+	if len(match) > 1 {
+		floatVal, _ := strconv.ParseFloat(match[1], 64)
+		return floatVal, 0
+	}
+	return 0, 0
+}
diff --git a/network/benchmarks/netperf/nptest/parsers/json_parsers.go b/network/benchmarks/netperf/nptest/parsers/json_parsers.go
new file mode 100644
index 0000000000..37cc4f8b34
--- /dev/null
+++ b/network/benchmarks/netperf/nptest/parsers/json_parsers.go
@@ -0,0 +1,101 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package parsers
+
+import (
+	"encoding/json"
+	"math/bits"
+)
+
+func ParseIperfTCPResults(output string) string {
+	var iperfOutput IperfTCPCommandOutput
+
+	err := json.Unmarshal([]byte(output), &iperfOutput)
+	if err != nil {
+		return "{\"error\": \"Failed to parse JSON output\", \"message\": \"" + err.Error() + "\"}"
+	}
+
+	// Calculate the min, max and mean rtts by aggregating the streams
+	var sumMeanRtt uint
+	var minRtt uint = 1<