diff --git a/test/cases/nvidia/main_test.go b/test/cases/nvidia/main_test.go
index cab0fd1d4..1fd50cc76 100644
--- a/test/cases/nvidia/main_test.go
+++ b/test/cases/nvidia/main_test.go
@@ -5,7 +5,6 @@ package nvidia
 import (
 	"context"
 	_ "embed"
-	"flag"
 	"fmt"
 	"log"
 	"os"
@@ -14,8 +13,8 @@ import (
 	"testing"
 
 	fwext "github.com/aws/aws-k8s-tester/internal/e2e"
+	"github.com/aws/aws-k8s-tester/test/common"
 	"github.com/aws/aws-k8s-tester/test/manifests"
-	"github.com/aws/aws-sdk-go-v2/aws"
 	appsv1 "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -26,17 +25,22 @@ import (
 	"sigs.k8s.io/e2e-framework/pkg/envconf"
 )
 
+type Config struct {
+	common.MetricOps
+	NodeType               string `flag:"nodeType" desc:"node type for the tests"`
+	InstallDevicePlugin    bool   `flag:"installDevicePlugin" desc:"install nvidia device plugin"`
+	EfaEnabled             bool   `flag:"efaEnabled" desc:"enable efa tests"`
+	NvidiaTestImage        string `flag:"nvidiaTestImage" desc:"nccl test image for nccl tests"`
+	PytorchImage           string `flag:"pytorchImage" desc:"pytorch cuda image for single node tests"`
+	SkipUnitTestSubcommand string `flag:"skipUnitTestSubcommand" desc:"optional command to skip specified unit test"`
+}
+
 var (
-	testenv                env.Environment
-	nodeType               *string
-	installDevicePlugin    *bool
-	efaEnabled             *bool
-	nvidiaTestImage        *string
-	pytorchImage           *string
-	skipUnitTestSubcommand *string
-	nodeCount              int
-	gpuPerNode             int
-	efaPerNode             int
+	testenv    env.Environment
+	testConfig Config
+	nodeCount  int
+	gpuPerNode int
+	efaPerNode int
 )
 
 func deployMPIOperator(ctx context.Context, config *envconf.Config) (context.Context, error) {
@@ -51,31 +55,6 @@ func deployMPIOperator(ctx context.Context, config *envconf.Config) (context.Con
 	return ctx, nil
 }
 
-func deployNvidiaDevicePlugin(ctx context.Context, config *envconf.Config) (context.Context, error) {
-	ds := appsv1.DaemonSet{
-		ObjectMeta: metav1.ObjectMeta{Name: "nvidia-device-plugin-daemonset", Namespace: "kube-system"},
-	}
-	err := wait.For(fwext.NewConditionExtension(config.Client().Resources()).DaemonSetReady(&ds),
-		wait.WithContext(ctx))
-	if err != nil {
-		return ctx, fmt.Errorf("failed to deploy nvidia-device-plugin: %v", err)
-	}
-	return ctx, nil
-}
-
-func deployEFAPlugin(ctx context.Context, config *envconf.Config) (context.Context, error) {
-	ds := appsv1.DaemonSet{
-		ObjectMeta: metav1.ObjectMeta{Name: "aws-efa-k8s-device-plugin-daemonset", Namespace: "kube-system"},
-	}
-	err := wait.For(fwext.NewConditionExtension(config.Client().Resources()).DaemonSetReady(&ds),
-		wait.WithContext(ctx))
-	if err != nil {
-		return ctx, fmt.Errorf("failed to deploy efa-device-plugin: %v", err)
-	}
-
-	return ctx, nil
-}
-
 func checkNodeTypes(ctx context.Context, config *envconf.Config) (context.Context, error) {
 	clientset, err := kubernetes.NewForConfig(config.Client().RESTConfig())
 	if err != nil {
@@ -93,9 +72,9 @@ func checkNodeTypes(ctx context.Context, config *envconf.Config) (context.Contex
 		}
 	}
 
-	if *nodeType != "" {
+	if testConfig.NodeType != "" {
 		for _, v := range nodes.Items {
-			if v.Labels["node.kubernetes.io/instance-type"] == *nodeType {
+			if v.Labels["node.kubernetes.io/instance-type"] == testConfig.NodeType {
 				nodeCount++
 				gpu := v.Status.Capacity["nvidia.com/gpu"]
 				gpuPerNode = int(gpu.Value())
@@ -105,7 +84,7 @@ func checkNodeTypes(ctx context.Context, config *envconf.Config) (context.Contex
 		}
 	} else {
 		log.Printf("No node type specified. Using the node type %s in the node groups.", nodes.Items[0].Labels["node.kubernetes.io/instance-type"])
-		nodeType = aws.String(nodes.Items[0].Labels["node.kubernetes.io/instance-type"])
+		testConfig.NodeType = nodes.Items[0].Labels["node.kubernetes.io/instance-type"]
 		nodeCount = len(nodes.Items)
 		gpu := nodes.Items[0].Status.Capacity["nvidia.com/gpu"]
 		gpuPerNode = int(gpu.Value())
@@ -117,28 +96,31 @@ func checkNodeTypes(ctx context.Context, config *envconf.Config) (context.Contex
 }
 
 func TestMain(m *testing.M) {
-	nodeType = flag.String("nodeType", "", "node type for the tests")
-	nvidiaTestImage = flag.String("nvidiaTestImage", "", "nccl test image for nccl tests")
-	pytorchImage = flag.String("pytorchImage", "763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-training:2.1.0-gpu-py310-cu121-ubuntu20.04-ec2", "pytorch cuda image for single node tests")
-	efaEnabled = flag.Bool("efaEnabled", false, "enable efa tests")
-	installDevicePlugin = flag.Bool("installDevicePlugin", true, "install nvidia device plugin")
-	skipUnitTestSubcommand = flag.String("skipUnitTestSubcommand", "", "optional command to skip specified unit test, `-s test1|test2|...`")
+	testConfig = Config{
+		InstallDevicePlugin: true,
+		PytorchImage:        "763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-training:2.1.0-gpu-py310-cu121-ubuntu20.04-ec2",
+	}
+
+	_, err := common.ParseFlags(&testConfig)
+	if err != nil {
+		log.Fatalf("failed to parse flags: %v", err)
+	}
 	cfg, err := envconf.NewFromFlags()
 	if err != nil {
 		log.Fatalf("failed to initialize test environment: %v", err)
 	}
-	testenv = env.NewWithConfig(cfg)
+
 	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
 	defer cancel()
-	testenv = testenv.WithContext(ctx)
+	testenv = env.NewWithConfig(cfg).WithContext(ctx)
 
-	// all NVIDIA tests require the device plugin and MPI operator
-	deploymentManifests := [][]byte{
+	manifestsList := [][]byte{
 		manifests.MpiOperatorManifest,
 	}
+
 	setUpFunctions := []env.Func{
 		func(ctx context.Context, config *envconf.Config) (context.Context, error) {
-			err := fwext.ApplyManifests(config.Client().RESTConfig(), deploymentManifests...)
+			err := fwext.ApplyManifests(config.Client().RESTConfig(), manifestsList...)
 			if err != nil {
 				return ctx, err
 			}
@@ -147,14 +129,35 @@ func TestMain(m *testing.M) {
 		deployMPIOperator,
 	}
 
-	if *installDevicePlugin {
-		deploymentManifests = append(deploymentManifests, manifests.NvidiaDevicePluginManifest)
-		setUpFunctions = append(setUpFunctions, deployNvidiaDevicePlugin)
+	if testConfig.InstallDevicePlugin {
+		manifestsList = append(manifestsList, manifests.NvidiaDevicePluginManifest)
+		setUpFunctions = append(setUpFunctions, func(ctx context.Context, config *envconf.Config) (context.Context, error) {
+			return common.DeployDaemonSet("nvidia-device-plugin-daemonset", "kube-system")(ctx, config)
+		})
 	}
 
-	if *efaEnabled {
-		deploymentManifests = append(deploymentManifests, manifests.EfaDevicePluginManifest)
-		setUpFunctions = append(setUpFunctions, deployEFAPlugin)
+	if testConfig.EfaEnabled {
+		manifestsList = append(manifestsList, manifests.EfaDevicePluginManifest)
+		setUpFunctions = append(setUpFunctions, func(ctx context.Context, config *envconf.Config) (context.Context, error) {
+			return common.DeployDaemonSet("aws-efa-k8s-device-plugin-daemonset", "kube-system")(ctx, config)
+		})
+	}
+
+	if len(testConfig.MetricDimensions) > 0 {
+		renderedCloudWatchAgentManifest, err := manifests.RenderCloudWatchAgentManifest(testConfig.MetricDimensions)
+		if err != nil {
+			log.Printf("Warning: failed to render CloudWatch Agent manifest: %v", err)
+		}
+		manifestsList = append(manifestsList, manifests.DCGMExporterManifest, renderedCloudWatchAgentManifest)
+		setUpFunctions = append(setUpFunctions, func(ctx context.Context, config *envconf.Config) (context.Context, error) {
+			if ctx, err := common.DeployDaemonSet("dcgm-exporter", "kube-system")(ctx, config); err != nil {
+				return ctx, err
+			}
+			if ctx, err := common.DeployDaemonSet("cwagent", "amazon-cloudwatch")(ctx, config); err != nil {
+				return ctx, err
+			}
+			return ctx, nil
+		})
 	}
 
 	setUpFunctions = append(setUpFunctions, checkNodeTypes)
@@ -162,12 +165,8 @@ func TestMain(m *testing.M) {
 
 	testenv.Finish(
 		func(ctx context.Context, config *envconf.Config) (context.Context, error) {
-			err := fwext.DeleteManifests(cfg.Client().RESTConfig(), manifests.EfaDevicePluginManifest)
-			if err != nil {
-				return ctx, err
-			}
-			slices.Reverse(deploymentManifests)
-			err = fwext.DeleteManifests(config.Client().RESTConfig(), deploymentManifests...)
+			slices.Reverse(manifestsList)
+			err := fwext.DeleteManifests(config.Client().RESTConfig(), manifestsList...)
 			if err != nil {
 				return ctx, err
 			}
diff --git a/test/cases/nvidia/mpi_test.go b/test/cases/nvidia/mpi_test.go
index e24a6f324..b29cf0675 100644
--- a/test/cases/nvidia/mpi_test.go
+++ b/test/cases/nvidia/mpi_test.go
@@ -63,12 +63,12 @@ func multiNode(testName string) features.Feature {
 		WithLabel("hardware", "gpu").
 		WithLabel("hardware", "efa").
 		Setup(func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
-			if *nvidiaTestImage == "" {
+			if testConfig.NvidiaTestImage == "" {
 				t.Fatal(fmt.Errorf("nvidiaTestImage must be set to run unit test, use https://github.com/aws/aws-k8s-tester/blob/main/test/images/nvidia/Dockerfile to build the image and -nvidiaTestImage to set the image url"))
 			}
 			maxBytes := "2G"
 			ncclBuffSize := "4194304"
-			if slices.Contains(instanceSupportsRdmaRead, *nodeType) {
+			if slices.Contains(instanceSupportsRdmaRead, testConfig.NodeType) {
 				t.Log("Instance supports RDMA")
 				maxBytes = "16G"
 				ncclBuffSize = "8388608"
@@ -79,7 +79,7 @@ func multiNode(testName string) features.Feature {
 				WorkerNodeCount:     nodeCount,
 				WorkerNodeGpuCount:  nodeCount * gpuPerNode,
 				GpuPerNode:          gpuPerNode,
-				NvidiaTestImage:     *nvidiaTestImage,
+				NvidiaTestImage:     testConfig.NvidiaTestImage,
 				EfaInterfacePerNode: efaPerNode,
 				MaxBytes:            maxBytes,
 				NcclBuffSize:        ncclBuffSize,
@@ -118,10 +118,10 @@ func multiNode(testName string) features.Feature {
 			if !t.Failed() {
 				t.Log("Multi node job completed")
 				// Verify GPU Direct RDMA is used on P4/P5
-				if *efaEnabled && slices.Contains(instanceSupportsRdmaRead, *nodeType) {
+				if testConfig.EfaEnabled && slices.Contains(instanceSupportsRdmaRead, testConfig.NodeType) {
 					pattern := regexp.MustCompile(`\[send\] via NET/.*Libfabric/.*/GDRDMA`)
 					if !pattern.MatchString(log) {
-						t.Errorf("GPU Direct RDMA is not utilized for inter-node communication in NCCL tests on instances that support GDRDMA: %s", *nodeType)
+						t.Errorf("GPU Direct RDMA is not utilized for inter-node communication in NCCL tests on instances that support GDRDMA: %s", testConfig.NodeType)
 					}
 				}
 			}
@@ -149,7 +149,7 @@ func singleNode() features.Feature {
 			renderedSingleNodeManifest, err = fwext.RenderManifests(mpiJobPytorchTrainingSingleNodeManifest, struct {
 				PytorchTestImage string
 			}{
-				PytorchTestImage: *pytorchImage,
+				PytorchTestImage: testConfig.PytorchImage,
 			})
 			if err != nil {
 				t.Fatal(err)
diff --git a/test/cases/nvidia/unit_test.go b/test/cases/nvidia/unit_test.go
index 27918e878..4e8fb2e2c 100644
--- a/test/cases/nvidia/unit_test.go
+++ b/test/cases/nvidia/unit_test.go
@@ -42,15 +42,15 @@ func TestSingleNodeUnitTest(t *testing.T) {
 		WithLabel("suite", "nvidia").
 		WithLabel("hardware", "gpu").
 		Setup(func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
-			if *nvidiaTestImage == "" {
+			if testConfig.NvidiaTestImage == "" {
 				t.Fatal(fmt.Errorf("nvidiaTestImage must be set to run unit test, use https://github.com/aws/aws-k8s-tester/blob/main/test/images/nvidia/Dockerfile to build the image and -nvidiaTestImage to set the image url"))
 			}
 			var err error
 			renderedJobUnitTestSingleNodeManifest, err = fwext.RenderManifests(jobUnitTestSingleNodeManifest, unitTestManifestTplVars{
-				NvidiaTestImage:    *nvidiaTestImage,
-				SkipTestSubcommand: *skipUnitTestSubcommand,
+				NvidiaTestImage:    testConfig.NvidiaTestImage,
+				SkipTestSubcommand: testConfig.SkipUnitTestSubcommand,
 				GpuPerNode:         gpuPerNode,
-				NodeType:           *nodeType,
+				NodeType:           testConfig.NodeType,
 			})
 			if err != nil {
 				t.Fatal(err)