diff --git a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/common/helpers.py b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/common/helpers.py
index 83c6d24e..21020b51 100644
--- a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/common/helpers.py
+++ b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/common/helpers.py
@@ -287,7 +287,15 @@ def apply_url_override(url: str) -> str:
     else:
         # Local executor - use localhost
         endpoint_uri = cfg.deployment.endpoints[endpoint_type]
-        endpoint_url = f"http://127.0.0.1:{cfg.deployment.port}{endpoint_uri}"
+
+        # Use HAProxy port if multiple_instances is enabled
+        if cfg.deployment.get("multiple_instances", False):
+            proxy_config = cfg.execution.get("proxy", {}).get("config", {})
+            port = proxy_config.get("haproxy_port", 5009)
+        else:
+            port = cfg.deployment.port
+
+        endpoint_url = f"http://127.0.0.1:{port}{endpoint_uri}"
 
     return endpoint_url
 
diff --git a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/configs/execution/slurm/default.yaml b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/configs/execution/slurm/default.yaml
index 4938a836..62609fcd 100644
--- a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/configs/execution/slurm/default.yaml
+++ b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/configs/execution/slurm/default.yaml
@@ -25,6 +25,11 @@ ntasks_per_node: 1
 gres: gpu:8
 walltime: 01:00:00
 subproject: nemo-evaluator-launcher
+
+# Deployment-specific SLURM configuration
+deployment:
+  n_tasks: 1  # Number of tasks for deployment srun (default: 1, for multi-instance set to num_nodes)
+
 env_vars:
   deployment: {}
   evaluation: {}
@@ -32,3 +37,11 @@ mounts:
   deployment: {}
   evaluation: {}
 mount_home: true
+
+proxy:
+  type: haproxy
+  image: haproxy:latest
+  config:
+    haproxy_port: 5009
+    health_check_path: /health
+    health_check_status: 200
diff --git a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/executor.py b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/executor.py
index 2bb5411b..7b53f0d5 100644
--- a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/executor.py
+++ b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/executor.py
@@ -30,6 +30,7 @@ from typing import Dict, List, Optional
 
 import yaml
+from jinja2 import Environment, FileSystemLoader
 from omegaconf import DictConfig, OmegaConf
 
 from nemo_evaluator_launcher.common.execdb import (
@@ -41,11 +42,8 @@
 from nemo_evaluator_launcher.common.helpers import (
     CmdAndReadableComment,
     get_api_key_name,
-    get_endpoint_url,
     get_eval_factory_command,
-    get_eval_factory_config,
     get_eval_factory_dataset_size_from_run_config,
-    get_health_url,
     get_timestamp_string,
 )
 from nemo_evaluator_launcher.common.logging_utils import logger
@@ -126,6 +124,25 @@ def execute_eval(cfg: DictConfig, dry_run: bool = False) -> str:
             invocation_id=invocation_id,
             job_id=job_id,
         )
+
+        # Create proxy config file with placeholder IPs for multi-instance deployments
+        if cfg.deployment.get("multiple_instances", False):
+            proxy_type = cfg.execution.get("proxy", {}).get("type", "haproxy")
+            if proxy_type == "haproxy":
+                proxy_config = _generate_haproxy_config_with_placeholders(cfg)
+            else:
+                raise ValueError(
+                    f"Unsupported proxy type: {proxy_type}. Currently only 'haproxy' is supported."
+                )
+
+            # Save both template and working config
+            proxy_template_path = local_task_subdir / "proxy.cfg.template"
+            proxy_config_path = local_task_subdir / "proxy.cfg"
+            with open(proxy_template_path, "w") as f:
+                f.write(proxy_config)
+            with open(proxy_config_path, "w") as f:
+                f.write(proxy_config)
+
         sbatch_script_content_str = sbatch_script_content_struct.cmd
 
         # We accumulate if any task contains unsafe commands
