Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 80 additions & 70 deletions test/cases/nvidia/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package nvidia
import (
"context"
_ "embed"
"flag"
"fmt"
"log"
"os"
Expand All @@ -14,8 +13,8 @@ import (
"testing"

fwext "github.com/aws/aws-k8s-tester/internal/e2e"
"github.com/aws/aws-k8s-tester/test/common"
"github.com/aws/aws-k8s-tester/test/manifests"
"github.com/aws/aws-sdk-go-v2/aws"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -26,17 +25,22 @@ import (
"sigs.k8s.io/e2e-framework/pkg/envconf"
)

// Config groups the command-line options of the NVIDIA test suite.
// TestMain populates it by passing a pointer to common.ParseFlags.
// NOTE(review): the `flag` tag presumably names the command-line flag and
// `desc` its help text; confirm against common.ParseFlags.
type Config struct {
// Embedded metric options; MetricDimensions is read through this struct in
// TestMain (presumably promoted from MetricOps; confirm in package common).
common.MetricOps
// NodeType is the instance type to test; checkNodeTypes fills it in from the
// first node's instance-type label when it is left empty.
NodeType string `flag:"nodeType" desc:"node type for the tests"`
// InstallDevicePlugin selects whether the NVIDIA device plugin manifest is applied.
InstallDevicePlugin bool `flag:"installDevicePlugin" desc:"install nvidia device plugin"`
// EfaEnabled selects whether the EFA device plugin manifest is applied.
EfaEnabled bool `flag:"efaEnabled" desc:"enable efa tests"`
// NvidiaTestImage is the image used by the NCCL tests.
NvidiaTestImage string `flag:"nvidiaTestImage" desc:"nccl test image for nccl tests"`
// PytorchImage is the image used by the single-node tests; TestMain
// substitutes a default image when it is empty.
PytorchImage string `flag:"pytorchImage" desc:"pytorch cuda image for single node tests"`
// SkipUnitTestSubcommand optionally carries a command for skipping selected unit tests.
SkipUnitTestSubcommand string `flag:"skipUnitTestSubcommand" desc:"optional command to skip specified unit test"`
}

var (
testenv env.Environment
nodeType *string
installDevicePlugin *bool
efaEnabled *bool
nvidiaTestImage *string
pytorchImage *string
skipUnitTestSubcommand *string
nodeCount int
gpuPerNode int
efaPerNode int
testenv env.Environment
testConfig Config
nodeCount int
gpuPerNode int
efaPerNode int
)

func deployMPIOperator(ctx context.Context, config *envconf.Config) (context.Context, error) {
Expand All @@ -51,31 +55,6 @@ func deployMPIOperator(ctx context.Context, config *envconf.Config) (context.Con
return ctx, nil
}

// deployNvidiaDevicePlugin waits until the "nvidia-device-plugin-daemonset"
// DaemonSet in the "kube-system" namespace satisfies the DaemonSetReady
// condition. The wait is bound to ctx via wait.WithContext.
//
// It returns ctx unchanged in every case. On failure the underlying error is
// wrapped with %w so callers can inspect it with errors.Is / errors.As.
func deployNvidiaDevicePlugin(ctx context.Context, config *envconf.Config) (context.Context, error) {
	ds := appsv1.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{Name: "nvidia-device-plugin-daemonset", Namespace: "kube-system"},
	}
	ready := fwext.NewConditionExtension(config.Client().Resources()).DaemonSetReady(&ds)
	if err := wait.For(ready, wait.WithContext(ctx)); err != nil {
		return ctx, fmt.Errorf("failed to deploy nvidia-device-plugin: %w", err)
	}
	return ctx, nil
}

// deployEFAPlugin waits until the "aws-efa-k8s-device-plugin-daemonset"
// DaemonSet in the "kube-system" namespace satisfies the DaemonSetReady
// condition. The wait is bound to ctx via wait.WithContext.
//
// It returns ctx unchanged in every case. On failure the underlying error is
// wrapped with %w so callers can inspect it with errors.Is / errors.As.
func deployEFAPlugin(ctx context.Context, config *envconf.Config) (context.Context, error) {
	ds := appsv1.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{Name: "aws-efa-k8s-device-plugin-daemonset", Namespace: "kube-system"},
	}
	ready := fwext.NewConditionExtension(config.Client().Resources()).DaemonSetReady(&ds)
	if err := wait.For(ready, wait.WithContext(ctx)); err != nil {
		return ctx, fmt.Errorf("failed to deploy efa-device-plugin: %w", err)
	}
	return ctx, nil
}

func checkNodeTypes(ctx context.Context, config *envconf.Config) (context.Context, error) {
clientset, err := kubernetes.NewForConfig(config.Client().RESTConfig())
if err != nil {
Expand All @@ -93,9 +72,9 @@ func checkNodeTypes(ctx context.Context, config *envconf.Config) (context.Contex
}
}

if *nodeType != "" {
if testConfig.NodeType != "" {
for _, v := range nodes.Items {
if v.Labels["node.kubernetes.io/instance-type"] == *nodeType {
if v.Labels["node.kubernetes.io/instance-type"] == testConfig.NodeType {
nodeCount++
gpu := v.Status.Capacity["nvidia.com/gpu"]
gpuPerNode = int(gpu.Value())
Expand All @@ -105,7 +84,7 @@ func checkNodeTypes(ctx context.Context, config *envconf.Config) (context.Contex
}
} else {
log.Printf("No node type specified. Using the node type %s in the node groups.", nodes.Items[0].Labels["node.kubernetes.io/instance-type"])
nodeType = aws.String(nodes.Items[0].Labels["node.kubernetes.io/instance-type"])
testConfig.NodeType = nodes.Items[0].Labels["node.kubernetes.io/instance-type"]
nodeCount = len(nodes.Items)
gpu := nodes.Items[0].Status.Capacity["nvidia.com/gpu"]
gpuPerNode = int(gpu.Value())
Expand All @@ -117,57 +96,88 @@ func checkNodeTypes(ctx context.Context, config *envconf.Config) (context.Contex
}

func TestMain(m *testing.M) {
nodeType = flag.String("nodeType", "", "node type for the tests")
nvidiaTestImage = flag.String("nvidiaTestImage", "", "nccl test image for nccl tests")
pytorchImage = flag.String("pytorchImage", "763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-training:2.1.0-gpu-py310-cu121-ubuntu20.04-ec2", "pytorch cuda image for single node tests")
efaEnabled = flag.Bool("efaEnabled", false, "enable efa tests")
installDevicePlugin = flag.Bool("installDevicePlugin", true, "install nvidia device plugin")
skipUnitTestSubcommand = flag.String("skipUnitTestSubcommand", "", "optional command to skip specified unit test, `-s test1|test2|...`")
_, err := common.ParseFlags(&testConfig)
if err != nil {
log.Fatalf("failed to parse flags: %v", err)
}
cfg, err := envconf.NewFromFlags()
if err != nil {
log.Fatalf("failed to initialize test environment: %v", err)
}
testenv = env.NewWithConfig(cfg)

ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
testenv = testenv.WithContext(ctx)
testenv = env.NewWithConfig(cfg).WithContext(ctx)

// Set default values
if testConfig.PytorchImage == "" {
testConfig.PytorchImage = "763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-training:2.1.0-gpu-py310-cu121-ubuntu20.04-ec2"
}
if !testConfig.InstallDevicePlugin {
testConfig.InstallDevicePlugin = true
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this moved down here? The default flag values should be set before common.ParseFlags(&testConfig) is called, as they were previously.


// all NVIDIA tests require the device plugin and MPI operator
deploymentManifests := [][]byte{
renderedCloudWatchAgentManifest, err := manifests.RenderCloudWatchAgentManifest(testConfig.MetricDimensions)
if err != nil {
log.Printf("Warning: failed to render CloudWatch Agent manifest: %v", err)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same manifest-rendering issue: move this call inside the `if len(testConfig.MetricDimensions) > 0 {}` block.


manifestsList := [][]byte{
manifests.MpiOperatorManifest,
}
setUpFunctions := []env.Func{
if testConfig.InstallDevicePlugin {
manifestsList = append(manifestsList, manifests.NvidiaDevicePluginManifest)
}
if testConfig.EfaEnabled {
manifestsList = append(manifestsList, manifests.EfaDevicePluginManifest)
}
if len(testConfig.MetricDimensions) > 0 {
manifestsList = append(manifestsList, manifests.DCGMExporterManifest, renderedCloudWatchAgentManifest)
}

testenv.Setup(
func(ctx context.Context, config *envconf.Config) (context.Context, error) {
err := fwext.ApplyManifests(config.Client().RESTConfig(), deploymentManifests...)
err := fwext.ApplyManifests(config.Client().RESTConfig(), manifestsList...)
if err != nil {
return ctx, err
}
return ctx, nil
},
deployMPIOperator,
}

if *installDevicePlugin {
deploymentManifests = append(deploymentManifests, manifests.NvidiaDevicePluginManifest)
setUpFunctions = append(setUpFunctions, deployNvidiaDevicePlugin)
}
func(ctx context.Context, config *envconf.Config) (context.Context, error) {
if testConfig.InstallDevicePlugin {
if ctx, err := common.DeployDaemonSet("nvidia-device-plugin-daemonset", "kube-system")(ctx, config); err != nil {
return ctx, err
}
}
if testConfig.EfaEnabled {
if ctx, err := common.DeployDaemonSet("aws-efa-k8s-device-plugin-daemonset", "kube-system")(ctx, config); err != nil {
return ctx, err
}
}
return ctx, nil
}, // Deploy device plugins conditionally

if *efaEnabled {
deploymentManifests = append(deploymentManifests, manifests.EfaDevicePluginManifest)
setUpFunctions = append(setUpFunctions, deployEFAPlugin)
}
func(ctx context.Context, config *envconf.Config) (context.Context, error) {
if len(testConfig.MetricDimensions) > 0 {
if ctx, err := common.DeployDaemonSet("dcgm-exporter", "kube-system")(ctx, config); err != nil {
return ctx, err
}
if ctx, err := common.DeployDaemonSet("cwagent", "amazon-cloudwatch")(ctx, config); err != nil {
return ctx, err
}
}
return ctx, nil
}, // Deploy CloudWatch Agent + DCGM only if MetricDimensions are set

setUpFunctions = append(setUpFunctions, checkNodeTypes)
testenv.Setup(setUpFunctions...)
checkNodeTypes, // Dynamically check node types and capacities after device plugins are ready
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you keep the original implementation and just append another func to `setUpFunctions` that deploys the optional CloudWatch agent and DCGM exporter, then pass the whole slice to testenv.Setup() at once? The original implementation here is much cleaner.


testenv.Finish(
func(ctx context.Context, config *envconf.Config) (context.Context, error) {
err := fwext.DeleteManifests(cfg.Client().RESTConfig(), manifests.EfaDevicePluginManifest)
if err != nil {
return ctx, err
}
slices.Reverse(deploymentManifests)
err = fwext.DeleteManifests(config.Client().RESTConfig(), deploymentManifests...)
slices.Reverse(manifestsList)
err := fwext.DeleteManifests(config.Client().RESTConfig(), manifestsList...)
if err != nil {
return ctx, err
}
Expand All @@ -176,4 +186,4 @@ func TestMain(m *testing.M) {
)

os.Exit(testenv.Run(m))
}
}
12 changes: 6 additions & 6 deletions test/cases/nvidia/mpi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ func multiNode(testName string) features.Feature {
WithLabel("hardware", "gpu").
WithLabel("hardware", "efa").
Setup(func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
if *nvidiaTestImage == "" {
if testConfig.NvidiaTestImage == "" {
t.Fatal(fmt.Errorf("nvidiaTestImage must be set to run unit test, use https://github.com/aws/aws-k8s-tester/blob/main/test/images/nvidia/Dockerfile to build the image and -nvidiaTestImage to set the image url"))
}
maxBytes := "2G"
ncclBuffSize := "4194304"
if slices.Contains(instanceSupportsRdmaRead, *nodeType) {
if slices.Contains(instanceSupportsRdmaRead, testConfig.NodeType) {
t.Log("Instance supports RDMA")
maxBytes = "16G"
ncclBuffSize = "8388608"
Expand All @@ -79,7 +79,7 @@ func multiNode(testName string) features.Feature {
WorkerNodeCount: nodeCount,
WorkerNodeGpuCount: nodeCount * gpuPerNode,
GpuPerNode: gpuPerNode,
NvidiaTestImage: *nvidiaTestImage,
NvidiaTestImage: testConfig.NvidiaTestImage,
EfaInterfacePerNode: efaPerNode,
MaxBytes: maxBytes,
NcclBuffSize: ncclBuffSize,
Expand Down Expand Up @@ -118,10 +118,10 @@ func multiNode(testName string) features.Feature {
if !t.Failed() {
t.Log("Multi node job completed")
// Verify GPU Direct RDMA is used on P4/P5
if *efaEnabled && slices.Contains(instanceSupportsRdmaRead, *nodeType) {
if testConfig.EfaEnabled && slices.Contains(instanceSupportsRdmaRead, testConfig.NodeType) {
pattern := regexp.MustCompile(`\[send\] via NET/.*Libfabric/.*/GDRDMA`)
if !pattern.MatchString(log) {
t.Errorf("GPU Direct RDMA is not utilized for inter-node communication in NCCL tests on instances that support GDRDMA: %s", *nodeType)
t.Errorf("GPU Direct RDMA is not utilized for inter-node communication in NCCL tests on instances that support GDRDMA: %s", testConfig.NodeType)
}
}
}
Expand Down Expand Up @@ -149,7 +149,7 @@ func singleNode() features.Feature {
renderedSingleNodeManifest, err = fwext.RenderManifests(mpiJobPytorchTrainingSingleNodeManifest, struct {
PytorchTestImage string
}{
PytorchTestImage: *pytorchImage,
PytorchTestImage: testConfig.PytorchImage,
})
if err != nil {
t.Fatal(err)
Expand Down
8 changes: 4 additions & 4 deletions test/cases/nvidia/unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ func TestSingleNodeUnitTest(t *testing.T) {
WithLabel("suite", "nvidia").
WithLabel("hardware", "gpu").
Setup(func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
if *nvidiaTestImage == "" {
if testConfig.NvidiaTestImage == "" {
t.Fatal(fmt.Errorf("nvidiaTestImage must be set to run unit test, use https://github.com/aws/aws-k8s-tester/blob/main/test/images/nvidia/Dockerfile to build the image and -nvidiaTestImage to set the image url"))
}
var err error
renderedJobUnitTestSingleNodeManifest, err = fwext.RenderManifests(jobUnitTestSingleNodeManifest, unitTestManifestTplVars{
NvidiaTestImage: *nvidiaTestImage,
SkipTestSubcommand: *skipUnitTestSubcommand,
NvidiaTestImage: testConfig.NvidiaTestImage,
SkipTestSubcommand: testConfig.SkipUnitTestSubcommand,
GpuPerNode: gpuPerNode,
NodeType: *nodeType,
NodeType: testConfig.NodeType,
})
if err != nil {
t.Fatal(err)
Expand Down
Loading