From e536896ebc4a971addd1c21ec1c89bb000f5f8e5 Mon Sep 17 00:00:00 2001 From: Jinyan Li Date: Tue, 8 Jul 2025 20:51:23 -0700 Subject: [PATCH 01/16] add script for eks test --- eks_infrastructure/rbac.yaml | 76 +++ test/vllm_tests/__init__.py | 0 test/vllm_tests/infra/__init__.py | 0 test/vllm_tests/infra/eks_infra.py | 458 ++++++++++++++++++ .../vllm_tests/infra/test_vllm_eks_cleanup.sh | 325 +++++++++++++ test/vllm_tests/main.py | 41 ++ test/vllm_tests/test/__init__.py | 0 test/vllm_tests/test/eks_test.py | 135 ++++++ vllm/buildspec.yml | 3 + 9 files changed, 1038 insertions(+) create mode 100644 test/vllm_tests/__init__.py create mode 100644 test/vllm_tests/infra/__init__.py create mode 100644 test/vllm_tests/infra/eks_infra.py create mode 100755 test/vllm_tests/infra/test_vllm_eks_cleanup.sh create mode 100644 test/vllm_tests/main.py create mode 100644 test/vllm_tests/test/__init__.py create mode 100644 test/vllm_tests/test/eks_test.py diff --git a/eks_infrastructure/rbac.yaml b/eks_infrastructure/rbac.yaml index f0ca6ebd7b1d..24050c05e94d 100644 --- a/eks_infrastructure/rbac.yaml +++ b/eks_infrastructure/rbac.yaml @@ -181,4 +181,80 @@ subjects: roleRef: kind: ClusterRole name: eks-cluster-role + apiGroup: rbac.authorization.k8s.io +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: vllm-role + namespace: vllm +rules: +- apiGroups: + - "" + resources: + - pods + - pods/log + - services + - secrets + - persistentvolumeclaims + verbs: + - get + - list + - create + - delete +- apiGroups: + - "leaderworkerset.x-k8s.io" + resources: + - leaderworkersets + verbs: + - get + - create + - delete +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: vllm-cluster-role +rules: +- apiGroups: + - "networking.k8s.io" + resources: + - ingresses + verbs: + - get + - create + - delete +- apiGroups: + - "storage.k8s.io" + resources: + - persistentvolumes + verbs: + - get + - create +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: vllm-role-binding + namespace: vllm +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: vllm-role +subjects: +- apiGroup: rbac.authorization.k8s.io + kind: User + name: test-role +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: vllm-cluster-role-binding +subjects: +- kind: User + name: test-role + apiGroup: rbac.authorization.k8s.io +roleRef: + kind: ClusterRole + name: vllm-cluster-role apiGroup: rbac.authorization.k8s.io \ No newline at end of file diff --git a/test/vllm_tests/__init__.py b/test/vllm_tests/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/test/vllm_tests/infra/__init__.py b/test/vllm_tests/infra/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/test/vllm_tests/infra/eks_infra.py b/test/vllm_tests/infra/eks_infra.py new file mode 100644 index 000000000000..7ae08675f47f --- /dev/null +++ b/test/vllm_tests/infra/eks_infra.py @@ -0,0 +1,458 @@ +#!/usr/bin/env python3 + +import os +import sys +import time +import logging +import boto3 +import uuid +from invoke import run +from test_utils import eks as eks_utils +from test_utils import ec2 as ec2_utils +from test_utils import ( + generate_ssh_keypair, + destroy_ssh_keypair, + get_dlami_id, +) + +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") +logger = logging.getLogger(__name__) + +class EksInfrastructure: + def __init__(self): + self.cluster_name = "vllm-cluster" + 
self.region = os.getenv("AWS_REGION", "us-west-2") + self.instance_id = None + self.key_filename = None + self.ec2_client = None + self.connection = None + + def setup_infrastructure(self): + try: + self.ec2_client = boto3.client("ec2", region_name=self.region) + key_name = f"vllm-eks-test-{str(uuid.uuid4())}" + self.key_filename = generate_ssh_keypair(self.ec2_client, key_name) + + # launch EC2 instance + ami_id = get_dlami_id(self.region) + instance_type = ec2_utils.get_ec2_instance_type("c5.12xlarge", "cpu")[0] + instance_info = ec2_utils.launch_instance( + ami_id=ami_id, + instance_type=instance_type, + ec2_key_name=key_name, + region=self.region, + iam_instance_profile_name=ec2_utils.EC2_INSTANCE_ROLE_NAME, + instance_name="vLLM-EKS-Integration-Test", + ) + self.instance_id = instance_info["InstanceId"] + + # setup connection + ec2_utils.check_instance_state(self.instance_id, region=self.region) + ec2_utils.check_system_state(self.instance_id, region=self.region) + self.connection = ec2_utils.get_ec2_fabric_connection( + self.instance_id, self.key_filename, self.region + ) + + # install prerequisites + self.connection.run("pip3 install --user boto3 invoke packaging") + self.validate_required_tools() + self.setup_prerequisites() + self.create_eks_cluster() + self.validate_cluster_setup() + self.setup_fsx_lustre() + self.setup_load_balancer_controller() + + return True + except Exception as e: + logger.error(f"Infrastructure setup failed: {e}") + self.cleanup_infrastructure() + return False + + + def setup_eks_tools(self): + logger.info("Setting up EKS tools...") + # use existing setup for eksctl, kubectl, aws-iam-authenticator + eks_utils.eks_setup() + # install helm separately + self.install_helm() + + logger.info("EKS tools setup completed") + + def install_helm(self): + logger.info("Installing Helm...") + result = run("which helm", warn=True) + if result.return_code == 0: + logger.info("Helm already installed") + return + run( + "curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3" + ) + run("chmod 700 get_helm.sh") + run("sudo ./get_helm.sh") + run("rm -f get_helm.sh") + + result = run("which helm", warn=True) + if result.return_code != 0: + raise Exception("Helm installation failed - helm not found in PATH") + + logger.info("Helm installed successfully") + + def validate_required_tools(self): + """ + Validate required tools and handle installation + """ + logger.info("Validating required tools...") + required_tools = ["aws", "eksctl", "kubectl", "helm", "curl", "jq"] + missing_tools = [] + + for tool in required_tools: + result = run(f"which {tool}", warn=True) + if result.return_code != 0: + missing_tools.append((tool, tool.upper())) + logger.warning(f"{tool} not found") + else: + logger.info(f"{tool} found: {result.stdout.strip()}") + + if missing_tools: + logger.info("Installing missing tools...") + self.setup_eks_tools() + logger.info("Tools installed successfully") + else: + logger.info("All required tools are available") + + + def validate_aws_credentials(self): + """ + Validate AWS credentials and set required IAM roles for EKS + """ + logger.info("Validating AWS credentials...") + try: + sts_client = boto3.client("sts") + identity = sts_client.get_caller_identity() + logger.info(f"AWS Identity validated: {identity['Arn']}") + + if not eks_utils.get_eks_role(): + os.environ["EKS_TEST_ROLE"] = identity["Arn"] + logger.info(f"Set EKS_TEST_ROLE: {identity['Arn']}") + + return True + except Exception as e: + logger.error(f"AWS 
credential validation failed: {e}") + return False + + + def setup_prerequisites(self): + """ + Setup required tools and repositories + """ + logger.info("Setting up prerequisites...") + + run("pip install --quiet git-remote-codecommit") + run("git config --global --add protocol.codecommit.allow always") + run("git clone codecommit::us-west-2://aws-vllm-dlc-blog-repo aws-vllm-dlc-blog-repo") + + + def create_eks_cluster(self): + """ + Create EKS cluster and setup IAM access + """ + logger.info("Creating EKS cluster...") + + run( + f"cd aws-vllm-dlc-blog-repo && eksctl create cluster -f eks-cluster.yaml --region {self.region}" + ) + + # create a node group with EFA Support + run( + f"cd aws-vllm-dlc-blog-repo && eksctl create nodegroup -f large-model-nodegroup.yaml --region {self.region}" + ) + + eks_utils.eks_write_kubeconfig(self.cluster_name, self.region) + + # verify that nodes are ready + result = run("kubectl get nodes") + assert "Ready" in result.stdout, "EKS nodes not ready" + logger.info("EKS cluster created successfully") + + + def validate_cluster_setup(self): + """ + Validate cluster setup including NVIDIA device plugin + """ + logger.info("Validating cluster setup...") + + if not eks_utils.is_eks_cluster_active(self.cluster_name): + raise Exception(f"EKS cluster {self.cluster_name} is not active") + + # check NVIDIA device plugin pods + logger.info("Checking NVIDIA device plugin pods...") + result = run("kubectl get pods -n kube-system | grep nvidia") + + if "nvidia-device-plugin" not in result.stdout: + raise Exception("NVIDIA device plugin pods not found") + + # count running NVIDIA pods + nvidia_pods = [ + line + for line in result.stdout.split("\n") + if "nvidia-device-plugin" in line and "Running" in line + ] + logger.info(f"Found {len(nvidia_pods)} running NVIDIA device plugin pods") + + if not nvidia_pods: + raise Exception("No running NVIDIA device plugin pods found") + + # verify GPUs are available + result = run("kubectl get nodes -o json | jq '.items[].status.capacity.\"nvidia.com/gpu\"'") + gpu_counts = [ + line.strip().strip('"') + for line in result.stdout.split("\n") + if line.strip() and line.strip() != "null" + ] + + if not gpu_counts: + raise Exception("No GPUs found in cluster nodes") + + total_gpus = sum(int(count) for count in gpu_counts if count.isdigit()) + logger.info(f"Total GPUs available in cluster: {total_gpus}") + + if total_gpus == 0: + raise Exception("No GPUs available in cluster") + + logger.info("Cluster setup validation completed") + + + def setup_fsx_lustre(self): + """ + Setup FSx Lustre filesystem with complete configuration + """ + logger.info("Setting up FSx Lustre filesystem...") + + vpc_id = run( + f"aws eks describe-cluster --name {self.cluster_name} --query 'cluster.resourcesVpcConfig.vpcId' --output text" + ).stdout.strip() + logger.info(f"Using VPC: {vpc_id}") + + subnet_id = run( + f"aws eks describe-cluster --name {self.cluster_name} --query 'cluster.resourcesVpcConfig.subnetIds[0]' --output text" + ).stdout.strip() + logger.info(f"Using subnet: {subnet_id}") + + cluster_sg_id = run( + f"aws eks describe-cluster --name {self.cluster_name} --query 'cluster.resourcesVpcConfig.clusterSecurityGroupId' --output text" + ).stdout.strip() + logger.info(f"Using cluster security group: {cluster_sg_id}") + + # create security group for FSx Lustre + sg_id = run( + f'aws ec2 create-security-group --group-name fsx-lustre-sg --description "Security group for FSx Lustre" --vpc-id {vpc_id} --query "GroupId" --output text' + ).stdout.strip() + + # 
add inbound rules for FSx Lustre + run( + f"aws ec2 authorize-security-group-ingress --group-id {sg_id} --protocol tcp --port 988-1023 --source-group {cluster_sg_id}" + ) + run( + f"aws ec2 authorize-security-group-ingress --group-id {sg_id} --protocol tcp --port 988-1023 --source-group {sg_id}" + ) + + # create FSx filesystem + fsx_id = run( + f'aws fsx create-file-system --file-system-type LUSTRE --storage-capacity 1200 --subnet-ids {subnet_id} --security-group-ids {sg_id} --lustre-configuration DeploymentType=SCRATCH_2 --tags Key=Name,Value=vllm-model-storage --query "FileSystem.FileSystemId" --output text' + ).stdout.strip() + + logger.info("Waiting for FSx filesystem to be available...") + while True: + status = run( + f"aws fsx describe-file-systems --file-system-id {fsx_id} --query 'FileSystems[0].Lifecycle' --output text" + ).stdout.strip() + if status == "AVAILABLE": + break + logger.info(f"FSx status: {status}, waiting...") + time.sleep(30) + + # get FSx DNS and mount name + fsx_dns = run( + f"aws fsx describe-file-systems --file-system-id {fsx_id} --query 'FileSystems[0].DNSName' --output text" + ).stdout.strip() + + fsx_mount = run( + f"aws fsx describe-file-systems --file-system-id {fsx_id} --query 'FileSystems[0].LustreConfiguration.MountName' --output text" + ).stdout.strip() + + logger.info(f"FSx DNS: {fsx_dns}") + logger.info(f"FSx Mount Name: {fsx_mount}") + + # install AWS FSx CSI Driver + logger.info("Installing AWS FSx CSI Driver...") + run( + "helm repo add aws-fsx-csi-driver https://kubernetes-sigs.github.io/aws-fsx-csi-driver/" + ) + run("helm repo update") + run( + "helm install aws-fsx-csi-driver aws-fsx-csi-driver/aws-fsx-csi-driver --namespace kube-system" + ) + run( + "kubectl wait --for=condition=ready pod -l app=fsx-csi-controller -n kube-system --timeout=300s" + ) + + # verify FSx CSI driver pods are running + logger.info("Checking FSx CSI driver pods...") + result = run("kubectl get pods -n kube-system | grep fsx") + + if "fsx-csi-controller" not in result.stdout or "fsx-csi-node" not in result.stdout: + raise Exception("FSx CSI driver pods not found") + + # count running FSx pods + fsx_pods = [ + line + for line in result.stdout.split("\n") + if ("fsx-csi-controller" in line or "fsx-csi-node" in line) and "Running" in line + ] + logger.info(f"Found {len(fsx_pods)} running FSx CSI driver pods") + + if not fsx_pods: + raise Exception("No running FSx CSI driver pods found") + + logger.info("FSx CSI driver verification completed") + + # create Kubernetes resources for FSx Lustre + run( + f"cd aws-vllm-dlc-blog-repo && sed -i 's||{subnet_id}|g' fsx-storage-class.yaml" + ) + run(f"cd aws-vllm-dlc-blog-repo && sed -i 's||{sg_id}|g' fsx-storage-class.yaml") + run(f"cd aws-vllm-dlc-blog-repo && sed -i 's||{fsx_id}|g' fsx-lustre-pv.yaml") + run( + f"cd aws-vllm-dlc-blog-repo && sed -i 's|.fsx.us-west-2.amazonaws.com|{fsx_dns}|g' fsx-lustre-pv.yaml" + ) + run( + f"cd aws-vllm-dlc-blog-repo && sed -i 's||{fsx_mount}|g' fsx-lustre-pv.yaml" + ) + + # apply FSx Kubernetes resources + logger.info("Creating FSx Kubernetes storage resources...") + run("cd aws-vllm-dlc-blog-repo && kubectl apply -f fsx-storage-class.yaml") + run("cd aws-vllm-dlc-blog-repo && kubectl apply -f fsx-lustre-pv.yaml") + run("cd aws-vllm-dlc-blog-repo && kubectl apply -f fsx-lustre-pvc.yaml") + + # make sure storage resources are created correctly + logger.info("Validating FSx storage resources...") + + # check storage class + sc_result = run("kubectl get sc fsx-sc") + if "fsx-sc" not in 
sc_result.stdout or "fsx.csi.aws.com" not in sc_result.stdout: + raise Exception("FSx storage class not created correctly") + logger.info("FSx storage class created") + + # check persistent volume + pv_result = run("kubectl get pv fsx-lustre-pv") + if "fsx-lustre-pv" not in pv_result.stdout or "Bound" not in pv_result.stdout: + raise Exception("FSx persistent volume not created correctly") + logger.info("FSx persistent volume created and bound") + + # check persistent volume claim + pvc_result = run("kubectl get pvc fsx-lustre-pvc") + if "fsx-lustre-pvc" not in pvc_result.stdout or "Bound" not in pvc_result.stdout: + raise Exception("FSx persistent volume claim not created correctly") + logger.info("FSx persistent volume claim created and bound") + + logger.info("FSx Lustre setup and validation completed") + + + def setup_load_balancer_controller(self): + logger.info("Setting up AWS Load Balancer Controller...") + run("helm repo add eks https://aws.github.io/eks-charts") + run("helm repo update") + run("kubectl apply -f https://raw.githubusercontent.com/aws/eks-charts/master/stable/aws-load-balancer-controller/crds/crds.yaml") + run( + f"helm install aws-load-balancer-controller eks/aws-load-balancer-controller -n kube-system --set clusterName={self.cluster_name} --set serviceAccount.create=false --set enableServiceMutatorWebhook=false" + ) + # install LeaderWorkerSet controller + run( + "helm install lws oci://registry.k8s.io/lws/charts/lws --version=0.6.1 --namespace lws-system --create-namespace --wait --timeout 300s" + ) + # wait for controllers to be ready + run( + "kubectl wait --for=condition=ready pod -l app.kubernetes.io/name=aws-load-balancer-controller -n kube-system --timeout=300s" + ) + # setup sg for ALB + user_ip = run("curl -s https://checkip.amazonaws.com").stdout.strip() + vpc_id = run( + f"aws eks describe-cluster --name {self.cluster_name} --query 'cluster.resourcesVpcConfig.vpcId' --output text" + ).stdout.strip() + # create ALB sg + alb_sg = run( + f'aws ec2 create-security-group --group-name vllm-alb-sg --description "Security group for vLLM ALB" --vpc-id {vpc_id} --query "GroupId" --output text' + ).stdout.strip() + # allow inbound traffic on port 80 from user IP + run( + f"aws ec2 authorize-security-group-ingress --group-id {alb_sg} --protocol tcp --port 80 --cidr {user_ip}/32" + ) + # get node sg + node_instance_id = run( + 'aws ec2 describe-instances --filters "Name=tag:eks:nodegroup-name,Values=vllm-p4d-nodes-efa" --query "Reservations[0].Instances[0].InstanceId" --output text' + ).stdout.strip() + node_sg = run( + f"aws ec2 describe-instances --instance-ids {node_instance_id} --query 'Reservations[0].Instances[0].SecurityGroups[0].GroupId' --output text" + ).stdout.strip() + # allow traffic from ALB to nodes on port 8000 + run( + f"aws ec2 authorize-security-group-ingress --group-id {node_sg} --protocol tcp --port 8000 --source-group {alb_sg}" + ) + # update the sg in the ingress file + run( + f"cd aws-vllm-dlc-blog-repo && sed -i 's||{alb_sg}|g' vllm-deepseek-32b-lws-ingress.yaml" + ) + + # verify sg were created and configured correctly + logger.info("Verifying security group configurations...") + + # verify ALB sg + alb_sg_result = run( + f'aws ec2 describe-security-groups --group-ids {alb_sg} --query "SecurityGroups[0].IpPermissions"' + ) + if "80" not in alb_sg_result.stdout: + raise Exception("ALB security group not configured correctly - missing port 80 rule") + logger.info("ALB security group configured correctly") + + # verify node sg rules + 
node_sg_result = run( + f'aws ec2 describe-security-groups --group-ids {node_sg} --query "SecurityGroups[0].IpPermissions"' + ) + if "8000" not in node_sg_result.stdout: + raise Exception("Node security group not configured correctly - missing port 8000 rule") + + logger.info("Node security group configured correctly") + + logger.info("Load Balancer Controller setup and verification completed") + + + def cleanup_resources(self): + logger.info("Running cleanup script...") + try: + script_path = "test/vllm_tests/test/vllm_eks_cleanup.sh" + run(f"chmod +x {script_path}") + run( + f"cd aws-vllm-dlc-blog-repo && echo 'y' | ../{script_path}", + check=False, + timeout=3600, + ) + logger.info("Cleanup completed successfully") + + except Exception as e: + logger.error(f"Cleanup failed: {e}") + + + def cleanup_infrastructure(self): + try: + if self.connection: + self.cleanup_resources() + + if self.instance_id: + ec2_utils.terminate_instance(self.instance_id, region=self.region) + + if self.key_filename: + destroy_ssh_keypair(self.ec2_client, self.key_filename) + + except Exception as e: + logger.error(f"Cleanup failed: {e}") diff --git a/test/vllm_tests/infra/test_vllm_eks_cleanup.sh b/test/vllm_tests/infra/test_vllm_eks_cleanup.sh new file mode 100755 index 000000000000..11a6dea45362 --- /dev/null +++ b/test/vllm_tests/infra/test_vllm_eks_cleanup.sh @@ -0,0 +1,325 @@ +#!/bin/bash +# Cleanup script for vLLM DeepSeek 32B deployment on EKS +# This script deletes all resources created for the vLLM deployment +# with appropriate wait times to ensure proper deletion + +set -e # Exit on error +set -o pipefail # Exit if any command in a pipe fails + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[0;33m' +BLUE='\033[0;34m' +NC='\033[0m' # No Color + +# AWS Profile to use +REGION="us-west-2" +CLUSTER_NAME="vllm-cluster" +NODEGROUP_NAME="vllm-p4d-nodes-efa" + +# Function to print section headers +print_section() { + echo -e "\n${BLUE}=== $1 ===${NC}" +} + +# Function to print success messages +print_success() { + echo -e "${GREEN}✓ $1${NC}" +} + +# Function to print warning messages +print_warning() { + echo -e "${YELLOW}⚠ $1${NC}" +} + +# Function to print error messages +print_error() { + echo -e "${RED}✗ $1${NC}" +} + +# Function to wait for a resource to be deleted +wait_for_deletion() { + local check_command="$1" + local resource_name="$2" + local timeout_seconds="$3" + local start_time=$(date +%s) + local end_time=$((start_time + timeout_seconds)) + + echo -e "${YELLOW}Waiting for $resource_name to be deleted (timeout: ${timeout_seconds}s)...${NC}" + + while true; do + if ! eval "$check_command" &>/dev/null; then + print_success "$resource_name deleted successfully" + return 0 + fi + + current_time=$(date +%s) + if [ $current_time -gt $end_time ]; then + print_warning "$resource_name deletion timed out after ${timeout_seconds}s" + return 1 + fi + + echo -n "." + sleep 10 + done +} + +# Function to check if a command exists +command_exists() { + command -v "$1" &> /dev/null +} + +# Check for required tools +for cmd in kubectl aws eksctl helm; do + if ! command_exists $cmd; then + print_error "Required command '$cmd' not found. Please install it and try again." + exit 1 + fi +done + +# Confirm with the user +echo -e "${RED}WARNING: This script will delete all resources related to the vLLM deployment.${NC}" +echo -e "${RED}This action is irreversible and will result in data loss.${NC}" +read -p "Are you sure you want to proceed? (y/N): " -n 1 -r +echo +if [[ ! 
$REPLY =~ ^[Yy]$ ]]; then + echo "Cleanup cancelled." + exit 0 +fi + +# Store security group IDs for later use +print_section "Retrieving security group IDs" +echo "Getting ALB security group ID..." +ALB_SG=$(kubectl get ingress vllm-deepseek-32b-lws-ingress -o jsonpath='{.metadata.annotations.alb\.ingress\.kubernetes\.io/security-groups}' 2>/dev/null || echo "") +if [ -z "$ALB_SG" ]; then + print_warning "Could not retrieve ALB security group ID from ingress. Will try to find it later." +fi + + + +echo "Getting FSx security group ID..." +FSX_ID=$(kubectl get pv fsx-lustre-pv -o jsonpath='{.spec.csi.volumeHandle}' 2>/dev/null | cut -d'/' -f1 || echo "") +if [ -n "$FSX_ID" ]; then + echo "Found FSx filesystem ID: $FSX_ID" + SG_ID=$(aws fsx describe-file-systems --file-system-id $FSX_ID --query "FileSystems[0].NetworkInterfaceIds[0]" --output text 2>/dev/null | xargs -I{} aws ec2 describe-network-interfaces --network-interface-ids {} --query "NetworkInterfaces[0].Groups[0].GroupId" --output text 2>/dev/null || echo "") + if [ -n "$SG_ID" ]; then + echo "Found FSx security group ID: $SG_ID" + else + print_warning "Could not retrieve FSx security group ID." + fi +else + print_warning "Could not retrieve FSx filesystem ID." +fi + +echo "Getting Node security group ID..." +NODE_SG=$(aws ec2 describe-security-groups --filters "Name=tag:aws:cloudformation:logical-id,Values=NodeSecurityGroup" "Name=tag:eks:cluster-name,Values=$CLUSTER_NAME" --query "SecurityGroups[0].GroupId" --output text 2>/dev/null || echo "") +if [ -n "$NODE_SG" ]; then + echo "Found Node security group ID: $NODE_SG" +else + print_warning "Could not retrieve Node security group ID." +fi + +echo "Getting VPC ID from the EKS cluster..." +VPC_ID=$(aws eks describe-cluster --name $CLUSTER_NAME --query "cluster.resourcesVpcConfig.vpcId" --output text 2>/dev/null || echo "") +if [ -n "$VPC_ID" ]; then + echo "Found VPC ID: $VPC_ID" + + +else + print_warning "Could not retrieve VPC ID from the EKS cluster." +fi + +# 1. Delete Kubernetes Resources +print_section "Deleting Kubernetes Resources" + +echo "Deleting vLLM ingress..." +kubectl delete -f vllm-deepseek-32b-lws-ingress.yaml --ignore-not-found +print_success "Ingress deletion initiated" + +echo "Waiting 30 seconds for ingress controller to process deletion..." +sleep 30 + +echo "Deleting vLLM LeaderWorkerSet..." +kubectl delete -f vllm-deepseek-32b-lws.yaml --ignore-not-found +print_success "LeaderWorkerSet deletion initiated" + +echo "Waiting 60 seconds for pods to terminate..." +sleep 60 + +echo "Deleting FSx Lustre PVC..." +kubectl delete -f fsx-lustre-pvc.yaml --ignore-not-found +print_success "PVC deletion initiated" + +echo "Waiting 10 seconds for PVC deletion to process..." +sleep 10 + +echo "Deleting FSx Lustre PV..." +kubectl delete -f fsx-lustre-pv.yaml --ignore-not-found +print_success "PV deletion initiated" + +echo "Waiting 10 seconds for PV deletion to process..." +sleep 10 + +echo "Deleting storage class..." +kubectl delete -f fsx-storage-class.yaml --ignore-not-found +print_success "Storage class deletion initiated" + +echo "Deleting AWS Load Balancer Controller..." +helm uninstall aws-load-balancer-controller -n kube-system --ignore-not-found +print_success "AWS Load Balancer Controller deletion initiated" + +echo "Waiting 60 seconds for controller termination..." +sleep 60 + +echo "Verifying all resources are deleted..." +kubectl get pods,svc,ingress,pv,pvc +print_success "Kubernetes resource deletion completed" + +# 2. 
Delete the IAM Service Account CloudFormation Stack +print_section "Deleting IAM Service Account CloudFormation Stack" + +STACK_NAME="eksctl-${CLUSTER_NAME}-addon-iamserviceaccount-kube-system-aws-load-balancer-controller" +echo "Deleting CloudFormation stack: $STACK_NAME" +aws cloudformation delete-stack --stack-name $STACK_NAME 2>/dev/null || true + +wait_for_deletion "aws cloudformation describe-stacks --stack-name $STACK_NAME" "IAM Service Account CloudFormation Stack" 300 +print_success "IAM Service Account CloudFormation Stack deletion completed" + +# 3. Delete the IAM Policy +print_section "Deleting IAM Policy" + +echo "Getting the ARN of the IAM policy..." +POLICY_ARN=$(aws iam list-policies --query "Policies[?PolicyName=='AWSLoadBalancerControllerIAMPolicy'].Arn" --output text) + +if [ -n "$POLICY_ARN" ] && [ "$POLICY_ARN" != "None" ]; then + echo "Deleting IAM policy: $POLICY_ARN" + aws iam delete-policy --policy-arn $POLICY_ARN + print_success "IAM policy deleted" +else + print_warning "IAM policy not found or already deleted" +fi + +# 4. Delete the FSx Lustre Filesystem +print_section "Deleting FSx Lustre Filesystem" + +if [ -n "$FSX_ID" ]; then + echo "Deleting FSx Lustre filesystem: $FSX_ID" + aws fsx delete-file-system --file-system-id $FSX_ID 2>/dev/null || true + + wait_for_deletion "aws fsx describe-file-systems --file-system-id $FSX_ID" "FSx Lustre filesystem" 600 + print_success "FSx Lustre filesystem deletion completed" +else + print_warning "FSx Lustre filesystem ID not found or already deleted" +fi + +# 5. Check for Any Remaining Load Balancers +print_section "Checking for Remaining Load Balancers" + +echo "Checking for ALBs and NLBs..." +aws elbv2 describe-load-balancers --query "LoadBalancers[?contains(DNSName, '${CLUSTER_NAME}')].LoadBalancerArn" --output text | while read -r lb_arn; do + if [ -n "$lb_arn" ]; then + echo "Deleting load balancer: $lb_arn" + aws elbv2 delete-load-balancer --load-balancer-arn $lb_arn + fi +done + +echo "Checking for Classic ELBs..." +aws elb describe-load-balancers --query "LoadBalancerDescriptions[?contains(DNSName, '${CLUSTER_NAME}')].LoadBalancerName" --output text | while read -r lb_name; do + if [ -n "$lb_name" ]; then + echo "Deleting classic load balancer: $lb_name" + aws elb delete-load-balancer --load-balancer-name $lb_name + fi +done + +print_success "Load balancer cleanup completed" + +# 6. Delete the Node Group +print_section "Deleting Node Group" + +# Check if node group exists before attempting to delete it +echo "Checking if node group exists: $NODEGROUP_NAME" +if eksctl get nodegroup --cluster=$CLUSTER_NAME --name=$NODEGROUP_NAME --region=$REGION &>/dev/null; then + echo "Node group exists. Deleting node group: $NODEGROUP_NAME" + eksctl delete nodegroup --cluster=$CLUSTER_NAME --name=$NODEGROUP_NAME --region=$REGION --drain=false + + wait_for_deletion "eksctl get nodegroup --cluster=$CLUSTER_NAME --name=$NODEGROUP_NAME --region=$REGION" "Node group" 1100 + print_success "Node group deletion completed" +else + print_warning "Node group $NODEGROUP_NAME not found or already deleted" +fi + +# 7. Delete the Security Groups +print_section "Deleting Security Groups" + +# Delete security groups in the recommended order: FSx SG -> Node SG -> ALB SG + +if [ -n "$SG_ID" ]; then + echo "Deleting FSx security group: $SG_ID" + aws ec2 delete-security-group --group-id $SG_ID 2>/dev/null || print_warning "Failed to delete FSx security group" + if [ $? 
-eq 0 ]; then + print_success "FSx security group deleted" + fi +else + print_warning "FSx security group ID not found or already deleted" +fi + +echo "Waiting 30 seconds after FSx security group deletion" +sleep 30 + +if [ -n "$NODE_SG" ]; then + echo "Deleting Node security group: $NODE_SG" + aws ec2 delete-security-group --group-id $NODE_SG 2>/dev/null || print_warning "Failed to delete Node security group" + if [ $? -eq 0 ]; then + print_success "Node security group deleted" + fi +else + print_warning "Node security group ID not found or already deleted" +fi + +echo "Waiting 30 seconds after Node security group deletion" +sleep 30 + + +if [ -n "$ALB_SG" ]; then + echo "Deleting ALB security group: $ALB_SG" + aws ec2 delete-security-group --group-id $ALB_SG 2>/dev/null || print_warning "Failed to delete ALB security group" + if [ $? -eq 0 ]; then + print_success "ALB security group deleted" + fi +else + print_warning "ALB security group ID not found or already deleted" +fi + +echo "Waiting 30 seconds after ALB security group deletion" +sleep 30 + +# 8. Delete the EKS Cluster +print_section "Deleting EKS Cluster" + +echo "Deleting EKS cluster: $CLUSTER_NAME" +eksctl delete cluster --name=$CLUSTER_NAME --region=$REGION + +wait_for_deletion "aws eks describe-cluster --name $CLUSTER_NAME" "EKS cluster" 1100 +print_success "EKS cluster deletion completed" + +# 9. Final Verification +print_section "Final Verification" + +echo "Checking for any remaining CloudFormation stacks..." +REMAINING_STACKS=$(aws cloudformation list-stacks --stack-status-filter CREATE_COMPLETE UPDATE_COMPLETE DELETE_FAILED --query "StackSummaries[?contains(StackName, '${CLUSTER_NAME}')].StackName" --output text) + +if [ -n "$REMAINING_STACKS" ]; then + print_warning "Some CloudFormation stacks still exist:" + echo "$REMAINING_STACKS" + echo + echo "You may need to manually delete these stacks or troubleshoot deletion failures." + echo "See the README.md section on 'Troubleshooting CloudFormation Stack Deletion Failures'." +else + print_success "No remaining CloudFormation stacks found" +fi + +print_section "Cleanup Complete" +echo "All resources related to the vLLM deployment have been deleted or cleanup has been initiated." +echo "Some AWS resources may still be in the process of being deleted." +echo "Please check the AWS Management Console to verify all resources have been properly cleaned up." 
\ No newline at end of file diff --git a/test/vllm_tests/main.py b/test/vllm_tests/main.py new file mode 100644 index 000000000000..3f189a301f7b --- /dev/null +++ b/test/vllm_tests/main.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 + +import logging +import sys +from infra.eks_infra import EksInfrastructure +from test.eks_test import VllmEksTest + +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") +logger = logging.getLogger(__name__) + +def run_vllm_test(): + infrastructure = None + try: + logger.info("Setting up EKS infrastructure...") + infrastructure = EksInfrastructure() + if not infrastructure.setup_infrastructure(): + raise Exception("Infrastructure setup failed") + logger.info("Infrastructure setup completed successfully") + + logger.info("Starting vLLM tests...") + test = VllmEksTest() + if not test.run_tests(): + raise Exception("vLLM tests failed") + logger.info("vLLM tests completed successfully") + return 0 + + except Exception as e: + logger.error(f"Test execution failed: {e}") + return 1 + + finally: + if infrastructure: + logger.info("Cleaning up infrastructure...") + infrastructure.cleanup_infrastructure() + logger.info("Cleanup completed") + +def main(): + sys.exit(run_vllm_test()) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/test/vllm_tests/test/__init__.py b/test/vllm_tests/test/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/test/vllm_tests/test/eks_test.py b/test/vllm_tests/test/eks_test.py new file mode 100644 index 000000000000..c32a97c1a623 --- /dev/null +++ b/test/vllm_tests/test/eks_test.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python3 + +import logging +import time +from invoke import run + +logger = logging.getLogger(__name__) + +class VllmEksTest: + def __init__(self): + pass + + def run_tests(self): + try: + self.deploy_vllm_service() + self.test_vllm_api() + return True + except Exception as e: + logger.error(f"Test execution failed: {e}") + return False + + + def deploy_vllm_service(self): + logger.info("Deploying vLLM service...") + + # first, wait until the AWS Load Balancer Controller is running + logger.info("Waiting for AWS Load Balancer Controller to be ready...") + max_retries = 20 # 10 minutes total + retry_count = 0 + + while retry_count < max_retries: + result = run("kubectl get pods -n kube-system | grep aws-load-balancer-controller", warn=True) + if "aws-load-balancer-controller" in result.stdout: + # count total and running ALB controller pods + all_alb_pods = [ + line for line in result.stdout.split("\n") + if "aws-load-balancer-controller" in line and line.strip() + ] + running_alb_pods = [ + line for line in all_alb_pods + if "Running" in line + ] + if all_alb_pods and len(running_alb_pods) == len(all_alb_pods): + logger.info(f"All {len(running_alb_pods)} AWS Load Balancer Controller pods are running") + logger.info("AWS Load Balancer Controller is ready") + break + else: + logger.info(f"ALB controller pods: {len(running_alb_pods)}/{len(all_alb_pods)} running") + + retry_count += 1 + logger.info(f"ALB controller not ready yet, waiting... 
(attempt {retry_count}/{max_retries})") + time.sleep(30) + + if retry_count >= max_retries: + raise Exception("AWS Load Balancer Controller pods failed to start after 10 minutes") + + # apply the LeaderWorkerSet + run("cd aws-vllm-dlc-blog-repo && kubectl apply -f vllm-deepseek-32b-lws.yaml") + # apply the ingress + run("cd aws-vllm-dlc-blog-repo && kubectl apply -f vllm-deepseek-32b-lws-ingress.yaml") + + # monitor pod status until Running (can take 15-30 minutes for large GPU images + model loading) + logger.info("Waiting for vLLM pods to reach Running status...") + logger.info("This may take 15-30 minutes for container image pull and model loading") + + max_retries = 60 # 30 minutes total + retry_count = 0 + + while retry_count < max_retries: + result = run("kubectl get pods -l app=vllm-deepseek-32b-lws", warn=True) + if "vllm-deepseek-32b-lws" in result.stdout: + # count total and running vLLM pods + all_vllm_pods = [ + line for line in result.stdout.split("\n") + if "vllm-deepseek-32b-lws" in line and line.strip() and "NAME" not in line + ] + running_vllm_pods = [ + line for line in all_vllm_pods + if "Running" in line + ] + if all_vllm_pods and len(running_vllm_pods) == len(all_vllm_pods): + logger.info(f"All {len(running_vllm_pods)} vLLM pods are running") + logger.info("vLLM service is ready") + break + else: + statuses = [] + for line in all_vllm_pods: + parts = line.split() + if len(parts) >= 3: + pod_name = parts[0] + status = parts[2] + statuses.append(f"{pod_name}: {status}") + logger.info(f"vLLM pods status: {', '.join(statuses)}") + + retry_count += 1 + logger.info(f"vLLM pods not ready yet, waiting... (attempt {retry_count}/{max_retries})") + time.sleep(30) + + if retry_count >= max_retries: + raise Exception("vLLM pods failed to reach Running status after 30 minutes") + + logger.info("vLLM service deployed successfully") + + + def test_vllm_api(self): + logger.info("Testing vLLM API...") + + endpoint = run( + "kubectl get ingress vllm-deepseek-32b-lws-ingress -o jsonpath='{.status.loadBalancer.ingress[0].hostname}'" + ).stdout.strip() + logger.info(f"vLLM API endpoint: {endpoint}") + + # Test 1: completions API + logger.info("Testing completions API...") + result = run( + f"""curl -X POST http://{endpoint}/v1/completions \ + -H "Content-Type: application/json" \ + -d '{{"model": "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B", "prompt": "Hello, how are you?", "max_tokens": 50, "temperature": 0.7}}' + """ + ) + assert '"object":"text_completion"' in result.stdout, "vLLM completions API test failed" + logger.info("Completions API test passed") + + # Test 2: chat completions API + logger.info("Testing chat completions API...") + result = run( + f"""curl -X POST http://{endpoint}/v1/chat/completions \ + -H "Content-Type: application/json" \ + -d '{{"model": "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B", "messages": [{{"role": "user", "content": "What are the benefits of using FSx Lustre with EKS?"}}], "max_tokens": 100, "temperature": 0.7}}' + """ + ) + assert '"object":"chat.completion"' in result.stdout, "vLLM chat completions API test failed" + logger.info("Chat completions API test passed") + + logger.info("All vLLM API tests passed successfully") diff --git a/vllm/buildspec.yml b/vllm/buildspec.yml index 1de64837c3c0..5d2db6dae4af 100644 --- a/vllm/buildspec.yml +++ b/vllm/buildspec.yml @@ -49,3 +49,6 @@ images: test_platforms: - sanity - security + test: + - type: eks + target: test/vllm_tests/test_vllm_eks_integration.py From 33d99a0ebc582c96a36f4469a55ff73ebcb6446e Mon Sep 17 
00:00:00 2001 From: Jinyan Li Date: Wed, 9 Jul 2025 22:48:15 -0700 Subject: [PATCH 02/16] separate fsx setup functions to util file and update eks_infra --- test/vllm_tests/infra/eks_infra.py | 203 +++++--------- .../{test => infra/utils}/__init__.py | 0 test/vllm_tests/infra/utils/fsx_utils.py | 254 ++++++++++++++++++ test/vllm_tests/test_artifacts/__init__.py | 0 .../{test => test_artifacts}/eks_test.py | 0 .../{main.py => vllm_test_trigger.py} | 4 +- 6 files changed, 322 insertions(+), 139 deletions(-) rename test/vllm_tests/{test => infra/utils}/__init__.py (100%) create mode 100644 test/vllm_tests/infra/utils/fsx_utils.py create mode 100644 test/vllm_tests/test_artifacts/__init__.py rename test/vllm_tests/{test => test_artifacts}/eks_test.py (100%) rename test/vllm_tests/{main.py => vllm_test_trigger.py} (89%) diff --git a/test/vllm_tests/infra/eks_infra.py b/test/vllm_tests/infra/eks_infra.py index 7ae08675f47f..81d86efc3f54 100644 --- a/test/vllm_tests/infra/eks_infra.py +++ b/test/vllm_tests/infra/eks_infra.py @@ -7,9 +7,10 @@ import boto3 import uuid from invoke import run -from test_utils import eks as eks_utils -from test_utils import ec2 as ec2_utils -from test_utils import ( +from .utils.fsx_utils import FsxSetup +from test.test_utils import eks as eks_utils +from test.test_utils import ec2 as ec2_utils +from test.test_utils import ( generate_ssh_keypair, destroy_ssh_keypair, get_dlami_id, @@ -70,13 +71,11 @@ def setup_infrastructure(self): def setup_eks_tools(self): - logger.info("Setting up EKS tools...") - # use existing setup for eksctl, kubectl, aws-iam-authenticator - eks_utils.eks_setup() - # install helm separately - self.install_helm() + logger.info("Setting up EKS tools...") + eks_utils.eks_setup() + self.install_helm() - logger.info("EKS tools setup completed") + logger.info("EKS tools setup completed") def install_helm(self): logger.info("Installing Helm...") @@ -223,141 +222,71 @@ def validate_cluster_setup(self): def setup_fsx_lustre(self): - """ - Setup FSx Lustre filesystem with complete configuration - """ - logger.info("Setting up FSx Lustre filesystem...") - - vpc_id = run( - f"aws eks describe-cluster --name {self.cluster_name} --query 'cluster.resourcesVpcConfig.vpcId' --output text" - ).stdout.strip() - logger.info(f"Using VPC: {vpc_id}") - - subnet_id = run( - f"aws eks describe-cluster --name {self.cluster_name} --query 'cluster.resourcesVpcConfig.subnetIds[0]' --output text" - ).stdout.strip() - logger.info(f"Using subnet: {subnet_id}") - - cluster_sg_id = run( - f"aws eks describe-cluster --name {self.cluster_name} --query 'cluster.resourcesVpcConfig.clusterSecurityGroupId' --output text" - ).stdout.strip() - logger.info(f"Using cluster security group: {cluster_sg_id}") - - # create security group for FSx Lustre - sg_id = run( - f'aws ec2 create-security-group --group-name fsx-lustre-sg --description "Security group for FSx Lustre" --vpc-id {vpc_id} --query "GroupId" --output text' - ).stdout.strip() - - # add inbound rules for FSx Lustre - run( - f"aws ec2 authorize-security-group-ingress --group-id {sg_id} --protocol tcp --port 988-1023 --source-group {cluster_sg_id}" - ) - run( - f"aws ec2 authorize-security-group-ingress --group-id {sg_id} --protocol tcp --port 988-1023 --source-group {sg_id}" - ) - - # create FSx filesystem - fsx_id = run( - f'aws fsx create-file-system --file-system-type LUSTRE --storage-capacity 1200 --subnet-ids {subnet_id} --security-group-ids {sg_id} --lustre-configuration DeploymentType=SCRATCH_2 --tags 
Key=Name,Value=vllm-model-storage --query "FileSystem.FileSystemId" --output text' - ).stdout.strip() - - logger.info("Waiting for FSx filesystem to be available...") - while True: - status = run( - f"aws fsx describe-file-systems --file-system-id {fsx_id} --query 'FileSystems[0].Lifecycle' --output text" + try: + logger.info("Setting up FSx Lustre filesystem...") + fsx = FsxSetup(self.region) + vpc_id = run( + f"aws eks describe-cluster --name {self.cluster_name} " + f"--query 'cluster.resourcesVpcConfig.vpcId' --output text" ).stdout.strip() - if status == "AVAILABLE": - break - logger.info(f"FSx status: {status}, waiting...") - time.sleep(30) - - # get FSx DNS and mount name - fsx_dns = run( - f"aws fsx describe-file-systems --file-system-id {fsx_id} --query 'FileSystems[0].DNSName' --output text" - ).stdout.strip() - - fsx_mount = run( - f"aws fsx describe-file-systems --file-system-id {fsx_id} --query 'FileSystems[0].LustreConfiguration.MountName' --output text" - ).stdout.strip() - - logger.info(f"FSx DNS: {fsx_dns}") - logger.info(f"FSx Mount Name: {fsx_mount}") - - # install AWS FSx CSI Driver - logger.info("Installing AWS FSx CSI Driver...") - run( - "helm repo add aws-fsx-csi-driver https://kubernetes-sigs.github.io/aws-fsx-csi-driver/" - ) - run("helm repo update") - run( - "helm install aws-fsx-csi-driver aws-fsx-csi-driver/aws-fsx-csi-driver --namespace kube-system" - ) - run( - "kubectl wait --for=condition=ready pod -l app=fsx-csi-controller -n kube-system --timeout=300s" - ) - - # verify FSx CSI driver pods are running - logger.info("Checking FSx CSI driver pods...") - result = run("kubectl get pods -n kube-system | grep fsx") - - if "fsx-csi-controller" not in result.stdout or "fsx-csi-node" not in result.stdout: - raise Exception("FSx CSI driver pods not found") - - # count running FSx pods - fsx_pods = [ - line - for line in result.stdout.split("\n") - if ("fsx-csi-controller" in line or "fsx-csi-node" in line) and "Running" in line - ] - logger.info(f"Found {len(fsx_pods)} running FSx CSI driver pods") - - if not fsx_pods: - raise Exception("No running FSx CSI driver pods found") - - logger.info("FSx CSI driver verification completed") - - # create Kubernetes resources for FSx Lustre - run( - f"cd aws-vllm-dlc-blog-repo && sed -i 's||{subnet_id}|g' fsx-storage-class.yaml" - ) - run(f"cd aws-vllm-dlc-blog-repo && sed -i 's||{sg_id}|g' fsx-storage-class.yaml") - run(f"cd aws-vllm-dlc-blog-repo && sed -i 's||{fsx_id}|g' fsx-lustre-pv.yaml") - run( - f"cd aws-vllm-dlc-blog-repo && sed -i 's|.fsx.us-west-2.amazonaws.com|{fsx_dns}|g' fsx-lustre-pv.yaml" - ) - run( - f"cd aws-vllm-dlc-blog-repo && sed -i 's||{fsx_mount}|g' fsx-lustre-pv.yaml" - ) + logger.info(f"Using VPC: {vpc_id}") - # apply FSx Kubernetes resources - logger.info("Creating FSx Kubernetes storage resources...") - run("cd aws-vllm-dlc-blog-repo && kubectl apply -f fsx-storage-class.yaml") - run("cd aws-vllm-dlc-blog-repo && kubectl apply -f fsx-lustre-pv.yaml") - run("cd aws-vllm-dlc-blog-repo && kubectl apply -f fsx-lustre-pvc.yaml") + subnet_id = run( + f"aws eks describe-cluster --name {self.cluster_name} " + f"--query 'cluster.resourcesVpcConfig.subnetIds[0]' --output text" + ).stdout.strip() + logger.info(f"Using subnet: {subnet_id}") - # make sure storage resources are created correctly - logger.info("Validating FSx storage resources...") + cluster_sg_id = run( + f"aws eks describe-cluster --name {self.cluster_name} " + f"--query 'cluster.resourcesVpcConfig.clusterSecurityGroupId' --output text" + 
).stdout.strip() + logger.info(f"Using cluster security group: {cluster_sg_id}") - # check storage class - sc_result = run("kubectl get sc fsx-sc") - if "fsx-sc" not in sc_result.stdout or "fsx.csi.aws.com" not in sc_result.stdout: - raise Exception("FSx storage class not created correctly") - logger.info("FSx storage class created") + sg_id = fsx.create_security_group( + vpc_id=vpc_id, + name="fsx-lustre-sg", + description="Security group for FSx Lustre" + ) - # check persistent volume - pv_result = run("kubectl get pv fsx-lustre-pv") - if "fsx-lustre-pv" not in pv_result.stdout or "Bound" not in pv_result.stdout: - raise Exception("FSx persistent volume not created correctly") - logger.info("FSx persistent volume created and bound") + fsx.add_security_group_ingress_rules( + security_group_id=sg_id, + ingress_rules=[ + {"protocol": "tcp", "port": "988-1023", "source-group": cluster_sg_id}, + {"protocol": "tcp", "port": "988-1023", "source-group": sg_id} + ] + ) - # check persistent volume claim - pvc_result = run("kubectl get pvc fsx-lustre-pvc") - if "fsx-lustre-pvc" not in pvc_result.stdout or "Bound" not in pvc_result.stdout: - raise Exception("FSx persistent volume claim not created correctly") - logger.info("FSx persistent volume claim created and bound") + fs_info = fsx.create_fsx_filesystem( + subnet_id=subnet_id, + security_group_ids=[sg_id], + storage_capacity=1200, + deployment_type="SCRATCH_2", + tags={"Name": "vllm-model-storage"} + ) - logger.info("FSx Lustre setup and validation completed") + fsx.setup_csi_driver() + + # TODO: change the path to yaml files moved to the DLC repo + fsx.setup_kubernetes_resources( + storage_class_file="aws-vllm-dlc-blog-repo/fsx-storage-class.yaml", + pv_file="aws-vllm-dlc-blog-repo/fsx-lustre-pv.yaml", + pvc_file="aws-vllm-dlc-blog-repo/fsx-lustre-pvc.yaml", + replacements={ + "": subnet_id, + "": sg_id, + "": fs_info["filesystem_id"], + ".fsx.us-west-2.amazonaws.com": fs_info["dns_name"], + "": fs_info["mount_name"] + } + ) + logger.info("FSx Lustre setup completed successfully") + + except Exception as e: + logger.error(f"FSx Lustre setup failed: {e}") + raise + def setup_load_balancer_controller(self): logger.info("Setting up AWS Load Balancer Controller...") @@ -430,7 +359,7 @@ def setup_load_balancer_controller(self): def cleanup_resources(self): logger.info("Running cleanup script...") try: - script_path = "test/vllm_tests/test/vllm_eks_cleanup.sh" + script_path = "test/vllm_tests/infra/test_vllm_eks_cleanup.sh" run(f"chmod +x {script_path}") run( f"cd aws-vllm-dlc-blog-repo && echo 'y' | ../{script_path}", diff --git a/test/vllm_tests/test/__init__.py b/test/vllm_tests/infra/utils/__init__.py similarity index 100% rename from test/vllm_tests/test/__init__.py rename to test/vllm_tests/infra/utils/__init__.py diff --git a/test/vllm_tests/infra/utils/fsx_utils.py b/test/vllm_tests/infra/utils/fsx_utils.py new file mode 100644 index 000000000000..d69e3e103af6 --- /dev/null +++ b/test/vllm_tests/infra/utils/fsx_utils.py @@ -0,0 +1,254 @@ +import logging +import time +from invoke import run +from typing import Dict, List, Any + +logger = logging.getLogger(__name__) + +class FsxSetup: + """ + A utility class for setting up and managing FSx for Lustre filesystems + and related AWS and Kubernetes resources. 
+ + : param region: AWS region where resources will be created (default: "us-west-2") + """ + def __init__(self, region: str = "us-west-2"): + self.region = region + + def create_fsx_filesystem( + self, + subnet_id: str, + security_group_ids: List[str], + storage_capacity: int, + deployment_type: str, + tags: Dict[str, str], + ): + """ + Create FSx filesystem with given configuration + : param subnet_id: subnet ID where FSx will be created + : param security_group_ids: list of security group IDs + : param storage_capacity: storage capacity in GiB + : param deployment_type: FSx deployment type + : param tags: dictionary of tags to apply to the FSx filesystem + : return: dictionary containing filesystem details + """ + tags_param = " ".join([f"Key={k},Value={v}" for k, v in tags.items()]) + + try: + fsx_id = run( + f'aws fsx create-file-system' + f' --file-system-type LUSTRE' + f' --storage-capacity {storage_capacity}' + f' --subnet-ids {subnet_id}' + f' --security-group-ids {" ".join(security_group_ids)}' + f' --lustre-configuration DeploymentType={deployment_type}' + f' --tags {tags_param}' + f' --query "FileSystem.FileSystemId"' + f' --output text' + ).stdout.strip() + + logger.info(f"Created FSx filesystem: {fsx_id}") + + filesystem_info = self.wait_for_filesystem(fsx_id) + return filesystem_info + + except Exception as e: + logger.error(f"Failed to create FSx filesystem: {e}") + raise + + + def wait_for_filesystem(self, filesystem_id: str): + """ + Wait for FSx filesystem to become available and return its details + : param filesystem_id: FSx filesystem ID + : return: dictionary containing filesystem details (filesystem_id, dns_name, mount_name) + : raises: Exception if filesystem enters FAILED, DELETING, or DELETED state + """ + logger.info(f"Waiting for FSx filesystem {filesystem_id} to be available...") + while True: + status = run( + f"aws fsx describe-file-systems --file-system-id {filesystem_id} " + f"--query 'FileSystems[0].Lifecycle' --output text" + ).stdout.strip() + + if status == "AVAILABLE": + break + elif status in ["FAILED", "DELETING", "DELETED"]: + raise Exception(f"FSx filesystem entered {status} state") + + logger.info(f"FSx status: {status}, waiting...") + time.sleep(30) + + # get fs DNS and mount name + fsx_dns = run( + f"aws fsx describe-file-systems --file-system-id {filesystem_id} " + f"--query 'FileSystems[0].DNSName' --output text" + ).stdout.strip() + + fsx_mount = run( + f"aws fsx describe-file-systems --file-system-id {filesystem_id} " + f"--query 'FileSystems[0].LustreConfiguration.MountName' --output text" + ).stdout.strip() + + return { + 'filesystem_id': filesystem_id, + 'dns_name': fsx_dns, + 'mount_name': fsx_mount + } + + def create_security_group( + self, + vpc_id: str, + name: str, + description: str + ): + """ + Create a security group in the specified VPC + : param vpc_id: VPC ID where the security group will be created + : param name: name of the security group + : param description: description of the security group + : return: created security group ID + : raises: Exception if security group creation fails + """ + try: + sg_id = run( + f'aws ec2 create-security-group' + f' --group-name {name}' + f' --description "{description}"' + f' --vpc-id {vpc_id}' + f' --query "GroupId"' + f' --output text' + ).stdout.strip() + logger.info(f"Created security group: {sg_id}") + return sg_id + + except Exception as e: + logger.error(f"Failed to create security group: {e}") + raise + + + def add_security_group_ingress_rules( + self, + security_group_id: str, 
+ ingress_rules: List[Dict[str, Any]] + ): + """ + Add ingress rules to an existing security group + : param security_group_id: ID of the security group to modify + : param ingress_rules: list of dictionaries containing ingress rule configurations + Example: [{"protocol": "tcp", "port": "988-1023", "source-group": "sg-xxx"}] + : return: None + : raises: Exception if adding ingress rules fails + """ + try: + for rule in ingress_rules: + cmd = f"aws ec2 authorize-security-group-ingress --group-id {security_group_id}" + for key, value in rule.items(): + cmd += f" --{key} {value}" + run(cmd) + + logger.info(f"Added ingress rules to security group: {security_group_id}") + + except Exception as e: + logger.error(f"Failed to add ingress rules to security group: {e}") + raise + + + def setup_csi_driver(self): + """ + Install and configure the AWS FSx CSI Driver in the Kubernetes cluster + : return: None + : raises: Exception if driver installation or verification fails + """ + try: + logger.info("Installing AWS FSx CSI Driver...") + run("helm repo add aws-fsx-csi-driver https://kubernetes-sigs.github.io/aws-fsx-csi-driver/") + run("helm repo update") + run("helm install aws-fsx-csi-driver aws-fsx-csi-driver/aws-fsx-csi-driver --namespace kube-system") + run("kubectl wait --for=condition=ready pod -l app=fsx-csi-controller -n kube-system --timeout=300s") + + self._verify_csi_driver() + logger.info("FSx CSI Driver installed successfully") + except Exception as e: + logger.error(f"Failed to setup FSx CSI driver: {e}") + raise + + + def _verify_csi_driver(self): + """ + Verify that FSx CSI driver pods are running correctly in the cluster + : return: None + : raises: Exception if driver pods are not found or not running + """ + result = run("kubectl get pods -n kube-system | grep fsx") + + if "fsx-csi-controller" not in result.stdout or "fsx-csi-node" not in result.stdout: + raise Exception("FSx CSI driver pods not found") + + fsx_pods = [ + line for line in result.stdout.split("\n") + if ("fsx-csi-controller" in line or "fsx-csi-node" in line) and "Running" in line + ] + + if not fsx_pods: + raise Exception("No running FSx CSI driver pods found") + + logger.info(f"Found {len(fsx_pods)} running FSx CSI driver pods") + + + def setup_kubernetes_resources( + self, + storage_class_file: str, + pv_file: str, + pvc_file: str, + replacements: Dict[str, str] + ): + """ + Setup Kubernetes FSx resources using provided yaml files and replacements + : param storage_class_file: path to the storage class yaml file + : param pv_file: path to the persistent volume yaml file + : param pvc_file: path to the persistent volume claim yaml file + : param replacements: dictionary of placeholder replacements + Example: {"": "subnet-xxx", "": "sg-xxx"} + : return: None + : raises: Exception if resource creation fails + """ + try: + for file_path in [storage_class_file, pv_file, pvc_file]: + for key, value in replacements.items(): + run(f"sed -i 's|{key}|{value}|g' {file_path}") + + for file_path in [storage_class_file, pv_file, pvc_file]: + run(f"kubectl apply -f {file_path}") + + self.validate_kubernetes_resources() + + except Exception as e: + logger.error(f"Failed to setup Kubernetes FSx resources: {e}") + raise + + def validate_kubernetes_resources(self): + """ + Validate that FSx Kubernetes resources are properly created and bound + : return: True if all resources are validated successfully + : raises: Exception if any resource validation fails + """ + try: + sc_result = run("kubectl get sc fsx-sc") + if "fsx-sc" not in 
sc_result.stdout or "fsx.csi.aws.com" not in sc_result.stdout: + raise Exception("FSx storage class not created correctly") + + pv_result = run("kubectl get pv fsx-lustre-pv") + if "fsx-lustre-pv" not in pv_result.stdout or "Bound" not in pv_result.stdout: + raise Exception("FSx persistent volume not created correctly") + + pvc_result = run("kubectl get pvc fsx-lustre-pvc") + if "fsx-lustre-pvc" not in pvc_result.stdout or "Bound" not in pvc_result.stdout: + raise Exception("FSx persistent volume claim not created correctly") + + logger.info("FSx Kubernetes resources validated successfully") + return True + + except Exception as e: + logger.error(f"FSx resource validation failed: {e}") + raise diff --git a/test/vllm_tests/test_artifacts/__init__.py b/test/vllm_tests/test_artifacts/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/test/vllm_tests/test/eks_test.py b/test/vllm_tests/test_artifacts/eks_test.py similarity index 100% rename from test/vllm_tests/test/eks_test.py rename to test/vllm_tests/test_artifacts/eks_test.py diff --git a/test/vllm_tests/main.py b/test/vllm_tests/vllm_test_trigger.py similarity index 89% rename from test/vllm_tests/main.py rename to test/vllm_tests/vllm_test_trigger.py index 3f189a301f7b..38b66b3ee362 100644 --- a/test/vllm_tests/main.py +++ b/test/vllm_tests/vllm_test_trigger.py @@ -2,8 +2,8 @@ import logging import sys -from infra.eks_infra import EksInfrastructure -from test.eks_test import VllmEksTest +from test.vllm_tests.infra.eks_infra import EksInfrastructure +from test.vllm_tests.test_artifacts.eks_test import VllmEksTest logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) From fc3ac90bda4982e57a95acb4cba026d8690b2973 Mon Sep 17 00:00:00 2001 From: Jinyan Li Date: Thu, 10 Jul 2025 09:18:50 -0700 Subject: [PATCH 03/16] update testrunner to run vllm_test_trigger --- dlc_developer_config.toml | 4 +-- test/testrunner.py | 55 +++++++++++++++++++++++++++------------ vllm/buildspec.yml | 4 +-- 3 files changed, 41 insertions(+), 22 deletions(-) diff --git a/dlc_developer_config.toml b/dlc_developer_config.toml index 1962bfd69e21..79ce70b66ee3 100644 --- a/dlc_developer_config.toml +++ b/dlc_developer_config.toml @@ -41,8 +41,8 @@ build_frameworks = [] # By default we build both training and inference containers. Set true/false values to determine which to build. -build_training = true -build_inference = true +build_training = false +build_inference = false # Set do_build to "false" to skip builds and test the latest image built by this PR # Note: at least one build is required to set do_build to "false" diff --git a/test/testrunner.py b/test/testrunner.py index bee1946f52f8..6d087d215bd4 100644 --- a/test/testrunner.py +++ b/test/testrunner.py @@ -311,7 +311,7 @@ def main(): if ( build_context == "MAINLINE" and all("base" in image_uri or "vllm" in image_uri for image_uri in all_image_list) - and test_type not in {"functionality_sanity", "security_sanity"} + and test_type not in {"functionality_sanity", "security_sanity", "eks", "ec2"} ): LOGGER.info( f"NOTE: {specific_test_type} tests not supported on base or vllm images. Skipping..." 
@@ -400,23 +400,44 @@ def main(): if specific_test_type == "bai": build_bai_docker_container() if specific_test_type == "eks" and not is_all_images_list_eia: - frameworks_in_images = [ - framework - for framework in ("mxnet", "pytorch", "tensorflow") - if framework in dlc_images - ] - if len(frameworks_in_images) != 1: - raise ValueError( - f"All images in dlc_images must be of a single framework for EKS tests.\n" - f"Instead seeing {frameworks_in_images} frameworks." - ) - framework = frameworks_in_images[0] - eks_cluster_name = f"dlc-{framework}-{build_context}" - eks_utils.eks_setup() - if eks_utils.is_eks_cluster_active(eks_cluster_name): - eks_utils.eks_write_kubeconfig(eks_cluster_name) + is_vllm_image = any("vllm" in image_uri for image_uri in all_image_list) + if is_vllm_image: + # skip pytest execution for vLLM - use vllm trigger instead + original_dir = os.getcwd() + try: + os.chdir(os.path.join("..", "..")) + from test.vllm_tests.vllm_test_trigger import run_vllm_test + + result = run_vllm_test() + if result != 0: + raise Exception("vLLM EKS test failed") + + LOGGER.info("vLLM EKS tests completed successfully") + return # skip pytest + + except Exception as e: + LOGGER.error(f"vLLM EKS test failed: {e}") + raise + finally: + os.chdir(original_dir) else: - raise Exception(f"EKS cluster {eks_cluster_name} is not in active state") + frameworks_in_images = [ + framework + for framework in ("mxnet", "pytorch", "tensorflow") + if framework in dlc_images + ] + if len(frameworks_in_images) != 1: + raise ValueError( + f"All images in dlc_images must be of a single framework for EKS tests.\n" + f"Instead seeing {frameworks_in_images} frameworks." + ) + framework = frameworks_in_images[0] + eks_cluster_name = f"dlc-{framework}-{build_context}" + eks_utils.eks_setup() + if eks_utils.is_eks_cluster_active(eks_cluster_name): + eks_utils.eks_write_kubeconfig(eks_cluster_name) + else: + raise Exception(f"EKS cluster {eks_cluster_name} is not in active state") # Execute dlc_tests pytest command pytest_cmd = [ diff --git a/vllm/buildspec.yml b/vllm/buildspec.yml index 5d2db6dae4af..1fbba0e18540 100644 --- a/vllm/buildspec.yml +++ b/vllm/buildspec.yml @@ -49,6 +49,4 @@ images: test_platforms: - sanity - security - test: - - type: eks - target: test/vllm_tests/test_vllm_eks_integration.py + - eks \ No newline at end of file From 03b35938cfa7cee3ffb0f9a4c165b1d0b1eb7c2b Mon Sep 17 00:00:00 2001 From: Jinyan Li Date: Thu, 10 Jul 2025 11:18:42 -0700 Subject: [PATCH 04/16] modify testspec and test_trigger --- test/testrunner.py | 53 ++++++--------------- test/vllm_tests/vllm_test_trigger.py | 69 ++++++++++++++++++++++------ testspec.yml | 3 +- 3 files changed, 72 insertions(+), 53 deletions(-) diff --git a/test/testrunner.py b/test/testrunner.py index 6d087d215bd4..20e5e29fb930 100644 --- a/test/testrunner.py +++ b/test/testrunner.py @@ -400,44 +400,21 @@ def main(): if specific_test_type == "bai": build_bai_docker_container() if specific_test_type == "eks" and not is_all_images_list_eia: - is_vllm_image = any("vllm" in image_uri for image_uri in all_image_list) - if is_vllm_image: - # skip pytest execution for vLLM - use vllm trigger instead - original_dir = os.getcwd() - try: - os.chdir(os.path.join("..", "..")) - from test.vllm_tests.vllm_test_trigger import run_vllm_test - - result = run_vllm_test() - if result != 0: - raise Exception("vLLM EKS test failed") - - LOGGER.info("vLLM EKS tests completed successfully") - return # skip pytest - - except Exception as e: - LOGGER.error(f"vLLM EKS test 
failed: {e}") - raise - finally: - os.chdir(original_dir) - else: - frameworks_in_images = [ - framework - for framework in ("mxnet", "pytorch", "tensorflow") - if framework in dlc_images - ] - if len(frameworks_in_images) != 1: - raise ValueError( - f"All images in dlc_images must be of a single framework for EKS tests.\n" - f"Instead seeing {frameworks_in_images} frameworks." - ) - framework = frameworks_in_images[0] - eks_cluster_name = f"dlc-{framework}-{build_context}" - eks_utils.eks_setup() - if eks_utils.is_eks_cluster_active(eks_cluster_name): - eks_utils.eks_write_kubeconfig(eks_cluster_name) - else: - raise Exception(f"EKS cluster {eks_cluster_name} is not in active state") + frameworks_in_images = [ + framework + for framework in ("mxnet", "pytorch", "tensorflow") + if framework in dlc_images + ] + if len(frameworks_in_images) != 1: + raise ValueError( + f"All images in dlc_images must be of a single framework for EKS tests.\n" + f"Instead seeing {frameworks_in_images} frameworks." + ) + framework = frameworks_in_images[0] + eks_cluster_name = f"dlc-{framework}-{build_context}" + eks_utils.eks_setup() + if eks_utils.is_eks_cluster_active(eks_cluster_name): + eks_utils.eks_write_kubeconfig(eks_cluster_name) # Execute dlc_tests pytest command pytest_cmd = [ diff --git a/test/vllm_tests/vllm_test_trigger.py b/test/vllm_tests/vllm_test_trigger.py index 38b66b3ee362..8a8b33a7ad80 100644 --- a/test/vllm_tests/vllm_test_trigger.py +++ b/test/vllm_tests/vllm_test_trigger.py @@ -1,41 +1,82 @@ -#!/usr/bin/env python3 - +import os, sys import logging -import sys +from typing import List + +from test.test_utils import get_dlc_images from test.vllm_tests.infra.eks_infra import EksInfrastructure from test.vllm_tests.test_artifacts.eks_test import VllmEksTest -logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") -logger = logging.getLogger(__name__) +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) +LOGGER.addHandler(logging.StreamHandler(sys.stdout)) -def run_vllm_test(): + +def run_vllm_eks_test(): infrastructure = None try: - logger.info("Setting up EKS infrastructure...") + LOGGER.info("Setting up EKS infrastructure...") infrastructure = EksInfrastructure() if not infrastructure.setup_infrastructure(): raise Exception("Infrastructure setup failed") - logger.info("Infrastructure setup completed successfully") + LOGGER.info("Infrastructure setup completed successfully") - logger.info("Starting vLLM tests...") + LOGGER.info("Starting vLLM tests...") test = VllmEksTest() if not test.run_tests(): raise Exception("vLLM tests failed") - logger.info("vLLM tests completed successfully") + LOGGER.info("vLLM tests completed successfully") return 0 except Exception as e: - logger.error(f"Test execution failed: {e}") + LOGGER.error(f"Test execution failed: {e}") return 1 finally: if infrastructure: - logger.info("Cleaning up infrastructure...") + LOGGER.info("Cleaning up infrastructure...") infrastructure.cleanup_infrastructure() - logger.info("Cleanup completed") + LOGGER.info("Cleanup completed") + + +def run_platform_tests(platform: str, images: List[str], commit_id: str, ipv6_enabled: bool): + """ + Run tests for a specific platform + """ + LOGGER.info(f"Running {platform} tests") + if platform == "eks": + result = run_vllm_eks_test() + if result != 0: + raise Exception("vLLM EKS tests failed") + LOGGER.info("vLLM EKS tests completed successfully") + def main(): - sys.exit(run_vllm_test()) + LOGGER.info("Triggering test from vllm") + 
test_type = os.getenv("TEST_TYPE") + + LOGGER.info(f"TEST_TYPE: {test_type}") + + executor_mode = os.getenv("EXECUTOR_MODE", "False").lower() == "true" + dlc_images = os.getenv("DLC_IMAGE") if executor_mode else get_dlc_images() + + ipv6_enabled = os.getenv("ENABLE_IPV6_TESTING", "false").lower() == "true" + os.environ["ENABLE_IPV6_TESTING"] = "true" if ipv6_enabled else "false" + + commit_id = os.getenv("CODEBUILD_RESOLVED_SOURCE_VERSION", default="unrecognised_commit_id") + LOGGER.info(f"Commit ID: {commit_id}") + + LOGGER.info(f"Images tested: {dlc_images}") + all_image_list = dlc_images.split(" ") + standard_images_list = [image_uri for image_uri in all_image_list if "example" not in image_uri] + LOGGER.info(f"\nImages URIs:\n{standard_images_list}") + + run_platform_tests( + platform=test_type, + images=standard_images_list, + commit_id=commit_id, + ipv6_enabled=ipv6_enabled, + ) + if __name__ == "__main__": main() \ No newline at end of file diff --git a/testspec.yml b/testspec.yml index 31abd98c35dd..fffb6658d4a4 100644 --- a/testspec.yml +++ b/testspec.yml @@ -23,7 +23,8 @@ phases: - pip install scheduler/. - echo Running pytest $TEST_TYPE tests on $DLC_IMAGES... - export PYTHONPATH=$PYTHONPATH:$(pwd)/src - - python test/testrunner.py + # - python test/testrunner.py + - python test/vllm_tests/vllm_test_trigger.py post_build: commands: - python src/send_status.py --status $CODEBUILD_BUILD_SUCCEEDING From ff304494ff515f0fc5803851dedc08fbe45993f6 Mon Sep 17 00:00:00 2001 From: Jinyan Li Date: Thu, 10 Jul 2025 11:32:56 -0700 Subject: [PATCH 05/16] add yaml files needed to test_artifacts --- .../test_artifacts/eks-cluster.yaml | 23 ++ .../test_artifacts/fsx-lustre-pv.yaml | 18 ++ .../test_artifacts/fsx-lustre-pvc.yaml | 11 + .../test_artifacts/fsx-storage-class.yaml | 18 ++ .../test_artifacts/large-model-nodegroup.yaml | 53 ++++ .../vllm-deepseek-32b-lws-ingress.yaml | 28 ++ .../test_artifacts/vllm-deepseek-32b-lws.yaml | 252 ++++++++++++++++++ 7 files changed, 403 insertions(+) create mode 100644 test/vllm_tests/test_artifacts/eks-cluster.yaml create mode 100644 test/vllm_tests/test_artifacts/fsx-lustre-pv.yaml create mode 100644 test/vllm_tests/test_artifacts/fsx-lustre-pvc.yaml create mode 100644 test/vllm_tests/test_artifacts/fsx-storage-class.yaml create mode 100644 test/vllm_tests/test_artifacts/large-model-nodegroup.yaml create mode 100644 test/vllm_tests/test_artifacts/vllm-deepseek-32b-lws-ingress.yaml create mode 100644 test/vllm_tests/test_artifacts/vllm-deepseek-32b-lws.yaml diff --git a/test/vllm_tests/test_artifacts/eks-cluster.yaml b/test/vllm_tests/test_artifacts/eks-cluster.yaml new file mode 100644 index 000000000000..6e59c29bc804 --- /dev/null +++ b/test/vllm_tests/test_artifacts/eks-cluster.yaml @@ -0,0 +1,23 @@ +apiVersion: eksctl.io/v1alpha5 +kind: ClusterConfig + +metadata: + name: vllm-cluster + region: us-west-2 + version: "1.31" # Latest stable EKS version + +# Enable CloudWatch logging +cloudWatch: + clusterLogging: + enableTypes: ["api", "audit", "authenticator", "controllerManager", "scheduler"] + +# Add-ons for the cluster +addons: + - name: vpc-cni + version: latest + - name: coredns + version: latest + - name: kube-proxy + version: latest + - name: aws-ebs-csi-driver + version: latest diff --git a/test/vllm_tests/test_artifacts/fsx-lustre-pv.yaml b/test/vllm_tests/test_artifacts/fsx-lustre-pv.yaml new file mode 100644 index 000000000000..d94ba75ad632 --- /dev/null +++ b/test/vllm_tests/test_artifacts/fsx-lustre-pv.yaml @@ -0,0 +1,18 @@ +apiVersion: v1 
+kind: PersistentVolume +metadata: + name: fsx-lustre-pv +spec: + capacity: + storage: 1200Gi # Adjust based on your FSx Lustre filesystem size + volumeMode: Filesystem + accessModes: + - ReadWriteMany + persistentVolumeReclaimPolicy: Retain + storageClassName: fsx-sc + csi: + driver: fsx.csi.aws.com + volumeHandle: # FSx Lustre filesystem ID + volumeAttributes: + dnsname: .fsx.us-west-2.amazonaws.com # FSx Lustre DNS name + mountname: # The mount name of your FSx Lustre filesyst diff --git a/test/vllm_tests/test_artifacts/fsx-lustre-pvc.yaml b/test/vllm_tests/test_artifacts/fsx-lustre-pvc.yaml new file mode 100644 index 000000000000..f03c420f864c --- /dev/null +++ b/test/vllm_tests/test_artifacts/fsx-lustre-pvc.yaml @@ -0,0 +1,11 @@ +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: fsx-lustre-pvc +spec: + accessModes: + - ReadWriteMany + storageClassName: fsx-sc + resources: + requests: + storage: 1200Gi # Should match the PV capacity diff --git a/test/vllm_tests/test_artifacts/fsx-storage-class.yaml b/test/vllm_tests/test_artifacts/fsx-storage-class.yaml new file mode 100644 index 000000000000..31b81d8495ff --- /dev/null +++ b/test/vllm_tests/test_artifacts/fsx-storage-class.yaml @@ -0,0 +1,18 @@ +apiVersion: storage.k8s.io/v1 +kind: StorageClass +metadata: + name: fsx-sc +provisioner: fsx.csi.aws.com +parameters: + subnetId: + securityGroupIds: + deploymentType: SCRATCH_2 + automaticBackupRetentionDays: "0" + dailyAutomaticBackupStartTime: "00:00" + copyTagsToBackups: "false" + perUnitStorageThroughput: "50" + dataCompressionType: "NONE" +reclaimPolicy: Retain +volumeBindingMode: Immediate +mountOptions: + - flock diff --git a/test/vllm_tests/test_artifacts/large-model-nodegroup.yaml b/test/vllm_tests/test_artifacts/large-model-nodegroup.yaml new file mode 100644 index 000000000000..093777fe6b50 --- /dev/null +++ b/test/vllm_tests/test_artifacts/large-model-nodegroup.yaml @@ -0,0 +1,53 @@ +apiVersion: eksctl.io/v1alpha5 +kind: ClusterConfig + +metadata: + name: vllm-cluster + region: us-west-2 + +managedNodeGroups: + - name: vllm-p4d-nodes-efa + instanceType: p4d.24xlarge + minSize: 0 + maxSize: 2 + desiredCapacity: 2 + availabilityZones: ["us-west-2a"] # EFA-enabled nodegroups must have only one subnet or one availability zone + volumeSize: 100 + privateNetworking: true + # Use the EKS-optimized GPU AMI + ami: ami-01f1fc27c5979ac62 # Amazon EKS GPU node 1.31 (k8s: 1.31.7, containerd: 1.7.*) + amiFamily: AmazonLinux2 + labels: + role: large-model-worker + nvidia.com/gpu: "true" + k8s.amazonaws.com/accelerator: nvidia-gpu + aws.amazon.com/efa: "true" # Add EFA label + tags: + nodegroup-role: large-model-worker + iam: + withAddonPolicies: + autoScaler: true + albIngress: true + cloudWatch: true + ebs: true + imageBuilder: true + # Enable EFA interfaces + efaEnabled: true + # Override bootstrap command for custom AMI + overrideBootstrapCommand: | + #!/bin/bash + set -ex + + # Install EFA driver and related packages + curl -O https://efa-installer.amazonaws.com/aws-efa-installer-latest.tar.gz + tar -xf aws-efa-installer-latest.tar.gz + cd aws-efa-installer + ./efa_installer.sh -y + + # Configure NCCL to use EFA + echo "export FI_PROVIDER=efa" >> /etc/environment + echo "export FI_EFA_USE_DEVICE_RDMA=1" >> /etc/environment + echo "export NCCL_DEBUG=INFO" >> /etc/environment + + # Standard EKS bootstrap + /etc/eks/bootstrap.sh vllm-cluster --container-runtime containerd diff --git a/test/vllm_tests/test_artifacts/vllm-deepseek-32b-lws-ingress.yaml 
b/test/vllm_tests/test_artifacts/vllm-deepseek-32b-lws-ingress.yaml new file mode 100644 index 000000000000..5b5706eccd88 --- /dev/null +++ b/test/vllm_tests/test_artifacts/vllm-deepseek-32b-lws-ingress.yaml @@ -0,0 +1,28 @@ +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: vllm-deepseek-32b-lws-ingress + annotations: + # Use AWS Load Balancer Controller with ALB + alb.ingress.kubernetes.io/scheme: internet-facing + alb.ingress.kubernetes.io/target-type: ip + alb.ingress.kubernetes.io/security-groups: + alb.ingress.kubernetes.io/healthcheck-path: /health + alb.ingress.kubernetes.io/healthcheck-port: '8000' + alb.ingress.kubernetes.io/healthcheck-protocol: HTTP + alb.ingress.kubernetes.io/listen-ports: '[{"HTTP": 80}]' + alb.ingress.kubernetes.io/load-balancer-attributes: load_balancing.cross_zone.enabled=true + # Specify ALB class + kubernetes.io/ingress.class: alb +spec: + ingressClassName: alb + rules: + - http: + paths: + - path: / + pathType: Prefix + backend: + service: + name: vllm-deepseek-32b-lws-leader + port: + number: 8000 \ No newline at end of file diff --git a/test/vllm_tests/test_artifacts/vllm-deepseek-32b-lws.yaml b/test/vllm_tests/test_artifacts/vllm-deepseek-32b-lws.yaml new file mode 100644 index 000000000000..d79afe1153af --- /dev/null +++ b/test/vllm_tests/test_artifacts/vllm-deepseek-32b-lws.yaml @@ -0,0 +1,252 @@ +apiVersion: leaderworkerset.x-k8s.io/v1 +kind: LeaderWorkerSet +metadata: + name: vllm-deepseek-32b-lws +spec: + replicas: 1 + leaderWorkerTemplate: + size: 2 # Total number of nodes (1 leader + 1 worker) + restartPolicy: RecreateGroupOnPodRestart + leaderTemplate: + metadata: + labels: + role: leader + spec: + containers: + - name: vllm-leader + image: 763104351884.dkr.ecr.us-east-1.amazonaws.com/vllm:0.8.5-gpu-py312-ec2 + securityContext: + privileged: true + capabilities: + add: ["IPC_LOCK"] + env: + # Ray configuration + - name: RAY_DISABLE_RUNTIME_ENV + value: "1" + - name: RAY_SCHEDULER_EVENTS + value: "0" + - name: RAY_WORKER_REGISTER_TIMEOUT_SECONDS + value: "300" + # NCCL configuration for distributed training + - name: NCCL_DEBUG + value: "INFO" + - name: NCCL_IB_DISABLE + value: "1" + - name: NCCL_P2P_DISABLE + value: "1" + - name: NCCL_NET_GDR_LEVEL + value: "0" + - name: NCCL_SHM_DISABLE + value: "1" + # EFA-specific environment variables + - name: FI_PROVIDER + value: "efa" + - name: FI_EFA_USE_DEVICE_RDMA + value: "1" + - name: FI_EFA_FORK_SAFE + value: "1" + # Hugging Face configuration + - name: TRANSFORMERS_CACHE + value: "/mnt/fsx/models" + - name: HF_HOME + value: "/mnt/fsx/models" + - name: HUGGING_FACE_HUB_TOKEN + valueFrom: + secretKeyRef: + name: huggingface-token + key: token + optional: true + # Add host IP for Ray + - name: VLLM_HOST_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + command: ["/bin/bash"] + args: + - "-c" + - | + set -x + + # Start ray leader + ray start --head --port=6379 --num-cpus=48 --num-gpus=8 + sleep 10 + ray status + fi_info -p efa + + # Start vllm server + python -m vllm.entrypoints.openai.api_server \ + --model deepseek-ai/DeepSeek-R1-Distill-Qwen-32B \ + --host 0.0.0.0 \ + --port 8000 \ + --tensor-parallel-size 8 \ + --pipeline-parallel-size 2 \ + --download-dir /mnt/fsx/models \ + --max-model-len 4096 \ + --gpu-memory-utilization 0.85 + resources: + limits: + nvidia.com/gpu: "8" + cpu: "48" + memory: "256Gi" + vpc.amazonaws.com/efa: 4 + requests: + nvidia.com/gpu: "8" + cpu: "48" + memory: "256Gi" + vpc.amazonaws.com/efa: 4 + ports: + - containerPort: 8000 + readinessProbe: + 
httpGet: + path: /health + port: 8000 + initialDelaySeconds: 300 + periodSeconds: 30 + timeoutSeconds: 10 + successThreshold: 1 + failureThreshold: 10 + volumeMounts: + - name: fsx-lustre-volume + mountPath: /mnt/fsx + # Mount the EFA devices + #- name: efa-devices + # mountPath: /dev/infiniband + # Mount a larger shared memory volume + - name: dshm + mountPath: /dev/shm + volumes: + - name: fsx-lustre-volume + persistentVolumeClaim: + claimName: fsx-lustre-pvc + # Add volume for EFA devices + #- name: efa-devices + # hostPath: + # path: /dev/infiniband + # Add a larger shared memory volume + - name: dshm + emptyDir: + medium: Memory + sizeLimit: "30Gi" # Increase shared memory size + nodeSelector: + role: large-model-worker + # Add tolerations for EFA + tolerations: + - key: "aws.amazon.com/efa" + operator: "Exists" + effect: "NoSchedule" + workerTemplate: + spec: + containers: + - name: vllm-worker + image: 763104351884.dkr.ecr.us-east-1.amazonaws.com/vllm:0.8.5-gpu-py312-ec2 + securityContext: + privileged: true + capabilities: + add: ["IPC_LOCK"] + env: + # Ray configuration + - name: RAY_DISABLE_RUNTIME_ENV + value: "1" + - name: RAY_SCHEDULER_EVENTS + value: "0" + - name: RAY_WORKER_REGISTER_TIMEOUT_SECONDS + value: "300" + # NCCL configuration for distributed training + - name: NCCL_DEBUG + value: "INFO" + - name: NCCL_IB_DISABLE + value: "1" + - name: NCCL_P2P_DISABLE + value: "1" + - name: NCCL_NET_GDR_LEVEL + value: "0" + - name: NCCL_SHM_DISABLE + value: "1" + # EFA-specific environment variables + - name: FI_PROVIDER + value: "efa" + - name: FI_EFA_USE_DEVICE_RDMA + value: "1" + - name: FI_EFA_FORK_SAFE + value: "1" + # Hugging Face configuration + - name: TRANSFORMERS_CACHE + value: "/mnt/fsx/models" + - name: HF_HOME + value: "/mnt/fsx/models" + - name: HUGGING_FACE_HUB_TOKEN + valueFrom: + secretKeyRef: + name: huggingface-token + key: token + optional: true + # Add host IP for Ray + - name: VLLM_HOST_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + command: ["/bin/bash"] + args: + - "-c" + - | + set -x + + # Start ray worker + ray start --address=$(LWS_LEADER_ADDRESS):6379 --num-cpus=48 --num-gpus=8 --block + resources: + limits: + nvidia.com/gpu: "8" + cpu: "48" + memory: "256Gi" + vpc.amazonaws.com/efa: 4 + requests: + nvidia.com/gpu: "8" + cpu: "48" + memory: "256Gi" + vpc.amazonaws.com/efa: 4 + volumeMounts: + - name: fsx-lustre-volume + mountPath: /mnt/fsx + # Mount the EFA devices + #- name: efa-devices + # mountPath: /dev/infiniband + # Mount a larger shared memory volume + - name: dshm + mountPath: /dev/shm + volumes: + - name: fsx-lustre-volume + persistentVolumeClaim: + claimName: fsx-lustre-pvc + # Add volume for EFA devices + #- name: efa-devices + # hostPath: + # path: /dev/infiniband + # Add a larger shared memory volume + - name: dshm + emptyDir: + medium: Memory + sizeLimit: "30Gi" # Increase shared memory size + nodeSelector: + role: large-model-worker + # Add tolerations for EFA + tolerations: + - key: "aws.amazon.com/efa" + operator: "Exists" + effect: "NoSchedule" +--- +apiVersion: v1 +kind: Service +metadata: + name: vllm-deepseek-32b-lws-leader +spec: + ports: + - name: port-8000 + port: 8000 + targetPort: 8000 + - name: port-8265 + port: 8265 + targetPort: 8265 + type: ClusterIP + selector: + leaderworkerset.sigs.k8s.io/name: vllm-deepseek-32b-lws + role: leader \ No newline at end of file From b1c46bbd6224a4063e957d936241ce1cd7127f81 Mon Sep 17 00:00:00 2001 From: Jinyan Li Date: Thu, 10 Jul 2025 11:44:45 -0700 Subject: [PATCH 06/16] update 
yaml paths to use added yaml files and remove functions no longer needed --- test/vllm_tests/infra/eks_infra.py | 27 ++++++---------------- test/vllm_tests/test_artifacts/eks_test.py | 4 ++-- 2 files changed, 9 insertions(+), 22 deletions(-) diff --git a/test/vllm_tests/infra/eks_infra.py b/test/vllm_tests/infra/eks_infra.py index 81d86efc3f54..c7248d62906d 100644 --- a/test/vllm_tests/infra/eks_infra.py +++ b/test/vllm_tests/infra/eks_infra.py @@ -57,7 +57,6 @@ def setup_infrastructure(self): # install prerequisites self.connection.run("pip3 install --user boto3 invoke packaging") self.validate_required_tools() - self.setup_prerequisites() self.create_eks_cluster() self.validate_cluster_setup() self.setup_fsx_lustre() @@ -140,17 +139,6 @@ def validate_aws_credentials(self): return False - def setup_prerequisites(self): - """ - Setup required tools and repositories - """ - logger.info("Setting up prerequisites...") - - run("pip install --quiet git-remote-codecommit") - run("git config --global --add protocol.codecommit.allow always") - run("git clone codecommit::us-west-2://aws-vllm-dlc-blog-repo aws-vllm-dlc-blog-repo") - - def create_eks_cluster(self): """ Create EKS cluster and setup IAM access @@ -158,12 +146,12 @@ def create_eks_cluster(self): logger.info("Creating EKS cluster...") run( - f"cd aws-vllm-dlc-blog-repo && eksctl create cluster -f eks-cluster.yaml --region {self.region}" + f"eksctl create cluster -f test/vllm_tests/test_artifacts/eks-cluster.yaml --region {self.region}" ) # create a node group with EFA Support run( - f"cd aws-vllm-dlc-blog-repo && eksctl create nodegroup -f large-model-nodegroup.yaml --region {self.region}" + f"eksctl create nodegroup -f test/vllm_tests/test_artifacts/large-model-nodegroup.yaml --region {self.region}" ) eks_utils.eks_write_kubeconfig(self.cluster_name, self.region) @@ -267,11 +255,10 @@ def setup_fsx_lustre(self): fsx.setup_csi_driver() - # TODO: change the path to yaml files moved to the DLC repo fsx.setup_kubernetes_resources( - storage_class_file="aws-vllm-dlc-blog-repo/fsx-storage-class.yaml", - pv_file="aws-vllm-dlc-blog-repo/fsx-lustre-pv.yaml", - pvc_file="aws-vllm-dlc-blog-repo/fsx-lustre-pvc.yaml", + storage_class_file="test/vllm_tests/test_artifacts/fsx-storage-class.yaml", + pv_file="test/vllm_tests/test_artifacts/fsx-lustre-pv.yaml", + pvc_file="test/vllm_tests/test_artifacts/fsx-lustre-pvc.yaml", replacements={ "": subnet_id, "": sg_id, @@ -330,7 +317,7 @@ def setup_load_balancer_controller(self): ) # update the sg in the ingress file run( - f"cd aws-vllm-dlc-blog-repo && sed -i 's||{alb_sg}|g' vllm-deepseek-32b-lws-ingress.yaml" + f"sed -i 's||{alb_sg}|g' test/vllm_tests/test_artifacts/vllm-deepseek-32b-lws-ingress.yaml" ) # verify sg were created and configured correctly @@ -362,7 +349,7 @@ def cleanup_resources(self): script_path = "test/vllm_tests/infra/test_vllm_eks_cleanup.sh" run(f"chmod +x {script_path}") run( - f"cd aws-vllm-dlc-blog-repo && echo 'y' | ../{script_path}", + f"echo 'y' | {script_path}", check=False, timeout=3600, ) diff --git a/test/vllm_tests/test_artifacts/eks_test.py b/test/vllm_tests/test_artifacts/eks_test.py index c32a97c1a623..453936b3197e 100644 --- a/test/vllm_tests/test_artifacts/eks_test.py +++ b/test/vllm_tests/test_artifacts/eks_test.py @@ -55,9 +55,9 @@ def deploy_vllm_service(self): raise Exception("AWS Load Balancer Controller pods failed to start after 10 minutes") # apply the LeaderWorkerSet - run("cd aws-vllm-dlc-blog-repo && kubectl apply -f vllm-deepseek-32b-lws.yaml") + 
run("kubectl apply -f test/vllm_tests/test_artifacts/vllm-deepseek-32b-lws.yaml") # apply the ingress - run("cd aws-vllm-dlc-blog-repo && kubectl apply -f vllm-deepseek-32b-lws-ingress.yaml") + run("kubectl apply -f test/vllm_tests/test_artifacts/vllm-deepseek-32b-lws-ingress.yaml") # monitor pod status until Running (can take 15-30 minutes for large GPU images + model loading) logger.info("Waiting for vLLM pods to reach Running status...") From 0efb720ddcd99311fe17ff2292ceb03307708b82 Mon Sep 17 00:00:00 2001 From: Jinyan Li Date: Thu, 10 Jul 2025 11:52:28 -0700 Subject: [PATCH 07/16] Revert change to testrunner --- test/testrunner.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/testrunner.py b/test/testrunner.py index 20e5e29fb930..65670112f318 100644 --- a/test/testrunner.py +++ b/test/testrunner.py @@ -415,6 +415,8 @@ def main(): eks_utils.eks_setup() if eks_utils.is_eks_cluster_active(eks_cluster_name): eks_utils.eks_write_kubeconfig(eks_cluster_name) + else: + raise Exception(f"EKS cluster {eks_cluster_name} is not in active state") # Execute dlc_tests pytest command pytest_cmd = [ From 3da99d79f6b1a19f87bd05c246e16d10ac8be3c7 Mon Sep 17 00:00:00 2001 From: Jinyan Li Date: Thu, 10 Jul 2025 11:56:20 -0700 Subject: [PATCH 08/16] set do_build to false --- dlc_developer_config.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dlc_developer_config.toml b/dlc_developer_config.toml index 79ce70b66ee3..ace85265aeef 100644 --- a/dlc_developer_config.toml +++ b/dlc_developer_config.toml @@ -46,7 +46,7 @@ build_inference = false # Set do_build to "false" to skip builds and test the latest image built by this PR # Note: at least one build is required to set do_build to "false" -do_build = true +do_build = false [notify] ### Notify on test failures From d22ac70f28c0670b28ab924b52a023ca677bf3e9 Mon Sep 17 00:00:00 2001 From: Jinyan Li Date: Thu, 10 Jul 2025 12:51:44 -0700 Subject: [PATCH 09/16] add print statements in test_trigger --- test/vllm_tests/vllm_test_trigger.py | 115 ++++++++++++++------------- 1 file changed, 59 insertions(+), 56 deletions(-) diff --git a/test/vllm_tests/vllm_test_trigger.py b/test/vllm_tests/vllm_test_trigger.py index 8a8b33a7ad80..d6c87d565890 100644 --- a/test/vllm_tests/vllm_test_trigger.py +++ b/test/vllm_tests/vllm_test_trigger.py @@ -11,71 +11,74 @@ LOGGER.addHandler(logging.StreamHandler(sys.stdout)) -def run_vllm_eks_test(): - infrastructure = None - try: - LOGGER.info("Setting up EKS infrastructure...") - infrastructure = EksInfrastructure() - if not infrastructure.setup_infrastructure(): - raise Exception("Infrastructure setup failed") - LOGGER.info("Infrastructure setup completed successfully") - - LOGGER.info("Starting vLLM tests...") - test = VllmEksTest() - if not test.run_tests(): - raise Exception("vLLM tests failed") - LOGGER.info("vLLM tests completed successfully") - return 0 - - except Exception as e: - LOGGER.error(f"Test execution failed: {e}") - return 1 - - finally: - if infrastructure: - LOGGER.info("Cleaning up infrastructure...") - infrastructure.cleanup_infrastructure() - LOGGER.info("Cleanup completed") - - -def run_platform_tests(platform: str, images: List[str], commit_id: str, ipv6_enabled: bool): - """ - Run tests for a specific platform - """ - LOGGER.info(f"Running {platform} tests") - if platform == "eks": - result = run_vllm_eks_test() - if result != 0: - raise Exception("vLLM EKS tests failed") - LOGGER.info("vLLM EKS tests completed successfully") +# def run_vllm_eks_test(): +# infrastructure = 
None +# try: +# LOGGER.info("Setting up EKS infrastructure...") +# infrastructure = EksInfrastructure() +# if not infrastructure.setup_infrastructure(): +# raise Exception("Infrastructure setup failed") +# LOGGER.info("Infrastructure setup completed successfully") + +# LOGGER.info("Starting vLLM tests...") +# test = VllmEksTest() +# if not test.run_tests(): +# raise Exception("vLLM tests failed") +# LOGGER.info("vLLM tests completed successfully") +# return 0 + +# except Exception as e: +# LOGGER.error(f"Test execution failed: {e}") +# return 1 + +# finally: +# if infrastructure: +# LOGGER.info("Cleaning up infrastructure...") +# infrastructure.cleanup_infrastructure() +# LOGGER.info("Cleanup completed") + + +# def run_platform_tests(platform: str, images: List[str], commit_id: str, ipv6_enabled: bool): +# """ +# Run tests for a specific platform +# """ +# LOGGER.info(f"Running {platform} tests") +# if platform == "eks": +# result = run_vllm_eks_test() +# if result != 0: +# raise Exception("vLLM EKS tests failed") +# LOGGER.info("vLLM EKS tests completed successfully") def main(): - LOGGER.info("Triggering test from vllm") - test_type = os.getenv("TEST_TYPE") + LOGGER.info("Starting vLLM tests...") + print("Starting vLLM tests...") - LOGGER.info(f"TEST_TYPE: {test_type}") + # LOGGER.info("Triggering test from vllm") + # test_type = os.getenv("TEST_TYPE") - executor_mode = os.getenv("EXECUTOR_MODE", "False").lower() == "true" - dlc_images = os.getenv("DLC_IMAGE") if executor_mode else get_dlc_images() + # LOGGER.info(f"TEST_TYPE: {test_type}") - ipv6_enabled = os.getenv("ENABLE_IPV6_TESTING", "false").lower() == "true" - os.environ["ENABLE_IPV6_TESTING"] = "true" if ipv6_enabled else "false" + # executor_mode = os.getenv("EXECUTOR_MODE", "False").lower() == "true" + # dlc_images = os.getenv("DLC_IMAGE") if executor_mode else get_dlc_images() - commit_id = os.getenv("CODEBUILD_RESOLVED_SOURCE_VERSION", default="unrecognised_commit_id") - LOGGER.info(f"Commit ID: {commit_id}") + # ipv6_enabled = os.getenv("ENABLE_IPV6_TESTING", "false").lower() == "true" + # os.environ["ENABLE_IPV6_TESTING"] = "true" if ipv6_enabled else "false" - LOGGER.info(f"Images tested: {dlc_images}") - all_image_list = dlc_images.split(" ") - standard_images_list = [image_uri for image_uri in all_image_list if "example" not in image_uri] - LOGGER.info(f"\nImages URIs:\n{standard_images_list}") + # commit_id = os.getenv("CODEBUILD_RESOLVED_SOURCE_VERSION", default="unrecognised_commit_id") + # LOGGER.info(f"Commit ID: {commit_id}") - run_platform_tests( - platform=test_type, - images=standard_images_list, - commit_id=commit_id, - ipv6_enabled=ipv6_enabled, - ) + # LOGGER.info(f"Images tested: {dlc_images}") + # all_image_list = dlc_images.split(" ") + # standard_images_list = [image_uri for image_uri in all_image_list if "example" not in image_uri] + # LOGGER.info(f"\nImages URIs:\n{standard_images_list}") + + # run_platform_tests( + # platform=test_type, + # images=standard_images_list, + # commit_id=commit_id, + # ipv6_enabled=ipv6_enabled, + # ) if __name__ == "__main__": From 2f3afb878e31839daf7eb498a602c89639b26769 Mon Sep 17 00:00:00 2001 From: Jinyan Li Date: Thu, 10 Jul 2025 14:04:07 -0700 Subject: [PATCH 10/16] set framework to vllm --- dlc_developer_config.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dlc_developer_config.toml b/dlc_developer_config.toml index ace85265aeef..9fb6f973b22d 100644 --- a/dlc_developer_config.toml +++ b/dlc_developer_config.toml @@ -37,7 +37,7 @@ 
deep_canary_mode = false [build] # Add in frameworks you would like to build. By default, builds are disabled unless you specify building an image. # available frameworks - ["base", "vllm", "autogluon", "huggingface_tensorflow", "huggingface_pytorch", "huggingface_tensorflow_trcomp", "huggingface_pytorch_trcomp", "pytorch_trcomp", "tensorflow", "pytorch", "stabilityai_pytorch"] -build_frameworks = [] +build_frameworks = ["vllm"] # By default we build both training and inference containers. Set true/false values to determine which to build. From 7ad390f2febec4b33d3769a78012863cb7c37ddd Mon Sep 17 00:00:00 2001 From: Jinyan Li Date: Thu, 10 Jul 2025 15:15:31 -0700 Subject: [PATCH 11/16] uncomment test in vllm test trigger --- test/vllm_tests/vllm_test_trigger.py | 115 +++++++++++++-------------- 1 file changed, 56 insertions(+), 59 deletions(-) diff --git a/test/vllm_tests/vllm_test_trigger.py b/test/vllm_tests/vllm_test_trigger.py index d6c87d565890..8a8b33a7ad80 100644 --- a/test/vllm_tests/vllm_test_trigger.py +++ b/test/vllm_tests/vllm_test_trigger.py @@ -11,74 +11,71 @@ LOGGER.addHandler(logging.StreamHandler(sys.stdout)) -# def run_vllm_eks_test(): -# infrastructure = None -# try: -# LOGGER.info("Setting up EKS infrastructure...") -# infrastructure = EksInfrastructure() -# if not infrastructure.setup_infrastructure(): -# raise Exception("Infrastructure setup failed") -# LOGGER.info("Infrastructure setup completed successfully") - -# LOGGER.info("Starting vLLM tests...") -# test = VllmEksTest() -# if not test.run_tests(): -# raise Exception("vLLM tests failed") -# LOGGER.info("vLLM tests completed successfully") -# return 0 - -# except Exception as e: -# LOGGER.error(f"Test execution failed: {e}") -# return 1 - -# finally: -# if infrastructure: -# LOGGER.info("Cleaning up infrastructure...") -# infrastructure.cleanup_infrastructure() -# LOGGER.info("Cleanup completed") - - -# def run_platform_tests(platform: str, images: List[str], commit_id: str, ipv6_enabled: bool): -# """ -# Run tests for a specific platform -# """ -# LOGGER.info(f"Running {platform} tests") -# if platform == "eks": -# result = run_vllm_eks_test() -# if result != 0: -# raise Exception("vLLM EKS tests failed") -# LOGGER.info("vLLM EKS tests completed successfully") +def run_vllm_eks_test(): + infrastructure = None + try: + LOGGER.info("Setting up EKS infrastructure...") + infrastructure = EksInfrastructure() + if not infrastructure.setup_infrastructure(): + raise Exception("Infrastructure setup failed") + LOGGER.info("Infrastructure setup completed successfully") + + LOGGER.info("Starting vLLM tests...") + test = VllmEksTest() + if not test.run_tests(): + raise Exception("vLLM tests failed") + LOGGER.info("vLLM tests completed successfully") + return 0 + + except Exception as e: + LOGGER.error(f"Test execution failed: {e}") + return 1 + + finally: + if infrastructure: + LOGGER.info("Cleaning up infrastructure...") + infrastructure.cleanup_infrastructure() + LOGGER.info("Cleanup completed") + + +def run_platform_tests(platform: str, images: List[str], commit_id: str, ipv6_enabled: bool): + """ + Run tests for a specific platform + """ + LOGGER.info(f"Running {platform} tests") + if platform == "eks": + result = run_vllm_eks_test() + if result != 0: + raise Exception("vLLM EKS tests failed") + LOGGER.info("vLLM EKS tests completed successfully") def main(): - LOGGER.info("Starting vLLM tests...") - print("Starting vLLM tests...") + LOGGER.info("Triggering test from vllm") + test_type = os.getenv("TEST_TYPE") - # 
LOGGER.info("Triggering test from vllm") - # test_type = os.getenv("TEST_TYPE") + LOGGER.info(f"TEST_TYPE: {test_type}") - # LOGGER.info(f"TEST_TYPE: {test_type}") + executor_mode = os.getenv("EXECUTOR_MODE", "False").lower() == "true" + dlc_images = os.getenv("DLC_IMAGE") if executor_mode else get_dlc_images() - # executor_mode = os.getenv("EXECUTOR_MODE", "False").lower() == "true" - # dlc_images = os.getenv("DLC_IMAGE") if executor_mode else get_dlc_images() + ipv6_enabled = os.getenv("ENABLE_IPV6_TESTING", "false").lower() == "true" + os.environ["ENABLE_IPV6_TESTING"] = "true" if ipv6_enabled else "false" - # ipv6_enabled = os.getenv("ENABLE_IPV6_TESTING", "false").lower() == "true" - # os.environ["ENABLE_IPV6_TESTING"] = "true" if ipv6_enabled else "false" + commit_id = os.getenv("CODEBUILD_RESOLVED_SOURCE_VERSION", default="unrecognised_commit_id") + LOGGER.info(f"Commit ID: {commit_id}") - # commit_id = os.getenv("CODEBUILD_RESOLVED_SOURCE_VERSION", default="unrecognised_commit_id") - # LOGGER.info(f"Commit ID: {commit_id}") + LOGGER.info(f"Images tested: {dlc_images}") + all_image_list = dlc_images.split(" ") + standard_images_list = [image_uri for image_uri in all_image_list if "example" not in image_uri] + LOGGER.info(f"\nImages URIs:\n{standard_images_list}") - # LOGGER.info(f"Images tested: {dlc_images}") - # all_image_list = dlc_images.split(" ") - # standard_images_list = [image_uri for image_uri in all_image_list if "example" not in image_uri] - # LOGGER.info(f"\nImages URIs:\n{standard_images_list}") - - # run_platform_tests( - # platform=test_type, - # images=standard_images_list, - # commit_id=commit_id, - # ipv6_enabled=ipv6_enabled, - # ) + run_platform_tests( + platform=test_type, + images=standard_images_list, + commit_id=commit_id, + ipv6_enabled=ipv6_enabled, + ) if __name__ == "__main__": From 4ca7069c40c1d6145bb0c6964b06a831921a8bf8 Mon Sep 17 00:00:00 2001 From: Jinyan Li Date: Thu, 10 Jul 2025 17:26:16 -0700 Subject: [PATCH 12/16] update paths --- test/vllm_tests/infra/eks_infra.py | 2 +- test/vllm_tests/infra/test_vllm_eks_cleanup.sh | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/test/vllm_tests/infra/eks_infra.py b/test/vllm_tests/infra/eks_infra.py index c7248d62906d..ec0168ba21ea 100644 --- a/test/vllm_tests/infra/eks_infra.py +++ b/test/vllm_tests/infra/eks_infra.py @@ -86,7 +86,7 @@ def install_helm(self): "curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3" ) run("chmod 700 get_helm.sh") - run("sudo ./get_helm.sh") + run("./get_helm.sh") run("rm -f get_helm.sh") result = run("which helm", warn=True) diff --git a/test/vllm_tests/infra/test_vllm_eks_cleanup.sh b/test/vllm_tests/infra/test_vllm_eks_cleanup.sh index 11a6dea45362..df06a7d2429b 100755 --- a/test/vllm_tests/infra/test_vllm_eks_cleanup.sh +++ b/test/vllm_tests/infra/test_vllm_eks_cleanup.sh @@ -134,35 +134,35 @@ fi print_section "Deleting Kubernetes Resources" echo "Deleting vLLM ingress..." -kubectl delete -f vllm-deepseek-32b-lws-ingress.yaml --ignore-not-found +kubectl delete -f test/vllm_tests/test_artifacts/vllm-deepseek-32b-lws-ingress.yaml --ignore-not-found print_success "Ingress deletion initiated" echo "Waiting 30 seconds for ingress controller to process deletion..." sleep 30 echo "Deleting vLLM LeaderWorkerSet..." 
-kubectl delete -f vllm-deepseek-32b-lws.yaml --ignore-not-found +kubectl delete -f test/vllm_tests/test_artifacts/vllm-deepseek-32b-lws.yaml --ignore-not-found print_success "LeaderWorkerSet deletion initiated" echo "Waiting 60 seconds for pods to terminate..." sleep 60 echo "Deleting FSx Lustre PVC..." -kubectl delete -f fsx-lustre-pvc.yaml --ignore-not-found +kubectl delete -f test/vllm_tests/test_artifacts/fsx-lustre-pvc.yaml --ignore-not-found print_success "PVC deletion initiated" echo "Waiting 10 seconds for PVC deletion to process..." sleep 10 echo "Deleting FSx Lustre PV..." -kubectl delete -f fsx-lustre-pv.yaml --ignore-not-found +kubectl delete -f test/vllm_tests/test_artifacts/fsx-lustre-pv.yaml --ignore-not-found print_success "PV deletion initiated" echo "Waiting 10 seconds for PV deletion to process..." sleep 10 echo "Deleting storage class..." -kubectl delete -f fsx-storage-class.yaml --ignore-not-found +kubectl delete -f test/vllm_tests/test_artifacts/fsx-storage-class.yaml --ignore-not-found print_success "Storage class deletion initiated" echo "Deleting AWS Load Balancer Controller..." From 7561d647bef7a43d2cd1ede6deded24058fdbaf0 Mon Sep 17 00:00:00 2001 From: Jinyan Li Date: Thu, 10 Jul 2025 20:44:05 -0700 Subject: [PATCH 13/16] remove ec2 jumphost --- test/vllm_tests/infra/eks_infra.py | 94 ++++------------------ test/vllm_tests/test_artifacts/eks_test.py | 68 +++++++++------- 2 files changed, 52 insertions(+), 110 deletions(-) diff --git a/test/vllm_tests/infra/eks_infra.py b/test/vllm_tests/infra/eks_infra.py index ec0168ba21ea..6c6449bef6af 100644 --- a/test/vllm_tests/infra/eks_infra.py +++ b/test/vllm_tests/infra/eks_infra.py @@ -5,16 +5,9 @@ import time import logging import boto3 -import uuid from invoke import run from .utils.fsx_utils import FsxSetup from test.test_utils import eks as eks_utils -from test.test_utils import ec2 as ec2_utils -from test.test_utils import ( - generate_ssh_keypair, - destroy_ssh_keypair, - get_dlami_id, -) logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) @@ -23,45 +16,17 @@ class EksInfrastructure: def __init__(self): self.cluster_name = "vllm-cluster" self.region = os.getenv("AWS_REGION", "us-west-2") - self.instance_id = None - self.key_filename = None - self.ec2_client = None - self.connection = None + def setup_infrastructure(self): try: - self.ec2_client = boto3.client("ec2", region_name=self.region) - key_name = f"vllm-eks-test-{str(uuid.uuid4())}" - self.key_filename = generate_ssh_keypair(self.ec2_client, key_name) - - # launch EC2 instance - ami_id = get_dlami_id(self.region) - instance_type = ec2_utils.get_ec2_instance_type("c5.12xlarge", "cpu")[0] - instance_info = ec2_utils.launch_instance( - ami_id=ami_id, - instance_type=instance_type, - ec2_key_name=key_name, - region=self.region, - iam_instance_profile_name=ec2_utils.EC2_INSTANCE_ROLE_NAME, - instance_name="vLLM-EKS-Integration-Test", - ) - self.instance_id = instance_info["InstanceId"] - - # setup connection - ec2_utils.check_instance_state(self.instance_id, region=self.region) - ec2_utils.check_system_state(self.instance_id, region=self.region) - self.connection = ec2_utils.get_ec2_fabric_connection( - self.instance_id, self.key_filename, self.region - ) - - # install prerequisites - self.connection.run("pip3 install --user boto3 invoke packaging") + logger.info("Starting EKS infrastructure setup...") self.validate_required_tools() self.create_eks_cluster() 
self.validate_cluster_setup() self.setup_fsx_lustre() self.setup_load_balancer_controller() - + logger.info("EKS infrastructure setup completed successfully") return True except Exception as e: logger.error(f"Infrastructure setup failed: {e}") @@ -73,18 +38,17 @@ def setup_eks_tools(self): logger.info("Setting up EKS tools...") eks_utils.eks_setup() self.install_helm() - logger.info("EKS tools setup completed") + def install_helm(self): logger.info("Installing Helm...") result = run("which helm", warn=True) if result.return_code == 0: logger.info("Helm already installed") return - run( - "curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3" - ) + + run("curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3") run("chmod 700 get_helm.sh") run("./get_helm.sh") run("rm -f get_helm.sh") @@ -95,10 +59,8 @@ def install_helm(self): logger.info("Helm installed successfully") + def validate_required_tools(self): - """ - Validate required tools and handle installation - """ logger.info("Validating required tools...") required_tools = ["aws", "eksctl", "kubectl", "helm", "curl", "jq"] missing_tools = [] @@ -106,13 +68,13 @@ def validate_required_tools(self): for tool in required_tools: result = run(f"which {tool}", warn=True) if result.return_code != 0: - missing_tools.append((tool, tool.upper())) + missing_tools.append(tool) logger.warning(f"{tool} not found") else: logger.info(f"{tool} found: {result.stdout.strip()}") if missing_tools: - logger.info("Installing missing tools...") + logger.info(f"Installing missing tools: {', '.join(missing_tools)}") self.setup_eks_tools() logger.info("Tools installed successfully") else: @@ -120,9 +82,6 @@ def validate_required_tools(self): def validate_aws_credentials(self): - """ - Validate AWS credentials and set required IAM roles for EKS - """ logger.info("Validating AWS credentials...") try: sts_client = boto3.client("sts") @@ -140,32 +99,20 @@ def validate_aws_credentials(self): def create_eks_cluster(self): - """ - Create EKS cluster and setup IAM access - """ logger.info("Creating EKS cluster...") - run( - f"eksctl create cluster -f test/vllm_tests/test_artifacts/eks-cluster.yaml --region {self.region}" - ) + run(f"eksctl create cluster -f test/vllm_tests/test_artifacts/eks-cluster.yaml --region {self.region}") - # create a node group with EFA Support - run( - f"eksctl create nodegroup -f test/vllm_tests/test_artifacts/large-model-nodegroup.yaml --region {self.region}" - ) + run(f"eksctl create nodegroup -f test/vllm_tests/test_artifacts/large-model-nodegroup.yaml --region {self.region}") eks_utils.eks_write_kubeconfig(self.cluster_name, self.region) - # verify that nodes are ready result = run("kubectl get nodes") assert "Ready" in result.stdout, "EKS nodes not ready" logger.info("EKS cluster created successfully") def validate_cluster_setup(self): - """ - Validate cluster setup including NVIDIA device plugin - """ logger.info("Validating cluster setup...") if not eks_utils.is_eks_cluster_active(self.cluster_name): @@ -348,27 +295,14 @@ def cleanup_resources(self): try: script_path = "test/vllm_tests/infra/test_vllm_eks_cleanup.sh" run(f"chmod +x {script_path}") - run( - f"echo 'y' | {script_path}", - check=False, - timeout=3600, - ) + run(f"echo 'y' | {script_path}", check=False, timeout=3600) logger.info("Cleanup completed successfully") - except Exception as e: logger.error(f"Cleanup failed: {e}") def cleanup_infrastructure(self): try: - if self.connection: - 
self.cleanup_resources() - - if self.instance_id: - ec2_utils.terminate_instance(self.instance_id, region=self.region) - - if self.key_filename: - destroy_ssh_keypair(self.ec2_client, self.key_filename) - + self.cleanup_resources() except Exception as e: - logger.error(f"Cleanup failed: {e}") + logger.error(f"Infrastructure cleanup failed: {e}") diff --git a/test/vllm_tests/test_artifacts/eks_test.py b/test/vllm_tests/test_artifacts/eks_test.py index 453936b3197e..2051eea40554 100644 --- a/test/vllm_tests/test_artifacts/eks_test.py +++ b/test/vllm_tests/test_artifacts/eks_test.py @@ -10,10 +10,13 @@ class VllmEksTest: def __init__(self): pass + def run_tests(self): try: + logger.info("Starting vLLM EKS integration tests...") self.deploy_vllm_service() self.test_vllm_api() + logger.info("All vLLM EKS tests completed successfully") return True except Exception as e: logger.error(f"Test execution failed: {e}") @@ -23,7 +26,20 @@ def run_tests(self): def deploy_vllm_service(self): logger.info("Deploying vLLM service...") - # first, wait until the AWS Load Balancer Controller is running + self._wait_for_load_balancer_controller() + + logger.info("Applying vLLM LeaderWorkerSet configuration...") + run("kubectl apply -f test/vllm_tests/test_artifacts/vllm-deepseek-32b-lws.yaml") + + logger.info("Applying vLLM ingress configuration...") + run("kubectl apply -f test/vllm_tests/test_artifacts/vllm-deepseek-32b-lws-ingress.yaml") + + self._wait_for_vllm_pods() + + logger.info("vLLM service deployed successfully") + + + def _wait_for_load_balancer_controller(self): logger.info("Waiting for AWS Load Balancer Controller to be ready...") max_retries = 20 # 10 minutes total retry_count = 0 @@ -31,19 +47,16 @@ def deploy_vllm_service(self): while retry_count < max_retries: result = run("kubectl get pods -n kube-system | grep aws-load-balancer-controller", warn=True) if "aws-load-balancer-controller" in result.stdout: - # count total and running ALB controller pods all_alb_pods = [ line for line in result.stdout.split("\n") if "aws-load-balancer-controller" in line and line.strip() ] running_alb_pods = [ - line for line in all_alb_pods - if "Running" in line + line for line in all_alb_pods if "Running" in line ] if all_alb_pods and len(running_alb_pods) == len(all_alb_pods): logger.info(f"All {len(running_alb_pods)} AWS Load Balancer Controller pods are running") - logger.info("AWS Load Balancer Controller is ready") - break + return else: logger.info(f"ALB controller pods: {len(running_alb_pods)}/{len(all_alb_pods)} running") @@ -51,15 +64,10 @@ def deploy_vllm_service(self): logger.info(f"ALB controller not ready yet, waiting... 
(attempt {retry_count}/{max_retries})") time.sleep(30) - if retry_count >= max_retries: - raise Exception("AWS Load Balancer Controller pods failed to start after 10 minutes") + raise Exception("AWS Load Balancer Controller pods failed to start after 10 minutes") + - # apply the LeaderWorkerSet - run("kubectl apply -f test/vllm_tests/test_artifacts/vllm-deepseek-32b-lws.yaml") - # apply the ingress - run("kubectl apply -f test/vllm_tests/test_artifacts/vllm-deepseek-32b-lws-ingress.yaml") - - # monitor pod status until Running (can take 15-30 minutes for large GPU images + model loading) + def _wait_for_vllm_pods(self): logger.info("Waiting for vLLM pods to reach Running status...") logger.info("This may take 15-30 minutes for container image pull and model loading") @@ -69,19 +77,16 @@ def deploy_vllm_service(self): while retry_count < max_retries: result = run("kubectl get pods -l app=vllm-deepseek-32b-lws", warn=True) if "vllm-deepseek-32b-lws" in result.stdout: - # count total and running vLLM pods all_vllm_pods = [ line for line in result.stdout.split("\n") if "vllm-deepseek-32b-lws" in line and line.strip() and "NAME" not in line ] running_vllm_pods = [ - line for line in all_vllm_pods - if "Running" in line + line for line in all_vllm_pods if "Running" in line ] if all_vllm_pods and len(running_vllm_pods) == len(all_vllm_pods): logger.info(f"All {len(running_vllm_pods)} vLLM pods are running") - logger.info("vLLM service is ready") - break + return else: statuses = [] for line in all_vllm_pods: @@ -96,21 +101,25 @@ def deploy_vllm_service(self): logger.info(f"vLLM pods not ready yet, waiting... (attempt {retry_count}/{max_retries})") time.sleep(30) - if retry_count >= max_retries: - raise Exception("vLLM pods failed to reach Running status after 30 minutes") - - logger.info("vLLM service deployed successfully") + raise Exception("vLLM pods failed to reach Running status after 30 minutes") def test_vllm_api(self): logger.info("Testing vLLM API...") - endpoint = run( "kubectl get ingress vllm-deepseek-32b-lws-ingress -o jsonpath='{.status.loadBalancer.ingress[0].hostname}'" ).stdout.strip() logger.info(f"vLLM API endpoint: {endpoint}") - # Test 1: completions API + if not endpoint: + raise Exception("Failed to get vLLM API endpoint from ingress") + + self._test_completions_api(endpoint) + self._test_chat_completions_api(endpoint) + logger.info("All vLLM API tests passed successfully") + + + def _test_completions_api(self, endpoint): logger.info("Testing completions API...") result = run( f"""curl -X POST http://{endpoint}/v1/completions \ @@ -120,8 +129,9 @@ def test_vllm_api(self): ) assert '"object":"text_completion"' in result.stdout, "vLLM completions API test failed" logger.info("Completions API test passed") - - # Test 2: chat completions API + + + def _test_chat_completions_api(self, endpoint): logger.info("Testing chat completions API...") result = run( f"""curl -X POST http://{endpoint}/v1/chat/completions \ @@ -130,6 +140,4 @@ def test_vllm_api(self): """ ) assert '"object":"chat.completion"' in result.stdout, "vLLM chat completions API test failed" - logger.info("Chat completions API test passed") - - logger.info("All vLLM API tests passed successfully") + logger.info("Chat completions API test passed") \ No newline at end of file From 2058e93d0b79d10abf36d7d30e60b49ca20ff760 Mon Sep 17 00:00:00 2001 From: Jinyan Li Date: Thu, 10 Jul 2025 21:16:05 -0700 Subject: [PATCH 14/16] refactor fsx utils to use boto3 --- test/vllm_tests/infra/utils/fsx_utils.py | 130 
+++++++++++------------ 1 file changed, 63 insertions(+), 67 deletions(-) diff --git a/test/vllm_tests/infra/utils/fsx_utils.py b/test/vllm_tests/infra/utils/fsx_utils.py index d69e3e103af6..a92551d83a4b 100644 --- a/test/vllm_tests/infra/utils/fsx_utils.py +++ b/test/vllm_tests/infra/utils/fsx_utils.py @@ -1,5 +1,6 @@ import logging import time +import boto3 from invoke import run from typing import Dict, List, Any @@ -14,6 +15,8 @@ class FsxSetup: """ def __init__(self, region: str = "us-west-2"): self.region = region + self.fsx_client = boto3.client('fsx', region_name=region) + self.ec2_client = boto3.client('ec2', region_name=region) def create_fsx_filesystem( self, @@ -24,7 +27,7 @@ def create_fsx_filesystem( tags: Dict[str, str], ): """ - Create FSx filesystem with given configuration + Create FSx Lustre filesystem with given configuration : param subnet_id: subnet ID where FSx will be created : param security_group_ids: list of security group IDs : param storage_capacity: storage capacity in GiB @@ -32,31 +35,25 @@ def create_fsx_filesystem( : param tags: dictionary of tags to apply to the FSx filesystem : return: dictionary containing filesystem details """ - tags_param = " ".join([f"Key={k},Value={v}" for k, v in tags.items()]) - try: - fsx_id = run( - f'aws fsx create-file-system' - f' --file-system-type LUSTRE' - f' --storage-capacity {storage_capacity}' - f' --subnet-ids {subnet_id}' - f' --security-group-ids {" ".join(security_group_ids)}' - f' --lustre-configuration DeploymentType={deployment_type}' - f' --tags {tags_param}' - f' --query "FileSystem.FileSystemId"' - f' --output text' - ).stdout.strip() - - logger.info(f"Created FSx filesystem: {fsx_id}") + response = self.fsx_client.create_file_system( + FileSystemType='LUSTRE', + StorageCapacity=storage_capacity, + SubnetIds=[subnet_id], + SecurityGroupIds=security_group_ids, + LustreConfiguration={'DeploymentType': deployment_type}, + Tags=[{'Key': k, 'Value': v} for k, v in tags.items()] + ) + + filesystem_id = response['FileSystem']['FileSystemId'] + logger.info(f"Created FSx filesystem: {filesystem_id}") - filesystem_info = self.wait_for_filesystem(fsx_id) - return filesystem_info + return self.wait_for_filesystem(filesystem_id) except Exception as e: logger.error(f"Failed to create FSx filesystem: {e}") raise - def wait_for_filesystem(self, filesystem_id: str): """ Wait for FSx filesystem to become available and return its details @@ -65,36 +62,29 @@ def wait_for_filesystem(self, filesystem_id: str): : raises: Exception if filesystem enters FAILED, DELETING, or DELETED state """ logger.info(f"Waiting for FSx filesystem {filesystem_id} to be available...") - while True: - status = run( - f"aws fsx describe-file-systems --file-system-id {filesystem_id} " - f"--query 'FileSystems[0].Lifecycle' --output text" - ).stdout.strip() - - if status == "AVAILABLE": - break - elif status in ["FAILED", "DELETING", "DELETED"]: - raise Exception(f"FSx filesystem entered {status} state") - - logger.info(f"FSx status: {status}, waiting...") - time.sleep(30) - - # get fs DNS and mount name - fsx_dns = run( - f"aws fsx describe-file-systems --file-system-id {filesystem_id} " - f"--query 'FileSystems[0].DNSName' --output text" - ).stdout.strip() - - fsx_mount = run( - f"aws fsx describe-file-systems --file-system-id {filesystem_id} " - f"--query 'FileSystems[0].LustreConfiguration.MountName' --output text" - ).stdout.strip() - - return { - 'filesystem_id': filesystem_id, - 'dns_name': fsx_dns, - 'mount_name': fsx_mount - } + + try: + waiter 
= self.fsx_client.get_waiter('file_system_available') + waiter.wait( + FileSystemIds=[filesystem_id], + WaiterConfig={'Delay': 30, 'MaxAttempts': 60} + ) + + # Get filesystem details + response = self.fsx_client.describe_file_systems( + FileSystemIds=[filesystem_id] + ) + filesystem = response['FileSystems'][0] + + return { + 'filesystem_id': filesystem_id, + 'dns_name': filesystem['DNSName'], + 'mount_name': filesystem['LustreConfiguration']['MountName'] + } + + except Exception as e: + logger.error(f"Error waiting for filesystem {filesystem_id}: {e}") + raise def create_security_group( self, @@ -111,14 +101,12 @@ def create_security_group( : raises: Exception if security group creation fails """ try: - sg_id = run( - f'aws ec2 create-security-group' - f' --group-name {name}' - f' --description "{description}"' - f' --vpc-id {vpc_id}' - f' --query "GroupId"' - f' --output text' - ).stdout.strip() + response = self.ec2_client.create_security_group( + GroupName=name, + Description=description, + VpcId=vpc_id + ) + sg_id = response['GroupId'] logger.info(f"Created security group: {sg_id}") return sg_id @@ -126,7 +114,6 @@ def create_security_group( logger.error(f"Failed to create security group: {e}") raise - def add_security_group_ingress_rules( self, security_group_id: str, @@ -141,19 +128,30 @@ def add_security_group_ingress_rules( : raises: Exception if adding ingress rules fails """ try: + ip_permissions = [] for rule in ingress_rules: - cmd = f"aws ec2 authorize-security-group-ingress --group-id {security_group_id}" - for key, value in rule.items(): - cmd += f" --{key} {value}" - run(cmd) - + from_port, to_port = map(int, rule['port'].split('-')) + permission = { + 'IpProtocol': rule['protocol'], + 'FromPort': from_port, + 'ToPort': to_port, + 'UserIdGroupPairs': [{ + 'GroupId': rule['source-group'] + }] + } + ip_permissions.append(permission) + + self.ec2_client.authorize_security_group_ingress( + GroupId=security_group_id, + IpPermissions=ip_permissions + ) + logger.info(f"Added ingress rules to security group: {security_group_id}") except Exception as e: logger.error(f"Failed to add ingress rules to security group: {e}") raise - def setup_csi_driver(self): """ Install and configure the AWS FSx CSI Driver in the Kubernetes cluster @@ -173,7 +171,6 @@ def setup_csi_driver(self): logger.error(f"Failed to setup FSx CSI driver: {e}") raise - def _verify_csi_driver(self): """ Verify that FSx CSI driver pods are running correctly in the cluster @@ -195,7 +192,6 @@ def _verify_csi_driver(self): logger.info(f"Found {len(fsx_pods)} running FSx CSI driver pods") - def setup_kubernetes_resources( self, storage_class_file: str, @@ -251,4 +247,4 @@ def validate_kubernetes_resources(self): except Exception as e: logger.error(f"FSx resource validation failed: {e}") - raise + raise \ No newline at end of file From d028e3d41d1ed3f5d59a88172543241901acdb35 Mon Sep 17 00:00:00 2001 From: Jinyan Li Date: Thu, 10 Jul 2025 21:18:08 -0700 Subject: [PATCH 15/16] remove region flags --- test/vllm_tests/infra/eks_infra.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/vllm_tests/infra/eks_infra.py b/test/vllm_tests/infra/eks_infra.py index 6c6449bef6af..a47cbda81fd5 100644 --- a/test/vllm_tests/infra/eks_infra.py +++ b/test/vllm_tests/infra/eks_infra.py @@ -101,9 +101,9 @@ def validate_aws_credentials(self): def create_eks_cluster(self): logger.info("Creating EKS cluster...") - run(f"eksctl create cluster -f test/vllm_tests/test_artifacts/eks-cluster.yaml --region {self.region}") + 
run(f"eksctl create cluster -f test/vllm_tests/test_artifacts/eks-cluster.yaml") - run(f"eksctl create nodegroup -f test/vllm_tests/test_artifacts/large-model-nodegroup.yaml --region {self.region}") + run(f"eksctl create nodegroup -f test/vllm_tests/test_artifacts/large-model-nodegroup.yaml") eks_utils.eks_write_kubeconfig(self.cluster_name, self.region) From 9f185e3a0cc132ffc5c4d07fa9df2317d528df21 Mon Sep 17 00:00:00 2001 From: Jinyan Li Date: Fri, 11 Jul 2025 13:21:19 -0700 Subject: [PATCH 16/16] update IAM identity setup --- test/vllm_tests/infra/eks_infra.py | 78 ++++++++++++++---------------- 1 file changed, 36 insertions(+), 42 deletions(-) diff --git a/test/vllm_tests/infra/eks_infra.py b/test/vllm_tests/infra/eks_infra.py index a47cbda81fd5..31ec14b54baa 100644 --- a/test/vllm_tests/infra/eks_infra.py +++ b/test/vllm_tests/infra/eks_infra.py @@ -12,12 +12,12 @@ logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) + class EksInfrastructure: def __init__(self): self.cluster_name = "vllm-cluster" self.region = os.getenv("AWS_REGION", "us-west-2") - def setup_infrastructure(self): try: logger.info("Starting EKS infrastructure setup...") @@ -33,22 +33,22 @@ def setup_infrastructure(self): self.cleanup_infrastructure() return False - def setup_eks_tools(self): logger.info("Setting up EKS tools...") eks_utils.eks_setup() self.install_helm() logger.info("EKS tools setup completed") - def install_helm(self): logger.info("Installing Helm...") result = run("which helm", warn=True) if result.return_code == 0: logger.info("Helm already installed") return - - run("curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3") + + run( + "curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3" + ) run("chmod 700 get_helm.sh") run("./get_helm.sh") run("rm -f get_helm.sh") @@ -58,7 +58,6 @@ def install_helm(self): raise Exception("Helm installation failed - helm not found in PATH") logger.info("Helm installed successfully") - def validate_required_tools(self): logger.info("Validating required tools...") @@ -80,24 +79,6 @@ def validate_required_tools(self): else: logger.info("All required tools are available") - - def validate_aws_credentials(self): - logger.info("Validating AWS credentials...") - try: - sts_client = boto3.client("sts") - identity = sts_client.get_caller_identity() - logger.info(f"AWS Identity validated: {identity['Arn']}") - - if not eks_utils.get_eks_role(): - os.environ["EKS_TEST_ROLE"] = identity["Arn"] - logger.info(f"Set EKS_TEST_ROLE: {identity['Arn']}") - - return True - except Exception as e: - logger.error(f"AWS credential validation failed: {e}") - return False - - def create_eks_cluster(self): logger.info("Creating EKS cluster...") @@ -106,12 +87,12 @@ def create_eks_cluster(self): run(f"eksctl create nodegroup -f test/vllm_tests/test_artifacts/large-model-nodegroup.yaml") eks_utils.eks_write_kubeconfig(self.cluster_name, self.region) + self.setup_iam_identity() result = run("kubectl get nodes") assert "Ready" in result.stdout, "EKS nodes not ready" logger.info("EKS cluster created successfully") - def validate_cluster_setup(self): logger.info("Validating cluster setup...") @@ -155,7 +136,6 @@ def validate_cluster_setup(self): logger.info("Cluster setup validation completed") - def setup_fsx_lustre(self): try: logger.info("Setting up FSx Lustre filesystem...") @@ -179,17 +159,15 @@ def setup_fsx_lustre(self): 
logger.info(f"Using cluster security group: {cluster_sg_id}") sg_id = fsx.create_security_group( - vpc_id=vpc_id, - name="fsx-lustre-sg", - description="Security group for FSx Lustre" + vpc_id=vpc_id, name="fsx-lustre-sg", description="Security group for FSx Lustre" ) fsx.add_security_group_ingress_rules( security_group_id=sg_id, ingress_rules=[ {"protocol": "tcp", "port": "988-1023", "source-group": cluster_sg_id}, - {"protocol": "tcp", "port": "988-1023", "source-group": sg_id} - ] + {"protocol": "tcp", "port": "988-1023", "source-group": sg_id}, + ], ) fs_info = fsx.create_fsx_filesystem( @@ -197,7 +175,7 @@ def setup_fsx_lustre(self): security_group_ids=[sg_id], storage_capacity=1200, deployment_type="SCRATCH_2", - tags={"Name": "vllm-model-storage"} + tags={"Name": "vllm-model-storage"}, ) fsx.setup_csi_driver() @@ -211,22 +189,23 @@ def setup_fsx_lustre(self): "": sg_id, "": fs_info["filesystem_id"], ".fsx.us-west-2.amazonaws.com": fs_info["dns_name"], - "": fs_info["mount_name"] - } + "": fs_info["mount_name"], + }, ) logger.info("FSx Lustre setup completed successfully") - + except Exception as e: logger.error(f"FSx Lustre setup failed: {e}") raise - def setup_load_balancer_controller(self): logger.info("Setting up AWS Load Balancer Controller...") run("helm repo add eks https://aws.github.io/eks-charts") run("helm repo update") - run("kubectl apply -f https://raw.githubusercontent.com/aws/eks-charts/master/stable/aws-load-balancer-controller/crds/crds.yaml") + run( + "kubectl apply -f https://raw.githubusercontent.com/aws/eks-charts/master/stable/aws-load-balancer-controller/crds/crds.yaml" + ) run( f"helm install aws-load-balancer-controller eks/aws-load-balancer-controller -n kube-system --set clusterName={self.cluster_name} --set serviceAccount.create=false --set enableServiceMutatorWebhook=false" ) @@ -266,10 +245,10 @@ def setup_load_balancer_controller(self): run( f"sed -i 's||{alb_sg}|g' test/vllm_tests/test_artifacts/vllm-deepseek-32b-lws-ingress.yaml" ) - + # verify sg were created and configured correctly logger.info("Verifying security group configurations...") - + # verify ALB sg alb_sg_result = run( f'aws ec2 describe-security-groups --group-ids {alb_sg} --query "SecurityGroups[0].IpPermissions"' @@ -277,19 +256,18 @@ def setup_load_balancer_controller(self): if "80" not in alb_sg_result.stdout: raise Exception("ALB security group not configured correctly - missing port 80 rule") logger.info("ALB security group configured correctly") - + # verify node sg rules node_sg_result = run( f'aws ec2 describe-security-groups --group-ids {node_sg} --query "SecurityGroups[0].IpPermissions"' ) if "8000" not in node_sg_result.stdout: raise Exception("Node security group not configured correctly - missing port 8000 rule") - + logger.info("Node security group configured correctly") logger.info("Load Balancer Controller setup and verification completed") - def cleanup_resources(self): logger.info("Running cleanup script...") try: @@ -300,6 +278,22 @@ def cleanup_resources(self): except Exception as e: logger.error(f"Cleanup failed: {e}") + def setup_iam_identity(self): + logger.info("Setting up IAM identity mapping...") + + try: + sts_client = boto3.client("sts") + identity = sts_client.get_caller_identity() + codebuild_role_arn = identity["Arn"] + + os.environ["EKS_TEST_ROLE"] = codebuild_role_arn + os.environ["AWS_REGION"] = self.region + + run(f"bash eks_infrastructure/add_iam_identity.sh {self.cluster_name}") + logger.info("IAM identity mapping completed successfully") + except 
@@ -300,6 +278,22 @@
         except Exception as e:
             logger.error(f"Cleanup failed: {e}")
 
+    def setup_iam_identity(self):
+        logger.info("Setting up IAM identity mapping...")
+
+        try:
+            sts_client = boto3.client("sts")
+            identity = sts_client.get_caller_identity()
+            codebuild_role_arn = identity["Arn"]
+
+            os.environ["EKS_TEST_ROLE"] = codebuild_role_arn
+            os.environ["AWS_REGION"] = self.region
+
+            run(f"bash eks_infrastructure/add_iam_identity.sh {self.cluster_name}")
+            logger.info("IAM identity mapping completed successfully")
+        except Exception as e:
+            logger.error(f"Failed to setup IAM identity mapping: {e}")
+            raise
 
     def cleanup_infrastructure(self):
         try:
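
The eks_infrastructure/add_iam_identity.sh script invoked by setup_iam_identity above is not included in this patch, so its exact contents are an assumption here. Conceptually, mapping the caller's IAM role into the cluster's aws-auth configuration is what eksctl's iamidentitymapping command does; a hypothetical sketch (the ARN, username, and group below are placeholders, not values taken from the script):

    from invoke import run

    cluster_name = "vllm-cluster"
    # Placeholder ARN; in the test the caller identity comes from sts.get_caller_identity().
    role_arn = "arn:aws:iam::111122223333:role/example-codebuild-role"
    run(
        f"eksctl create iamidentitymapping --cluster {cluster_name} "
        f"--arn {role_arn} --username ci-user --group system:masters"
    )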