@@ -491,15 +508,6 @@ def _create_slurm_sbatch_script(
     tasks_mapping = load_tasks_mapping()
     task_definition = get_task_from_mapping(task.name, tasks_mapping)
 
-    # Create merged config for get_endpoint_url
-    merged_nemo_evaluator_config = get_eval_factory_config(cfg, task)
-    health_url = get_health_url(
-        cfg,
-        get_endpoint_url(
-            cfg, merged_nemo_evaluator_config, task_definition["endpoint_type"]
-        ),
-    )
-
     # TODO(public release): convert to template
     s = "#!/bin/bash\n"
 
@@ -596,36 +604,30 @@ def _create_slurm_sbatch_script(
             deployment_mounts_list.append(f"{source_mnt}:{target_mnt}")
 
     # add deployment srun command
-    s += "# deployment server\n"
-    s += "srun --mpi pmix --overlap "
-    s += "--container-image {} ".format(cfg.deployment.image)
-    if deployment_mounts_list:
-        s += "--container-mounts {} ".format(",".join(deployment_mounts_list))
-    if not cfg.execution.get("mounts", {}).get("mount_home", True):
-        s += "--no-container-mount-home "
-    s += "--output {} ".format(remote_task_subdir / "logs" / "server-%A.out")
-    deployment_env_var_names = list(
-        cfg.execution.get("env_vars", {}).get("deployment", {})
-    )
-    if cfg.deployment.get("env_vars"):
-        warnings.warn(
-            "cfg.deployment.env_vars will be deprecated in future versions. "
-            "Use cfg.execution.env_vars.deployment instead.",
-            category=DeprecationWarning,
-            stacklevel=2,
-        )
-        deployment_env_var_names.extend(list(cfg.deployment["env_vars"]))
-    if deployment_env_var_names:
-        s += f"--container-env {','.join(deployment_env_var_names)} "
-    s += "{} &\n\n".format(cfg.deployment.command)  # run asynchronously
-    s += (
-        "SERVER_PID=$! # capture the PID of the server background srun process\n\n"
+    s += _generate_deployment_srun_command(
+        cfg, deployment_mounts_list, remote_task_subdir
     )
 
     # wait for the server to initialize
-    s += _WAIT_FOR_SERVER_HANDLER.format(health_url=health_url)
+    health_path = cfg.deployment.get("health_check_path", "/health")
+    # For multi-instance check all node IPs, for single instance check localhost
+    if cfg.deployment.get("multiple_instances", False):
+        ip_list = '"${NODES_IPS_ARRAY[@]}"'
+    else:
+        ip_list = '"127.0.0.1"'
+    s += _get_wait_for_server_handler(
+        ip_list,
+        cfg.deployment.port,
+        health_path,
+        "server",
+        check_pid=True,
+    )
     s += "\n\n"
 
+    # add proxy load balancer for multi-instance deployments
+    if cfg.deployment.get("multiple_instances", False):
+        s += _get_proxy_server_srun_command(cfg, remote_task_subdir)
+
     # prepare evaluation mounts
     evaluation_mounts_list = [
         "{}:/results".format(remote_task_subdir / "artifacts"),
@@ -655,6 +657,7 @@ def _create_slurm_sbatch_script(
 
     s += "# evaluation client\n"
     s += "srun --mpi pmix --overlap "
+    s += "--nodes 1 --ntasks 1 "  # Client always runs on single node
     s += "--container-image {} ".format(eval_image)
     evaluation_env_var_names = list(
         cfg.execution.get("env_vars", {}).get("evaluation", {})
@@ -672,7 +675,10 @@ def _create_slurm_sbatch_script(
 
     # terminate the server after all evaluation clients finish
     if cfg.deployment.type != "none":
-        s += "kill $SERVER_PID # terminate the server to finish gracefully\n\n"
+        s += "kill $SERVER_PID # terminate the server to finish gracefully\n"
+        if cfg.deployment.get("multiple_instances", False):
+            s += "kill $PROXY_PID # terminate proxy to finish gracefully\n"
+        s += "\n"
 
     # auto-export
     ae_cfg = cfg.execution.get("auto_export")
@@ -1139,9 +1145,192 @@ def _get_progress(
 """.strip()
 
 
-_WAIT_FOR_SERVER_HANDLER = """
-date
-# wait for the server to initialize
-bash -c 'while [[ "$(curl -s -o /dev/null -w "%{{http_code}}" {health_url})" != "200" ]]; do kill -0 '"$SERVER_PID"' 2>/dev/null || {{ echo "Server process '"$SERVER_PID"' died"; exit 1; }}; sleep 5; done'
+def _generate_haproxy_config_with_placeholders(cfg):
+    """Generate HAProxy configuration with placeholder IPs using Jinja template."""
+    # Set up Jinja environment
+    template_dir = Path(__file__).parent
+    template_path = template_dir / "proxy.cfg.template"
+
+    if not template_path.exists():
+        raise FileNotFoundError(f"Proxy template not found: {template_path}")
+
+    env = Environment(loader=FileSystemLoader(template_dir))
+    template = env.get_template("proxy.cfg.template")
+
+    # Prepare template data with placeholder IPs - use actual number of nodes
+    num_nodes = cfg.execution.num_nodes
+    nodes = []
+    for i in range(num_nodes):
+        nodes.append({"ip": f"{{IP_{i}}}", "port": cfg.deployment.port})
+
+    # Get health check parameters from execution config
+    proxy_config = cfg.execution.get("proxy", {}).get("config", {})
+    health_check_path = proxy_config.get("health_check_path", "/health")
+    health_check_status = proxy_config.get("health_check_status", 200)
+    haproxy_port = proxy_config.get("haproxy_port", 5009)
+
+    # Render template
+    config = template.render(
+        haproxy_port=haproxy_port,
+        health_check_path=health_check_path,
+        health_check_status=health_check_status,
+        nodes=nodes,
+    )
+
+    return config
+
+
+def _generate_haproxy_config(cfg, nodes_ips):
+    """Generate HAProxy configuration using Jinja template."""
+    # Set up Jinja environment
+    template_dir = Path(__file__).parent
+    template_path = template_dir / "proxy.cfg.template"
+
+    if not template_path.exists():
+        raise FileNotFoundError(f"Proxy template not found: {template_path}")
+
+    env = Environment(loader=FileSystemLoader(template_dir))
+    template = env.get_template("proxy.cfg.template")
+
+    # Prepare template data
+    nodes = []
+    for ip in nodes_ips:
+        # All nodes use the same port
+        nodes.append({"ip": ip, "port": cfg.deployment.port})
+
+    # Get health check parameters from deployment config
+    health_check_path = cfg.deployment.get("health_check_path", "/health")
+    health_check_status = cfg.deployment.get("health_check_status", 200)
+    haproxy_port = cfg.deployment.get("haproxy_port", 5009)
+
+    # Render template
+    config = template.render(
+        haproxy_port=haproxy_port,
+        health_check_path=health_check_path,
+        health_check_status=health_check_status,
+        nodes=nodes,
+    )
+
+    return config
+
+
+def _generate_deployment_srun_command(
+    cfg, deployment_mounts_list, remote_task_subdir, instance_id: int = 0
+):
+    """Generate the deployment srun command with proper node/ntask configuration."""
+    s = ""
+    s += "# deployment server\n"
+    s += "# Get node IPs\n"
+    s += "nodes=( $(scontrol show hostnames $SLURM_JOB_NODELIST) )\n"
+    s += 'nodes_array=("${nodes[@]}") # Ensure nodes are stored properly\n'
+    s += 'export NODES_IPS_ARRAY=($(for node in "${nodes_array[@]}"; do srun --nodelist=$node --ntasks=1 --nodes=1 hostname --ip-address; done))\n'
+    s += 'echo "Node IPs: ${NODES_IPS_ARRAY[@]}"\n'
+    s += "# Export MASTER_IP as the first node IP\n"
+    s += "export MASTER_IP=${NODES_IPS_ARRAY[0]}\n"
+    s += 'echo "MASTER_IP: $MASTER_IP"\n'
+    s += "srun --mpi pmix --overlap "
+    s += f"--nodes {cfg.execution.num_nodes} --ntasks {cfg.execution.get('deployment', {}).get('n_tasks', 1)} "
+    s += "--container-image {} ".format(cfg.deployment.image)
+    if deployment_mounts_list:
+        s += "--container-mounts {} ".format(",".join(deployment_mounts_list))
+    if not cfg.execution.get("mounts", {}).get("mount_home", True):
+        s += "--no-container-mount-home "
+    s += "--output {} ".format(remote_task_subdir / "logs" / "server-%A-%t.out")
+
+    deployment_env_var_names = list(
+        cfg.execution.get("env_vars", {}).get("deployment", {})
+    )
+    if cfg.deployment.get("env_vars"):
+        warnings.warn(
+            "cfg.deployment.env_vars will be deprecated in future versions. "
+            "Use cfg.execution.env_vars.deployment instead.",
+            category=DeprecationWarning,
+            stacklevel=2,
+        )
+        deployment_env_var_names.extend(list(cfg.deployment["env_vars"]))
+
+    # Always add MASTER_IP to the environment variables
+    if "MASTER_IP" not in deployment_env_var_names:
+        deployment_env_var_names.append("MASTER_IP")
+
+    if deployment_env_var_names:
+        s += f"--container-env {','.join(deployment_env_var_names)} "
+    s += "{} &\n\n".format(cfg.deployment.command)  # run asynchronously
+    s += "SERVER_PID=$! # capture the PID of the server background srun process\n\n"
+
+    return s
+
+
+def _get_wait_for_server_handler(
+    ip_list: str,
+    port: int,
+    health_check_path: str,
+    service_name: str = "server",
+    check_pid: bool = False,
+):
+    """Generate wait for server handler that takes a list of IPs."""
+    pid_check = ""
+    if check_pid:
+        pid_check = 'kill -0 "$SERVER_PID" 2>/dev/null || { echo "Server process $SERVER_PID died"; exit 1; }'
+
+    handler = f"""date
+# wait for the {service_name} to initialize
+for ip in {ip_list}; do
+    echo "Waiting for {service_name} on $ip..."
+    while [[ "$(curl -s -o /dev/null -w "%{{http_code}}" http://$ip:{port}{health_check_path})" != "200" ]]; do
+        {pid_check}
+        sleep 5
+    done
+    echo "{service_name} ready on $ip!"
+done
 
 date
 """.strip()
+
+    return handler
+
+
+def _get_proxy_server_srun_command(cfg, remote_task_subdir):
+    """Generate proxy server srun command based on proxy type."""
+    proxy_type = cfg.execution.get("proxy", {}).get("type", "haproxy")
+
+    if proxy_type == "haproxy":
+        return _generate_haproxy_srun_command(cfg, remote_task_subdir)
+    else:
+        raise ValueError(
+            f"Unsupported proxy type: {proxy_type}. Currently only 'haproxy' is supported."
+        )
+
+
+def _generate_haproxy_srun_command(cfg, remote_task_subdir):
+    """Generate HAProxy-specific srun command using template-based config."""
+    s = ""
+    s += "# Proxy load balancer\n"
+    s += "# Copy template to config file (important for restarts)\n"
+    s += f"cp {remote_task_subdir}/proxy.cfg.template {remote_task_subdir}/proxy.cfg\n"
+    s += "# Replace placeholder IPs with actual node IPs\n"
+    s += f"proxy_config_file={remote_task_subdir}/proxy.cfg\n"
+    s += 'for i in "${!NODES_IPS_ARRAY[@]}"; do\n'
+    s += '    ip="${NODES_IPS_ARRAY[$i]}"\n'
+    s += '    sed -i "s/{IP_$i}/$ip/g" "$proxy_config_file"\n'
+    s += "done\n"
+    s += "\n"
+    s += "srun --mpi pmix --overlap "
+    s += "--nodes 1 --ntasks 1 "
+    s += f"--container-image {cfg.execution.get('proxy', {}).get('image', 'haproxy:latest')} "
+    s += f"--container-mounts {remote_task_subdir}/proxy.cfg:/usr/local/etc/haproxy/haproxy.cfg:ro "
+    s += f"--output {remote_task_subdir}/logs/proxy-%A.out "
+    s += "haproxy -f /usr/local/etc/haproxy/haproxy.cfg &\n"
+    s += "PROXY_PID=$! # capture the PID of the proxy background srun process\n"
+    s += 'echo "Proxy started with PID: $PROXY_PID"\n\n'
+
+    # Wait for proxy to be ready on localhost
+    proxy_config = cfg.execution.get("proxy", {}).get("config", {})
+    haproxy_port = proxy_config.get("haproxy_port", 5009)
+    health_path = proxy_config.get("health_check_path", "/health")
+    s += _get_wait_for_server_handler(
+        "127.0.0.1", haproxy_port, health_path, "Proxy", check_pid=False
+    )
+    s += "\n"
+
+    return s
diff --git a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/proxy.cfg.template b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/proxy.cfg.template
new file mode 100644
index 00000000..e32f4fc9
--- /dev/null
+++ b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/proxy.cfg.template
@@ -0,0 +1,26 @@
+global
+    log stdout format raw local0
+    maxconn 4096
+
+defaults
+    log global
+    mode http
+    option httplog
+    timeout connect 10s
+    timeout client 100000s
+    timeout server 100000s
+
+frontend service_frontend
+    bind *:{{ haproxy_port }}
+    default_backend service_backend
+
+backend service_backend
+    mode http
+    option httpchk GET {{ health_check_path }}
+    http-check expect status {{ health_check_status }}
+    option http-server-close
+    balance leastconn
+{% for node in nodes %}
+    server node{{ loop.index }} {{ node.ip }}:{{ node.port }} check
+{% endfor %}
+
diff --git a/packages/nemo-evaluator-launcher/tests/unit_tests/test_slurm_executor.py b/packages/nemo-evaluator-launcher/tests/unit_tests/test_slurm_executor.py
index b46748ae..3fa18c1e 100644
--- a/packages/nemo-evaluator-launcher/tests/unit_tests/test_slurm_executor.py
+++ b/packages/nemo-evaluator-launcher/tests/unit_tests/test_slurm_executor.py
@@ -45,6 +45,10 @@ def base_config(self):
                 "image": "test-image:latest",
                 "command": "test-command",
"served_model_name": "test-model", + "port": 8000, + "endpoints": { + "health": "/health", + }, }, "execution": { "type": "slurm", @@ -83,12 +87,6 @@ def mock_dependencies(self): patch( "nemo_evaluator_launcher.executors.slurm.executor.get_task_from_mapping" ) as mock_get_task, - patch( - "nemo_evaluator_launcher.executors.slurm.executor.get_health_url" - ) as mock_get_health, - patch( - "nemo_evaluator_launcher.executors.slurm.executor.get_endpoint_url" - ) as mock_get_endpoint, patch( "nemo_evaluator_launcher.common.helpers.get_eval_factory_command" ) as mock_get_eval_command, @@ -103,8 +101,6 @@ def mock_dependencies(self): "endpoint_type": "openai", "task": "test_task", } - mock_get_health.return_value = "http://localhost:8000/health" - mock_get_endpoint.return_value = "http://localhost:8000/v1" from nemo_evaluator_launcher.common.helpers import CmdAndReadableComment mock_get_eval_command.return_value = CmdAndReadableComment( @@ -115,8 +111,6 @@ def mock_dependencies(self): yield { "load_tasks_mapping": mock_load_tasks, "get_task_from_mapping": mock_get_task, - "get_health_url": mock_get_health, - "get_endpoint_url": mock_get_endpoint, "get_eval_factory_command": mock_get_eval_command, "get_served_model_name": mock_get_model_name, } @@ -500,6 +494,223 @@ def test_complex_configuration_integration( # mount_home=False should add --no-container-mount-home assert "--no-container-mount-home" in script + @pytest.mark.parametrize( + "num_nodes,n_tasks,expected_ntasks,should_have_proxy", + [ + (1, 1, 1, False), # Single instance, no proxy + (4, 4, 4, True), # Multi-instance with matching n_tasks, needs proxy + (2, 1, 1, False), # Multiple nodes but single task, no proxy + (3, 3, 3, True), # Multi-instance with 3 nodes, needs proxy + ], + ) + def test_deployment_n_tasks_and_proxy_setup( + self, + base_config, + mock_task, + mock_dependencies, + num_nodes, + n_tasks, + expected_ntasks, + should_have_proxy, + ): + """Test deployment.n_tasks with various configurations and proxy setup.""" + base_config["execution"]["deployment"] = {"n_tasks": n_tasks} + base_config["execution"]["num_nodes"] = num_nodes + # Set multiple_instances to trigger proxy setup when needed + base_config["deployment"]["multiple_instances"] = should_have_proxy + + cfg = OmegaConf.create(base_config) + + script = _create_slurm_sbatch_script( + cfg=cfg, + task=mock_task, + eval_image="test-eval-container:latest", + remote_task_subdir=Path("/test/remote"), + invocation_id="test123", + job_id="test123.0", + ).cmd + + # Check that deployment srun uses correct --ntasks value + assert f"--nodes {num_nodes} --ntasks {expected_ntasks}" in script + + # Check proxy setup based on multi-instance or not + if should_have_proxy: + assert "proxy" in script.lower() + else: + assert "proxy" not in script.lower() + + def test_deployment_n_tasks_default_value( + self, base_config, mock_task, mock_dependencies + ): + """Test deployment.n_tasks defaults to 1 when not specified.""" + # Don't set deployment.n_tasks - should default to 1 + base_config["execution"]["num_nodes"] = 2 + + cfg = OmegaConf.create(base_config) + + script = _create_slurm_sbatch_script( + cfg=cfg, + task=mock_task, + eval_image="test-eval-container:latest", + remote_task_subdir=Path("/test/remote"), + invocation_id="test123", + job_id="test123.0", + ).cmd + + # Check that deployment srun defaults to --ntasks 1 + assert "--nodes 2 --ntasks 1" in script + + # Check that no proxy is set up (since n_tasks=1, even though num_nodes=2) + assert "proxy" not in script.lower() + + 
+class TestSlurmExecutorHelperFunctions:
+    """Test individual helper functions used by SLURM executor."""
+
+    @pytest.mark.parametrize(
+        "num_nodes,n_tasks,has_mounts,mount_home,expected_nodes,expected_ntasks,expected_mount_home_flag",
+        [
+            (1, 1, False, True, 1, 1, False),  # Single node, no mounts, mount home
+            (4, 4, False, True, 4, 4, False),  # Multi-node, no mounts, mount home
+            (2, 1, True, True, 2, 1, False),  # Multi-node single task with mounts
+            (1, 1, False, False, 1, 1, True),  # Single node, no mount home
+            (3, 3, True, False, 3, 3, True),  # Multi-node with mounts, no mount home
+        ],
+    )
+    def test_generate_deployment_srun_command(
+        self,
+        num_nodes,
+        n_tasks,
+        has_mounts,
+        mount_home,
+        expected_nodes,
+        expected_ntasks,
+        expected_mount_home_flag,
+    ):
+        """Test _generate_deployment_srun_command with various configurations."""
+        from nemo_evaluator_launcher.executors.slurm.executor import (
+            _generate_deployment_srun_command,
+        )
+
+        # Create config
+        config = {
+            "deployment": {
+                "type": "vllm",
+                "image": "test-image:latest",
+                "command": "python -m vllm.entrypoints.openai.api_server --model /model",
+            },
+            "execution": {
+                "num_nodes": num_nodes,
+                "deployment": {"n_tasks": n_tasks},
+                "mounts": {"mount_home": mount_home},
+            },
+        }
+        cfg = OmegaConf.create(config)
+
+        # Create mounts list
+        mounts_list = ["/host/path:/container/path"] if has_mounts else []
+
+        # Generate command
+        command = _generate_deployment_srun_command(
+            cfg=cfg,
+            deployment_mounts_list=mounts_list,
+            remote_task_subdir=Path("/test/remote"),
+        )
+
+        # Verify nodes and ntasks
+        assert f"--nodes {expected_nodes} --ntasks {expected_ntasks}" in command
+
+        # Verify image
+        assert "test-image:latest" in command
+
+        # Verify mounts
+        if has_mounts:
+            assert "/host/path:/container/path" in command
+
+        # Verify mount_home flag
+        if expected_mount_home_flag:
+            assert "--no-container-mount-home" in command
+        else:
+            assert "--no-container-mount-home" not in command
+
+        # Verify node IP collection
+        assert "NODES_IPS_ARRAY" in command
+        assert "MASTER_IP" in command
+
+    @pytest.mark.parametrize(
+        "ip_list,port,health_path,service_name,check_pid,expected_in_output",
+        [
+            (
+                '"127.0.0.1"',
+                8000,
+                "/health",
+                "server",
+                True,
+                ["127.0.0.1", "8000", "/health", "server", "SERVER_PID"],
+            ),
+            (
+                '"${NODES_IPS_ARRAY[@]}"',
+                5009,
+                "/status",
+                "Proxy",
+                False,
+                ["NODES_IPS_ARRAY", "5009", "/status", "Proxy"],
+            ),
+            (
+                '"10.0.0.1"',
+                8080,
+                "/ready",
+                "service",
+                True,
+                ["10.0.0.1", "8080", "/ready", "service", "SERVER_PID"],
+            ),
+            (
+                '"${NODES_IPS_ARRAY[@]}"',
+                8000,
+                "/health",
+                "server",
+                True,
+                ["NODES_IPS_ARRAY", "8000", "/health", "SERVER_PID"],
+            ),
+        ],
+    )
+    def test_get_wait_for_server_handler(
+        self, ip_list, port, health_path, service_name, check_pid, expected_in_output
+    ):
+        """Test _get_wait_for_server_handler with various configurations."""
+        from nemo_evaluator_launcher.executors.slurm.executor import (
+            _get_wait_for_server_handler,
+        )
+
+        # Generate handler
+        handler = _get_wait_for_server_handler(
+            ip_list=ip_list,
+            port=port,
+            health_check_path=health_path,
+            service_name=service_name,
+            check_pid=check_pid,
+        )
+
+        # Verify all expected strings are in output
+        for expected in expected_in_output:
+            assert expected in handler
+
+        # Verify PID check logic
+        if check_pid:
+            assert "SERVER_PID" in handler
+            assert "kill -0" in handler
+        else:
+            assert "kill -0" not in handler
+
+        # Verify curl command structure
+        assert "curl -s -o /dev/null" in handler
assert f"http://$ip:{port}{health_path}" in handler + + # Verify loop structure + assert "for ip in" in handler + assert "while" in handler + assert "done" in handler + class TestSlurmExecutorDryRun: """Test SlurmExecutor dry run functionality.""" @@ -594,12 +805,6 @@ def test_execute_eval_dry_run_basic( patch( "nemo_evaluator_launcher.executors.slurm.executor.get_eval_factory_command" ) as mock_get_command, - patch( - "nemo_evaluator_launcher.executors.slurm.executor.get_health_url" - ) as mock_get_health, - patch( - "nemo_evaluator_launcher.executors.slurm.executor.get_endpoint_url" - ) as mock_get_endpoint, patch("builtins.print") as mock_print, ): # Configure mocks @@ -619,8 +824,6 @@ def mock_get_task_side_effect(task_name, mapping): cmd="nemo-evaluator-launcher --model llama-3.1-8b-instruct --task {task_name}", debug="# Test command for dry run", ) - mock_get_health.return_value = "http://localhost:8000/health" - mock_get_endpoint.return_value = "http://localhost:8000/v1" # Execute dry run invocation_id = SlurmExecutor.execute_eval(sample_config, dry_run=True) @@ -738,12 +941,6 @@ def test_execute_eval_dry_run_custom_container( patch( "nemo_evaluator_launcher.executors.slurm.executor.get_eval_factory_command" ) as mock_get_command, - patch( - "nemo_evaluator_launcher.executors.slurm.executor.get_health_url" - ) as mock_get_health, - patch( - "nemo_evaluator_launcher.executors.slurm.executor.get_endpoint_url" - ) as mock_get_endpoint, patch("builtins.print"), ): mock_load_mapping.return_value = mock_tasks_mapping @@ -761,8 +958,6 @@ def mock_get_task_side_effect(task_name, mapping): cmd="nemo-evaluator-launcher --task test_command", debug="# Test command for custom container", ) - mock_get_health.return_value = "http://localhost:8000/health" - mock_get_endpoint.return_value = "http://localhost:8000/v1" # Execute dry run invocation_id = SlurmExecutor.execute_eval(sample_config, dry_run=True) @@ -800,12 +995,6 @@ def test_execute_eval_dry_run_no_auto_export( patch( "nemo_evaluator_launcher.executors.slurm.executor.get_eval_factory_command" ) as mock_get_command, - patch( - "nemo_evaluator_launcher.executors.slurm.executor.get_health_url" - ) as mock_get_health, - patch( - "nemo_evaluator_launcher.executors.slurm.executor.get_endpoint_url" - ) as mock_get_endpoint, patch("builtins.print"), ): mock_load_mapping.return_value = mock_tasks_mapping @@ -823,8 +1012,6 @@ def mock_get_task_side_effect(task_name, mapping): cmd="nemo-evaluator-launcher --task test_command", debug="# Test command for no auto-export", ) - mock_get_health.return_value = "http://localhost:8000/health" - mock_get_endpoint.return_value = "http://localhost:8000/v1" # Should execute successfully without auto-export invocation_id = SlurmExecutor.execute_eval(sample_config, dry_run=True) @@ -1296,12 +1483,6 @@ def mock_subprocess_run(*args, **kwargs): patch( "nemo_evaluator_launcher.executors.slurm.executor.get_eval_factory_command" ) as mock_get_command, - patch( - "nemo_evaluator_launcher.executors.slurm.executor.get_health_url" - ) as mock_get_health, - patch( - "nemo_evaluator_launcher.executors.slurm.executor.get_endpoint_url" - ) as mock_get_endpoint, patch("subprocess.run", side_effect=mock_subprocess_run), ): # Configure mocks @@ -1320,8 +1501,6 @@ def mock_get_task_side_effect(task_name, mapping): cmd="nemo-evaluator-launcher --task mmlu_pro", debug="# Test command for mmlu_pro", ) - mock_get_health.return_value = "http://127.0.0.1:8000/health" - mock_get_endpoint.return_value = "http://127.0.0.1:8000/v1" # Execute 
         # Execute non-dry-run
         invocation_id = SlurmExecutor.execute_eval(sample_config, dry_run=False)
@@ -1392,12 +1571,6 @@ def mock_subprocess_run(*args, **kwargs):
             patch(
                 "nemo_evaluator_launcher.executors.slurm.executor.get_eval_factory_command"
             ) as mock_get_command,
-            patch(
-                "nemo_evaluator_launcher.executors.slurm.executor.get_health_url"
-            ) as mock_get_health,
-            patch(
-                "nemo_evaluator_launcher.executors.slurm.executor.get_endpoint_url"
-            ) as mock_get_endpoint,
             patch("subprocess.run", side_effect=mock_subprocess_run),
         ):
             # Configure mocks
@@ -1416,8 +1589,6 @@ def mock_get_task_side_effect(task_name, mapping):
             cmd="nemo-evaluator-launcher --task mmlu_pro",
             debug="# Test command for mmlu_pro SSH failure",
         )
-        mock_get_health.return_value = "http://127.0.0.1:8000/health"
-        mock_get_endpoint.return_value = "http://127.0.0.1:8000/v1"
 
         # Should still succeed (SSH connection can be None)
         invocation_id = SlurmExecutor.execute_eval(sample_config, dry_run=False)
@@ -1485,12 +1656,6 @@ def mock_subprocess_run(*args, **kwargs):
             patch(
                 "nemo_evaluator_launcher.executors.slurm.executor.get_eval_factory_command"
             ) as mock_get_command,
-            patch(
-                "nemo_evaluator_launcher.executors.slurm.executor.get_health_url"
-            ) as mock_get_health,
-            patch(
-                "nemo_evaluator_launcher.executors.slurm.executor.get_endpoint_url"
-            ) as mock_get_endpoint,
             patch("subprocess.run", side_effect=mock_subprocess_run),
         ):
             # Configure mocks
@@ -1509,8 +1674,6 @@ def mock_get_task_side_effect(task_name, mapping):
             cmd="nemo-evaluator-launcher --task mmlu_pro",
             debug="# Test command for mmlu_pro sbatch failure",
         )
-        mock_get_health.return_value = "http://127.0.0.1:8000/health"
-        mock_get_endpoint.return_value = "http://127.0.0.1:8000/v1"
 
         # Should raise RuntimeError for sbatch failure
         with pytest.raises(