From 501e8bb85660110bfd509b5534f136469e805ef1 Mon Sep 17 00:00:00 2001 From: Hannes Zietsman Date: Mon, 8 Jun 2026 14:36:46 +0200 Subject: [PATCH 1/8] CON-1516 remove vCPU self-test preflight gate --- tests/cli/test_machines_commands.py | 22 +++++++++++++++++++++ tests/cli/test_self_test_support_bundle.py | 2 +- vast.py | 16 +++------------ vastai/cli/self_test/machine_diagnostics.py | 13 ------------ 4 files changed, 26 insertions(+), 27 deletions(-) diff --git a/tests/cli/test_machines_commands.py b/tests/cli/test_machines_commands.py index bb8b83a6..82f71840 100644 --- a/tests/cli/test_machines_commands.py +++ b/tests/cli/test_machines_commands.py @@ -581,6 +581,28 @@ def test_preflight_direct_port_minimum_scales_by_gpu_count(self): assert direct_ports["operator"] == ">=" assert "3 directly mapped ports per listed GPU" in direct_ports["purpose"] + def test_preflight_does_not_gate_on_virtual_cpu_count(self): + from vastai.cli.self_test.machine_diagnostics import ( + failed_checks, + preflight_requirement_checks, + ) + + offer = _self_test_offer( + num_gpus=8, + gpu_ram=24 * 1024, + gpu_total_ram=8 * 24 * 1024, + cpu_ram=256 * 1024, + cpu_cores=1, + direct_port_count=24, + inet_down=600, + inet_up=600, + ) + + checks = preflight_requirement_checks(offer) + + assert "cpu.cores" not in {check["id"] for check in checks} + assert failed_checks(checks) == [] + def test_preflight_direct_port_overage_renders_advisory( self, parse_argv, patch_get_client, monkeypatch, capsys ): diff --git a/tests/cli/test_self_test_support_bundle.py b/tests/cli/test_self_test_support_bundle.py index 4295acd5..6d2830ba 100644 --- a/tests/cli/test_self_test_support_bundle.py +++ b/tests/cli/test_self_test_support_bundle.py @@ -144,7 +144,7 @@ def test_self_test_bundle_creation_error_preserves_original_failure( captured = capsys.readouterr() assert exc_info.value.code == 1 assert "WARNING: failed to create self-test diagnostic bundle: disk full" in captured.out - assert "Test failed: 8 preflight requirement check(s) failed." in captured.out + assert "Test failed: 7 preflight requirement check(s) failed." in captured.out def test_self_test_runtime_failure_bundle_includes_instance_logs( diff --git a/vast.py b/vast.py index 44c4a64e..0612c1d1 100644 --- a/vast.py +++ b/vast.py @@ -9459,8 +9459,9 @@ def check_requirements(machine_id, api_key, args): Validates whether a machine meets the specified hardware and performance requirements. This function queries the machine's offers and checks various criteria such as CUDA - version, reliability, port count, PCIe bandwidth, internet speeds, GPU RAM, system - RAM, and CPU cores relative to the number of GPUs. If any of these requirements are + version, reliability, port count, PCIe bandwidth, internet speeds, GPU RAM, and + system RAM. Physical CPU core capacity is validated by the self-test image at + runtime. If any of these requirements are not met, it records the reasons for the failure. Args: @@ -9562,17 +9563,6 @@ def check_requirements(machine_id, api_key, args): debug_print(args, f"CPU RAM: {cpu_ram} MB") debug_print(args, f"Total GPU RAM: {gpu_total_ram} MB") - # 9. CPU Cores vs. Number of GPUs - cpu_cores = int(safe_float(top_offer.get('cpu_cores'))) - num_gpus = int(safe_float(top_offer.get('num_gpus'))) - if cpu_cores < 2 * num_gpus: - unmet_reasons.append("Number of CPU cores is less than twice the number of GPUs.") - - # Debugging Information for CPU Cores - if args.debugging: - debug_print(args, f"CPU Cores: {cpu_cores}") - debug_print(args, f"Number of GPUs: {num_gpus}") - # Return True if all requirements are met, False otherwise if unmet_reasons: progress_print(args, f"Machine ID {machine_id} does not meet the requirements:") diff --git a/vastai/cli/self_test/machine_diagnostics.py b/vastai/cli/self_test/machine_diagnostics.py index 1ed455bf..b3127ee0 100644 --- a/vastai/cli/self_test/machine_diagnostics.py +++ b/vastai/cli/self_test/machine_diagnostics.py @@ -153,7 +153,6 @@ def preflight_requirement_checks(offer): per_gpu_ram_gib = per_gpu_vram_gib(offer) required_mbps = required_inet_mbps(gpu_total_ram) cpu_ram = safe_float(offer.get("cpu_ram")) - cpu_cores = int(safe_float(offer.get("cpu_cores"))) num_gpus = int(safe_float(offer.get("num_gpus"))) listed_gpus = num_gpus if num_gpus > 0 else 1 direct_port_count = safe_float(offer.get("direct_port_count")) @@ -161,7 +160,6 @@ def preflight_requirement_checks(offer): recommended_max_ports = 64 * listed_gpus uncapped_required_cpu_ram = 0.95 * gpu_total_ram required_cpu_ram = min(uncapped_required_cpu_ram, SYSTEM_RAM_REQUIREMENT_CAP_MIB) - required_cpu_cores = 2 * num_gpus checks = [ _check( @@ -265,17 +263,6 @@ def preflight_requirement_checks(offer): "total VRAM, up to the 2 TB cap." ), ), - _check( - "cpu.cores", - "CPU cores", - cpu_cores, - required_cpu_cores, - ">=", - "cores", - cpu_cores >= required_cpu_cores, - "The tester expects at least two CPU cores per GPU for stable orchestration.", - "Expose more CPU cores to the host or reduce the GPU count for this offer.", - ), ] if direct_port_count > recommended_max_ports: checks.append( From 99b1b0db00631b2bdae61b8879bcd6a375d5e261 Mon Sep 17 00:00:00 2001 From: Hannes Zietsman Date: Thu, 18 Jun 2026 13:46:57 +0200 Subject: [PATCH 2/8] CON-1516 clean self-test remediation wording --- vastai/cli/self_test/machine_diagnostics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vastai/cli/self_test/machine_diagnostics.py b/vastai/cli/self_test/machine_diagnostics.py index b3127ee0..26eb5c3b 100644 --- a/vastai/cli/self_test/machine_diagnostics.py +++ b/vastai/cli/self_test/machine_diagnostics.py @@ -524,7 +524,7 @@ def requirement_failure(checks): "code": "preflight_requirements_failed", "summary": summary, "failed_check_ids": [check["id"] for check in failures], - "remediation": "Resolve the failed checks below, or rerun with --ignore-requirements to dogfood anyway.", + "remediation": "Resolve the failed checks below, or rerun with --ignore-requirements to run the diagnostic anyway.", } From ce49ea0bbd0f155e14f3e78086d88ed61f84bf13 Mon Sep 17 00:00:00 2001 From: Hannes Zietsman Date: Fri, 19 Jun 2026 10:15:41 +0200 Subject: [PATCH 3/8] CON-1516 add UDP self-test probe --- tests/cli/test_machines_commands.py | 145 +++++++++++++++++-- vast.py | 68 ++++++++- vastai/cli/commands/machines.py | 153 +++++++++++++++++++- vastai/cli/self_test/runtime_diagnostics.py | 60 ++++++++ 4 files changed, 405 insertions(+), 21 deletions(-) diff --git a/tests/cli/test_machines_commands.py b/tests/cli/test_machines_commands.py index 82f71840..f4aea519 100644 --- a/tests/cli/test_machines_commands.py +++ b/tests/cli/test_machines_commands.py @@ -8,6 +8,29 @@ import requests +@pytest.fixture(autouse=True) +def _self_test_udp_probe_success(monkeypatch): + """Default self-test machine tests to a successful external UDP echo probe.""" + from vastai.cli.commands import machines + + def _probe(public_ip, host_port, *, mapped_ports=None, attempts=3, timeout_seconds=2): + return True, { + "url": f"udp://{public_ip}:{host_port}", + "public_ip": public_ip, + "container_port": "5001/udp", + "external_port": str(host_port), + "host_port": str(host_port), + "timeout_seconds": timeout_seconds, + "attempt_count": 1, + "response_received": True, + "last_error_type": None, + "last_error": None, + "mapped_ports": sorted((mapped_ports or {}).keys()), + } + + monkeypatch.setattr(machines, "probe_udp_echo", _probe) + + class TestShowMachines: def test_show_machines_raw(self, parse_argv, patch_get_client, mock_response): patch_get_client.get.return_value = mock_response(200, { @@ -107,7 +130,7 @@ def test_successful_destroy_does_not_warn_when_instance_is_already_gone( "actual_status": "running", "intended_status": "running", "public_ipaddr": "127.0.0.1", - "ports": {"5000/tcp": [{"HostPort": "5000"}]}, + "ports": {"5000/tcp": [{"HostPort": "5000"}], "5001/udp": [{"HostPort": "5001"}]}, "status_msg": "", } @@ -173,7 +196,7 @@ def test_ignore_requirements_warns_on_success(self, parse_argv, monkeypatch, cap "intended_status": "running", "actual_status": "running", "public_ipaddr": "203.0.113.10", - "ports": {"5000/tcp": [{"HostPort": "5000"}]}, + "ports": {"5000/tcp": [{"HostPort": "5000"}], "5001/udp": [{"HostPort": "5001"}]}, } monkeypatch.setattr(machines.offers_api, "search_offers", Mock(return_value=[offer])) @@ -711,7 +734,7 @@ def test_ignore_requirements_runtime_success_preserves_preflight_as_metadata( "actual_status": "running", "intended_status": "running", "public_ipaddr": "127.0.0.1", - "ports": {"5000/tcp": [{"HostPort": "5000"}]}, + "ports": {"5000/tcp": [{"HostPort": "5000"}], "5001/udp": [{"HostPort": "5001"}]}, "status_msg": "", } monkeypatch.setattr( @@ -796,6 +819,7 @@ def test_default_cuda_mapping_still_selects_official_image( assert create.call_args.kwargs["runtype"] == "ssh_direc ssh_proxy" assert create.call_args.kwargs["label"] == "vast-self-test-machine-42" assert result["diagnostics"]["launch"]["label"] == "vast-self-test-machine-42" + assert result["diagnostics"]["launch"]["ports"] == ["5000/tcp", "1234/tcp", "5001/udp"] def test_cuda_mapping_selects_cuda_133_exact_match( self, parse_argv, patch_get_client, monkeypatch @@ -1018,7 +1042,7 @@ def test_missing_progress_port_reports_available_ports( assert result["diagnostics"]["runtime_failure"]["progress_endpoint"] == endpoint assert destroy.call_count >= 1 - def test_progress_endpoint_never_reachable_records_endpoint_diagnostic( + def test_missing_udp_port_reports_available_ports( self, parse_argv, patch_get_client, monkeypatch ): offer = _self_test_offer() @@ -1045,6 +1069,107 @@ def test_progress_endpoint_never_reachable_records_endpoint_diagnostic( destroy = Mock(return_value={"success": True}) monkeypatch.setattr("vastai.cli.commands.machines.instances_api.destroy_instance", destroy) monkeypatch.setattr("vastai.cli.commands.machines.time.sleep", lambda *_: None) + + args = parse_argv(["self-test", "machine", "42", "--raw"]) + result = args.func(args) + + udp_probe = result["diagnostics"]["udp_probe"] + assert result["failure_code"] == "udp_port_not_mapped" + assert udp_probe["container_port"] == "5001/udp" + assert udp_probe["external_port"] is None + assert udp_probe["mapped_ports"] == ["22/tcp", "5000/tcp"] + assert result["diagnostics"]["runtime_failure"]["udp_probe"] == udp_probe + assert destroy.call_count >= 1 + + def test_udp_probe_failure_after_tcp_success_is_distinct( + self, parse_argv, patch_get_client, monkeypatch, capsys + ): + offer = _self_test_offer() + running_instance = { + "id": 123, + "actual_status": "running", + "intended_status": "running", + "public_ipaddr": "127.0.0.1", + "ports": {"5000/tcp": [{"HostPort": "45000"}], "5001/udp": [{"HostPort": "45001"}]}, + "status_msg": "", + } + udp_diagnostic = { + "url": "udp://127.0.0.1:45001", + "public_ip": "127.0.0.1", + "container_port": "5001/udp", + "external_port": "45001", + "host_port": "45001", + "timeout_seconds": 2, + "attempt_count": 3, + "response_received": False, + "last_error_type": "TimeoutError", + "last_error": "timed out", + "mapped_ports": ["5000/tcp", "5001/udp"], + } + monkeypatch.setattr( + "vastai.cli.commands.machines.offers_api.search_offers", + Mock(return_value=[offer]), + ) + monkeypatch.setattr( + "vastai.cli.commands.machines.instances_api.create_instance", + Mock(return_value={"new_contract": 123}), + ) + monkeypatch.setattr( + "vastai.cli.commands.machines.instances_api.show_instance", + Mock(return_value=running_instance), + ) + destroy = Mock(return_value={"success": True}) + monkeypatch.setattr("vastai.cli.commands.machines.instances_api.destroy_instance", destroy) + monkeypatch.setattr("vastai.cli.commands.machines.time.sleep", lambda *_: None) + monkeypatch.setattr( + "vastai.cli.commands.machines.requests.get", + Mock(return_value=SimpleNamespace(status_code=200, text="Starting tests...")), + ) + monkeypatch.setattr( + "vastai.cli.commands.machines.probe_udp_echo", + Mock(return_value=(False, udp_diagnostic)), + ) + + args = parse_argv(["self-test", "machine", "42"]) + with pytest.raises(SystemExit) as exc_info: + args.func(args) + + captured = capsys.readouterr() + assert exc_info.value.code == 1 + assert "Successfully established HTTPS connection to the server." in captured.out + assert "UDP self-test probe failed after TCP progress endpoint was reachable." in captured.out + assert "External UDP port tested: 45001" in captured.out + assert "- code: udp_probe_failed" in captured.out + assert "- UDP tried: udp://127.0.0.1:45001" in captured.out + assert destroy.call_count >= 1 + + def test_progress_endpoint_never_reachable_records_endpoint_diagnostic( + self, parse_argv, patch_get_client, monkeypatch + ): + offer = _self_test_offer() + running_instance = { + "id": 123, + "actual_status": "running", + "intended_status": "running", + "public_ipaddr": "127.0.0.1", + "ports": {"5000/tcp": [{"HostPort": "45000"}], "22/tcp": [{"HostPort": "40022"}], "5001/udp": [{"HostPort": "45001"}]}, + "status_msg": "", + } + monkeypatch.setattr( + "vastai.cli.commands.machines.offers_api.search_offers", + Mock(return_value=[offer]), + ) + monkeypatch.setattr( + "vastai.cli.commands.machines.instances_api.create_instance", + Mock(return_value={"new_contract": 123}), + ) + monkeypatch.setattr( + "vastai.cli.commands.machines.instances_api.show_instance", + Mock(return_value=running_instance), + ) + destroy = Mock(return_value={"success": True}) + monkeypatch.setattr("vastai.cli.commands.machines.instances_api.destroy_instance", destroy) + monkeypatch.setattr("vastai.cli.commands.machines.time.sleep", lambda *_: None) monkeypatch.setattr( "vastai.cli.commands.machines.requests.get", Mock(side_effect=requests.exceptions.ConnectTimeout("timed out for ?api_key=secret")), @@ -1064,7 +1189,7 @@ def test_progress_endpoint_never_reachable_records_endpoint_diagnostic( assert endpoint["last_error_type"] == "ConnectTimeout" assert "api_key=secret" not in endpoint["last_error"] assert "api_key=REDACTED" in endpoint["last_error"] - assert endpoint["mapped_ports"] == ["22/tcp", "5000/tcp"] + assert endpoint["mapped_ports"] == ["22/tcp", "5000/tcp", "5001/udp"] assert result["diagnostics"]["runtime_failure"]["progress_endpoint"] == endpoint assert destroy.call_count >= 1 @@ -1077,7 +1202,7 @@ def test_progress_endpoint_failure_prints_external_port( "actual_status": "running", "intended_status": "running", "public_ipaddr": "127.0.0.1", - "ports": {"5000/tcp": [{"HostPort": "45000"}], "22/tcp": [{"HostPort": "40022"}]}, + "ports": {"5000/tcp": [{"HostPort": "45000"}], "22/tcp": [{"HostPort": "40022"}], "5001/udp": [{"HostPort": "45001"}]}, "status_msg": "", } monkeypatch.setattr( @@ -1110,7 +1235,7 @@ def test_progress_endpoint_failure_prints_external_port( assert exc_info.value.code == 1 assert "External port tested: 45000" in captured.out assert "- external port tested: 45000" in captured.out - assert "- mapped container ports: 22/tcp, 5000/tcp" in captured.out + assert "- mapped container ports: 22/tcp, 5000/tcp, 5001/udp" in captured.out def test_progress_endpoint_lost_after_success_records_different_failure( self, parse_argv, patch_get_client, monkeypatch @@ -1121,7 +1246,7 @@ def test_progress_endpoint_lost_after_success_records_different_failure( "actual_status": "running", "intended_status": "running", "public_ipaddr": "127.0.0.1", - "ports": {"5000/tcp": [{"HostPort": "45000"}]}, + "ports": {"5000/tcp": [{"HostPort": "45000"}], "5001/udp": [{"HostPort": "45001"}]}, "status_msg": "", } monkeypatch.setattr( @@ -1172,7 +1297,7 @@ def test_progress_endpoint_http_non_200_records_status_code( "actual_status": "running", "intended_status": "running", "public_ipaddr": "127.0.0.1", - "ports": {"5000/tcp": [{"HostPort": "45000"}]}, + "ports": {"5000/tcp": [{"HostPort": "45000"}], "5001/udp": [{"HostPort": "45001"}]}, "status_msg": "", } monkeypatch.setattr( @@ -1215,7 +1340,7 @@ def test_progress_endpoint_empty_200_records_empty_timeout( "actual_status": "running", "intended_status": "running", "public_ipaddr": "127.0.0.1", - "ports": {"5000/tcp": [{"HostPort": "45000"}]}, + "ports": {"5000/tcp": [{"HostPort": "45000"}], "5001/udp": [{"HostPort": "45001"}]}, "status_msg": "", } monkeypatch.setattr( diff --git a/vast.py b/vast.py index 0612c1d1..97a7d140 100644 --- a/vast.py +++ b/vast.py @@ -11,6 +11,7 @@ import sys import argparse import os +import socket import time from typing import Dict, List, Tuple, Optional from datetime import date, datetime, timedelta, timezone @@ -33,6 +34,31 @@ import textwrap from pathlib import Path import warnings + +UDP_PROBE_PAYLOAD = b"vast-self-test-udp-probe" +UDP_PROBE_RESPONSE_PREFIX = b"vast-self-test-udp-ok:" + + +def probe_udp_echo(public_ip, host_port, attempts=3, timeout_seconds=2): + """Send a UDP datagram to the mapped self-test port and wait for the image echo.""" + try: + target_port = int(host_port) + except (TypeError, ValueError) as exc: + return False, f"{exc.__class__.__name__}: invalid UDP port {host_port!r}" + + last_error = "" + for _ in range(attempts): + try: + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: + sock.settimeout(timeout_seconds) + sock.sendto(UDP_PROBE_PAYLOAD, (public_ip, target_port)) + response, _addr = sock.recvfrom(4096) + if response == UDP_PROBE_RESPONSE_PREFIX + UDP_PROBE_PAYLOAD: + return True, "" + last_error = f"Unexpected UDP response: {response!r}" + except OSError as exc: + last_error = f"{exc.__class__.__name__}: {exc}" + return False, last_error import importlib.metadata @@ -8568,7 +8594,7 @@ def search_offers_and_get_top(machine_id): lang_utf8=False, python_utf8=False, extra=None, - env="-e TZ=PDT -e XNAME=XX4 -p 5000:5000 -p 1234:1234", + env="-e TZ=PDT -e XNAME=XX4 -p 5000:5000 -p 1234:1234 -p 5001:5001/udp", args=None, force=False, cancel_unavail=False, @@ -8626,10 +8652,12 @@ def search_offers_and_get_top(machine_id): if not ip_address: result["reason"] = "Failed to retrieve public IP address." else: - port_mappings = instance_info.get("ports", {}).get("5000/tcp", []) + all_ports = instance_info.get("ports", {}) + port_mappings = all_ports.get("5000/tcp", []) port = port_mappings[0].get("HostPort") if port_mappings else None + udp_port_mappings = all_ports.get("5001/udp", []) + udp_port = udp_port_mappings[0].get("HostPort") if udp_port_mappings else None if not port: - all_ports = instance_info.get("ports", {}) progress_print(args, f"Port 5000/tcp not found in instance port mappings.") progress_print(args, f"All mapped ports on instance: {all_ports if all_ports else 'none'}") progress_print(args, f"Possible causes:") @@ -8638,10 +8666,19 @@ def search_offers_and_get_top(machine_id): progress_print(args, f" - The container failed to expose the port correctly.") progress_print(args, f"Check direct_port_count: vastai search offers 'machine_id={args.machine_id} rentable=any verified=any'") result["reason"] = f"Port 5000/tcp not mapped. Available ports: {all_ports}" + elif not udp_port: + progress_print(args, f"Port 5001/udp not found in instance port mappings.") + progress_print(args, f"All mapped ports on instance: {all_ports if all_ports else 'none'}") + progress_print(args, f"Possible causes:") + progress_print(args, f" - The self-test launch did not map the UDP probe port.") + progress_print(args, f" - direct_port_count is too low on this machine (must be at least 3 per GPU).") + progress_print(args, f" - The container failed to expose the UDP probe port correctly.") + progress_print(args, f"Check direct_port_count: vastai search offers 'machine_id={args.machine_id} rentable=any verified=any'") + result["reason"] = f"Port 5001/udp not mapped. Available ports: {all_ports}" else: delay = "15" success, reason = run_machinetester( - ip_address, port, str(instance_id), args.machine_id, delay, args, api_key=api_key + ip_address, port, udp_port, str(instance_id), args.machine_id, delay, args, api_key=api_key ) result["success"] = success result["reason"] = reason @@ -9239,7 +9276,7 @@ def instance_exist(instance_id, api_key, args): debug_print(args, f"No instance found or Unexpected error checking instance existence: {e}") return False -def run_machinetester(ip_address, port, instance_id, machine_id, delay, args, api_key=None): +def run_machinetester(ip_address, port, udp_port, instance_id, machine_id, delay, args, api_key=None): """ Executes machine testing by connecting to the specified IP and port, monitoring the instance's status, and handling test completion or failures. @@ -9322,6 +9359,7 @@ def is_instance(instance_id): no_response_seconds = 0 printed_lines = set() first_connection_established = False # Flag to track first successful connection + udp_probe_completed = False instance_destroyed = False # Track whether the instance has been destroyed try: while time.time() - start_time < 600: @@ -9348,6 +9386,24 @@ def is_instance(instance_id): if response.status_code == 200 and not first_connection_established: progress_print(args, "Successfully established HTTPS connection to the server.") first_connection_established = True + udp_ok, udp_error = probe_udp_echo(ip_address, udp_port) + if udp_ok: + progress_print(args, f"Successfully verified UDP echo on external port {udp_port}.") + udp_probe_completed = True + else: + reason = "TCP progress endpoint was reachable, but UDP echo probe failed" + progress_print(args, "UDP self-test probe failed after TCP progress endpoint was reachable.") + progress_print(args, f"Tried: udp://{ip_address}:{udp_port}") + progress_print(args, f"Last UDP result: {udp_error}") + progress_print(args, "Possible causes:") + progress_print(args, " - UDP firewall/NAT forwarding is blocking the mapped public port.") + progress_print(args, " - TCP forwarding works, but UDP forwarding was not configured symmetrically.") + progress_print(args, " - Router/provider rules, CGNAT, or NAT hairpinning are blocking UDP.") + with open("Error_testresults.log", "a") as f: + f.write(f"{machine_id}:{instance_id} {reason} (udp_port={udp_port}, ip={ip_address}, error={udp_error})\n") + destroy_instance_silent(instance_id, destroy_args) + instance_destroyed = True + return False, reason message = response.text.strip() if args.debugging: @@ -9363,6 +9419,8 @@ def is_instance(instance_id): new_lines = [line for line in lines if line not in printed_lines] for line in new_lines: if line == 'DONE': + if not udp_probe_completed: + progress_print(args, "WARNING: Test reached DONE before UDP probe completion was recorded.") progress_print(args, "Test completed successfully.") with open("Pass_testresults.log", "a") as f: f.write(f"{machine_id}\n") diff --git a/vastai/cli/commands/machines.py b/vastai/cli/commands/machines.py index dd2f1752..cc53045a 100644 --- a/vastai/cli/commands/machines.py +++ b/vastai/cli/commands/machines.py @@ -2,6 +2,7 @@ import json import os +import socket import sys import time import warnings @@ -48,9 +49,13 @@ PROGRESS_ENDPOINT_UNREACHABLE, PROGRESS_PORT_NOT_MAPPED, RUNTIME_TEST_TIMEOUT, + UDP_CONTAINER_PORT, + UDP_PORT_NOT_MAPPED, + UDP_PROBE_FAILED, classify_status_msg, make_progress_endpoint_diagnostic, make_failure, + make_udp_probe_diagnostic, redact_secret_text, ) from vastai.cli.self_test.support_bundle import ( @@ -63,6 +68,65 @@ parser = _get_parser() SELF_TEST_INSTANCE_LABEL_PREFIX = "vast-self-test-machine" INSTANCE_LOG_TAIL_LINES = 1000 +UDP_PROBE_PAYLOAD = b"vast-self-test-udp-probe" +UDP_PROBE_RESPONSE_PREFIX = b"vast-self-test-udp-ok:" + + +def _first_host_port(mapped_ports, container_port): + port_rows = (mapped_ports or {}).get(container_port) or [] + return str(port_rows[0].get("HostPort")) if port_rows else None + + +def probe_udp_echo(public_ip, host_port, *, mapped_ports=None, attempts=3, timeout_seconds=2): + """Send a UDP datagram to the mapped self-test port and wait for the image echo.""" + attempt_count = 0 + last_error_type = None + last_error = None + try: + target_port = int(host_port) + except (TypeError, ValueError) as exc: + diagnostic = make_udp_probe_diagnostic( + public_ip=public_ip, + host_port=host_port, + timeout_seconds=timeout_seconds, + attempt_count=0, + last_error_type=exc.__class__.__name__, + last_error=exc, + mapped_ports=mapped_ports, + ) + return False, diagnostic + + for attempt_count in range(1, attempts + 1): + try: + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: + sock.settimeout(timeout_seconds) + sock.sendto(UDP_PROBE_PAYLOAD, (public_ip, target_port)) + response, _addr = sock.recvfrom(4096) + if response == UDP_PROBE_RESPONSE_PREFIX + UDP_PROBE_PAYLOAD: + return True, make_udp_probe_diagnostic( + public_ip=public_ip, + host_port=host_port, + timeout_seconds=timeout_seconds, + attempt_count=attempt_count, + response_received=True, + mapped_ports=mapped_ports, + ) + last_error_type = "UnexpectedResponse" + last_error = f"Unexpected UDP response: {response!r}" + except OSError as exc: + last_error_type = exc.__class__.__name__ + last_error = exc + + diagnostic = make_udp_probe_diagnostic( + public_ip=public_ip, + host_port=host_port, + timeout_seconds=timeout_seconds, + attempt_count=attempt_count, + last_error_type=last_error_type, + last_error=last_error, + mapped_ports=mapped_ports, + ) + return False, diagnostic # --------------------------------------------------------------------------- @@ -854,6 +918,23 @@ def render_runtime_failure(): progress_print(f"- last result: {' - '.join(last_bits)}") if endpoint.get("mapped_ports"): progress_print(f"- mapped container ports: {', '.join(endpoint['mapped_ports'])}") + udp_probe = diagnostic.get("udp_probe") or result.get("diagnostics", {}).get("udp_probe") + if udp_probe: + if udp_probe.get("url"): + progress_print(f"- UDP tried: {udp_probe['url']}") + udp_port = udp_probe.get("external_port") or udp_probe.get("host_port") + if udp_port: + progress_print(f"- external UDP port tested: {udp_port}") + progress_print(f"- UDP response received: {udp_probe.get('response_received')}") + udp_bits = [] + if udp_probe.get("last_error_type"): + udp_bits.append(str(udp_probe["last_error_type"])) + if udp_probe.get("last_error"): + udp_bits.append(str(udp_probe["last_error"])) + if udp_bits: + progress_print(f"- UDP last result: {' - '.join(udp_bits)}") + if udp_probe.get("mapped_ports"): + progress_print(f"- mapped container ports: {', '.join(udp_probe['mapped_ports'])}") if diagnostic.get("remediation"): progress_print(f"- remediation: {diagnostic['remediation']}") steps = diagnostic.get("suggested_steps") or [] @@ -1103,13 +1184,13 @@ def image_for(version): result["phase"] = "rental" result["stage"] = "create_instance" from vastai.cli.util import parse_env - env = parse_env("-e TZ=PDT -e XNAME=XX4 -p 5000:5000 -p 1234:1234") + env = parse_env("-e TZ=PDT -e XNAME=XX4 -p 5000:5000 -p 1234:1234 -p 5001:5001/udp") runtype = "ssh_direc ssh_proxy" self_test_label = f"{SELF_TEST_INSTANCE_LABEL_PREFIX}-{args.machine_id}" result["diagnostics"]["launch"] = { "runtype": runtype, "jupyter_lab": False, - "ports": ["5000/tcp", "1234/tcp"], + "ports": ["5000/tcp", "1234/tcp", UDP_CONTAINER_PORT], "label": self_test_label, } @@ -1311,7 +1392,7 @@ def wait_for_instance(inst_id, timeout=900, interval=10): ) # ----- run machine tester ----- - def run_machinetester(ip_address, port, inst_id, machine_id, delay, mapped_ports=None): + def run_machinetester(ip_address, port, udp_port, inst_id, machine_id, delay, mapped_ports=None): urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) delay = int(delay) message = '' @@ -1323,6 +1404,7 @@ def run_machinetester(ip_address, port, inst_id, machine_id, delay, mapped_ports last_error = None last_status_code = None first_connection_established = False + udp_probe_completed = False def update_progress_endpoint(): endpoint = make_progress_endpoint_diagnostic( @@ -1387,6 +1469,41 @@ def is_instance(iid): if response.status_code == 200 and not first_connection_established: progress_print("Successfully established HTTPS connection to the server.") first_connection_established = True + udp_ok, udp_diagnostic = probe_udp_echo( + ip_address, + udp_port, + mapped_ports=mapped_ports, + ) + result["diagnostics"]["udp_probe"] = udp_diagnostic + if udp_ok: + progress_print( + "Successfully verified UDP echo " + f"on external port {udp_diagnostic.get('external_port')}." + ) + udp_probe_completed = True + else: + progress_print("UDP self-test probe failed after TCP progress endpoint was reachable.") + progress_print(f"Tried: {udp_diagnostic.get('url')}") + if udp_diagnostic.get("external_port"): + progress_print(f"External UDP port tested: {udp_diagnostic.get('external_port')}") + if udp_diagnostic.get("last_error_type") or udp_diagnostic.get("last_error"): + last_udp_result = udp_diagnostic.get("last_error_type") or "" + if udp_diagnostic.get("last_error"): + last_udp_result = f"{last_udp_result}: {udp_diagnostic.get('last_error')}".strip(": ") + progress_print(f"Last UDP result: {last_udp_result}") + progress_print("Possible causes:") + progress_print(" 1. UDP firewall/NAT forwarding is blocking the mapped public port") + progress_print(" 2. TCP forwarding works, but UDP forwarding was not configured symmetrically") + progress_print(" 3. Router/provider rules, CGNAT, or NAT hairpinning are blocking UDP") + destroy_instance_silent(inst_id, collect_logs=True) + instance_destroyed = True + return_reason = "TCP progress endpoint was reachable, but UDP echo probe failed" + return False, return_reason, make_failure( + UDP_PROBE_FAILED, + stage="runtime_network", + details=return_reason, + udp_probe=udp_diagnostic, + ) if response.status_code == 200: message = response.text.strip() @@ -1409,6 +1526,8 @@ def is_instance(iid): for line in new_lines: diagnostic = legacy_parser.process_line(line) if line == 'DONE': + if not udp_probe_completed: + progress_print("WARNING: Test reached DONE before UDP probe completion was recorded.") progress_print("Test completed successfully.") progress_print("Test passed.") destroy_instance_silent(inst_id) @@ -1522,8 +1641,8 @@ def is_instance(iid): ) else: all_ports = instance_info.get("ports", {}) - port_mappings = all_ports.get("5000/tcp", []) - port = port_mappings[0].get("HostPort") if port_mappings else None + port = _first_host_port(all_ports, PROGRESS_CONTAINER_PORT) + udp_port = _first_host_port(all_ports, UDP_CONTAINER_PORT) if not port: endpoint = make_progress_endpoint_diagnostic( public_ip=ip_address, @@ -1547,12 +1666,34 @@ def is_instance(iid): ), "Failed to retrieve mapped port.", ) + elif not udp_port: + udp_diagnostic = make_udp_probe_diagnostic( + public_ip=ip_address, + host_port=None, + timeout_seconds=2, + mapped_ports=all_ports, + ) + result["diagnostics"]["udp_probe"] = udp_diagnostic + progress_print(f"Port {UDP_CONTAINER_PORT} not found in mapped ports. Available ports: {list(all_ports.keys())}") + progress_print("Possible causes:") + progress_print(" 1. The instance launch did not map the self-test UDP probe port") + progress_print(f" 2. direct_port_count below the 3 ports/GPU minimum - check with: vastai search offers 'machine_id={args.machine_id} rentable=any rented=any'") + progress_print(" 3. Container is not exposing port 5001/udp") + set_runtime_failure( + make_failure( + UDP_PORT_NOT_MAPPED, + stage="instance_network", + details=f"Available ports: {list(all_ports.keys())}", + udp_probe=udp_diagnostic, + ), + "Failed to retrieve mapped UDP probe port.", + ) else: delay = "15" result["phase"] = "test" result["stage"] = "run_machinetester" success, reason, runtime_diagnostic = run_machinetester( - ip_address, port, instance_id, args.machine_id, delay, mapped_ports=all_ports, + ip_address, port, udp_port, instance_id, args.machine_id, delay, mapped_ports=all_ports, ) result["success"] = success result["reason"] = reason diff --git a/vastai/cli/self_test/runtime_diagnostics.py b/vastai/cli/self_test/runtime_diagnostics.py index 604b4210..9ea0f165 100644 --- a/vastai/cli/self_test/runtime_diagnostics.py +++ b/vastai/cli/self_test/runtime_diagnostics.py @@ -23,6 +23,8 @@ PROGRESS_ENDPOINT_UNREACHABLE = "progress_endpoint_unreachable" PROGRESS_ENDPOINT_LOST = "progress_endpoint_lost" PROGRESS_EMPTY_TIMEOUT = "progress_empty_timeout" +UDP_PORT_NOT_MAPPED = "udp_port_not_mapped" +UDP_PROBE_FAILED = "udp_probe_failed" RUNTIME_TEST_TIMEOUT = "runtime_test_timeout" LEGACY_PROGRESS_ERROR = "legacy_progress_error" DOCKER_PULL_FAILED = "docker_pull_failed" @@ -35,6 +37,7 @@ INTERRUPTED = "interrupted" CLEANUP_FAILED = "cleanup_failed" PROGRESS_CONTAINER_PORT = "5000/tcp" +UDP_CONTAINER_PORT = "5001/udp" RUNTIME_FAILURE_CODES = ( @@ -49,6 +52,8 @@ PROGRESS_ENDPOINT_UNREACHABLE, PROGRESS_ENDPOINT_LOST, PROGRESS_EMPTY_TIMEOUT, + UDP_PORT_NOT_MAPPED, + UDP_PROBE_FAILED, RUNTIME_TEST_TIMEOUT, LEGACY_PROGRESS_ERROR, DOCKER_PULL_FAILED, @@ -146,6 +151,25 @@ class FailureCatalogEntry: "Check whether the runtime script stalled or stopped writing progress.", ("Inspect runtime logs.", "Retry with debugging enabled."), ), + UDP_PORT_NOT_MAPPED: FailureCatalogEntry( + UDP_PORT_NOT_MAPPED, + "The runtime UDP probe port was not mapped.", + "Confirm port 5001/udp is exposed and direct ports are available.", + ( + "Check the available mapped ports in the diagnostic output.", + "Verify the self-test instance launch included 5001/udp.", + ), + ), + UDP_PROBE_FAILED: FailureCatalogEntry( + UDP_PROBE_FAILED, + "The runtime UDP probe did not receive an echo response.", + "Check UDP firewall/NAT forwarding separately from TCP forwarding.", + ( + "Confirm UDP forwarding is configured for the mapped public port.", + "If TCP worked, check router/provider rules that allow TCP but block UDP.", + "Retry from outside the host LAN to rule out NAT hairpinning behavior.", + ), + ), RUNTIME_TEST_TIMEOUT: FailureCatalogEntry( RUNTIME_TEST_TIMEOUT, "The runtime test did not complete before timeout.", @@ -306,6 +330,35 @@ def make_progress_endpoint_diagnostic( return diagnostic +def make_udp_probe_diagnostic( + *, + public_ip: str | None = None, + container_port: str = UDP_CONTAINER_PORT, + host_port: str | int | None = None, + timeout_seconds: int | float | None = None, + attempt_count: int = 0, + response_received: bool = False, + last_error_type: str | None = None, + last_error: object = None, + mapped_ports=None, +) -> dict[str, object]: + """Shape UDP probe state for raw output and UI consumption.""" + url = f"udp://{public_ip}:{host_port}" if public_ip and host_port else None + return { + "url": url, + "public_ip": public_ip, + "container_port": container_port, + "external_port": str(host_port) if host_port is not None else None, + "host_port": str(host_port) if host_port is not None else None, + "timeout_seconds": timeout_seconds, + "attempt_count": int(attempt_count or 0), + "response_received": bool(response_received), + "last_error_type": last_error_type, + "last_error": redact_secret_text(last_error), + "mapped_ports": _mapped_port_names(mapped_ports), + } + + def failure_catalog() -> dict[str, dict[str, object]]: """Return a JSON-serializable copy of the runtime failure catalog.""" return { @@ -338,6 +391,7 @@ def make_failure( suggested_steps: Iterable[str] | None = None, underlying_error: str | None = None, progress_endpoint: dict[str, object] | None = None, + udp_probe: dict[str, object] | None = None, ) -> dict[str, object]: """Build a raw-output-friendly diagnostic dictionary.""" entry = get_failure_entry(code) @@ -356,6 +410,8 @@ def make_failure( diagnostic["underlying_error"] = underlying_error if progress_endpoint: diagnostic["progress_endpoint"] = progress_endpoint + if udp_probe: + diagnostic["udp_probe"] = udp_probe return diagnostic @@ -484,12 +540,16 @@ def classify_status_msg(status_msg: str | None) -> dict[str, object] | None: "STAGE_STRESS_GPU_BURN", "STAGE_SYSTEM_REQUIREMENTS", "STRESS_GPU_BURN_FAILED", + "UDP_CONTAINER_PORT", + "UDP_PORT_NOT_MAPPED", + "UDP_PROBE_FAILED", "classify_legacy_error_line", "classify_status_msg", "failure_catalog", "get_failure_entry", "make_failure", "make_progress_endpoint_diagnostic", + "make_udp_probe_diagnostic", "parse_legacy_progress", "redact_secret_text", "stage_from_progress_line", From 5582f87d5ac6dc74c180a9993d05429a25308977 Mon Sep 17 00:00:00 2001 From: Hannes Zietsman Date: Fri, 19 Jun 2026 11:31:22 +0200 Subject: [PATCH 4/8] CON-1516 compact self-test startup progress --- tests/cli/test_machines_commands.py | 93 +++++++++++++++++++++++++++++ vast.py | 32 +++++++++- vastai/cli/commands/machines.py | 37 +++++++++++- 3 files changed, 157 insertions(+), 5 deletions(-) diff --git a/tests/cli/test_machines_commands.py b/tests/cli/test_machines_commands.py index f4aea519..d5c15f11 100644 --- a/tests/cli/test_machines_commands.py +++ b/tests/cli/test_machines_commands.py @@ -1143,6 +1143,99 @@ def test_udp_probe_failure_after_tcp_success_is_distinct( assert "- UDP tried: udp://127.0.0.1:45001" in captured.out assert destroy.call_count >= 1 + def test_wait_for_instance_loading_status_is_compact_without_debugging( + self, parse_argv, patch_get_client, monkeypatch, capsys + ): + offer = _self_test_offer() + loading_instance = { + "id": 123, + "actual_status": "loading", + "intended_status": "running", + "status_msg": "ff81e2caff08: Verifying Checksum\nff81e2caff08: Download complete", + } + running_instance = { + "id": 123, + "actual_status": "running", + "intended_status": "running", + "public_ipaddr": "127.0.0.1", + "ports": {"5000/tcp": [{"HostPort": "45000"}], "5001/udp": [{"HostPort": "45001"}]}, + "status_msg": "", + } + monkeypatch.setattr( + "vastai.cli.commands.machines.offers_api.search_offers", + Mock(return_value=[offer]), + ) + monkeypatch.setattr( + "vastai.cli.commands.machines.instances_api.create_instance", + Mock(return_value={"new_contract": 123}), + ) + monkeypatch.setattr( + "vastai.cli.commands.machines.instances_api.show_instance", + Mock(side_effect=[loading_instance, loading_instance, running_instance, running_instance]), + ) + destroy = Mock(return_value={"success": True}) + monkeypatch.setattr("vastai.cli.commands.machines.instances_api.destroy_instance", destroy) + monkeypatch.setattr("vastai.cli.commands.machines.requests.get", Mock(return_value=SimpleNamespace(status_code=200, text="DONE"))) + monkeypatch.setattr("vastai.cli.commands.machines.time.sleep", lambda *_: None) + + args = parse_argv(["self-test", "machine", "42"]) + with pytest.raises(SystemExit) as exc_info: + args.func(args) + + captured = capsys.readouterr() + assert exc_info.value.code == 0 + assert "Instance 123 is loading; waiting for running status. ready." in captured.out + assert "status: loading" not in captured.out + assert "status_msg:" not in captured.out + assert "Verifying Checksum" not in captured.out + assert destroy.call_count >= 1 + + def test_wait_for_instance_loading_status_is_verbose_with_debugging( + self, parse_argv, patch_get_client, monkeypatch, capsys + ): + offer = _self_test_offer() + loading_instance = { + "id": 123, + "actual_status": "loading", + "intended_status": "running", + "status_msg": "ff81e2caff08: Verifying Checksum\nff81e2caff08: Download complete", + } + running_instance = { + "id": 123, + "actual_status": "running", + "intended_status": "running", + "public_ipaddr": "127.0.0.1", + "ports": {"5000/tcp": [{"HostPort": "45000"}], "5001/udp": [{"HostPort": "45001"}]}, + "status_msg": "", + } + monkeypatch.setattr( + "vastai.cli.commands.machines.offers_api.search_offers", + Mock(return_value=[offer]), + ) + monkeypatch.setattr( + "vastai.cli.commands.machines.instances_api.create_instance", + Mock(return_value={"new_contract": 123}), + ) + monkeypatch.setattr( + "vastai.cli.commands.machines.instances_api.show_instance", + Mock(side_effect=[loading_instance, running_instance, running_instance]), + ) + destroy = Mock(return_value={"success": True}) + monkeypatch.setattr("vastai.cli.commands.machines.instances_api.destroy_instance", destroy) + monkeypatch.setattr("vastai.cli.commands.machines.requests.get", Mock(return_value=SimpleNamespace(status_code=200, text="DONE"))) + monkeypatch.setattr("vastai.cli.commands.machines.time.sleep", lambda *_: None) + + args = parse_argv(["self-test", "machine", "42", "--debugging"]) + with pytest.raises(SystemExit) as exc_info: + args.func(args) + + captured = capsys.readouterr() + assert exc_info.value.code == 0 + assert "Instance 123 status: loading / intended: running; waiting for 'running' status." in captured.out + assert "status_msg: ff81e2caff08: Verifying Checksum" in captured.out + assert "Instance 123 is loading; waiting for running status" not in captured.out + assert destroy.call_count >= 1 + def test_progress_endpoint_never_reachable_records_endpoint_diagnostic( self, parse_argv, patch_get_client, monkeypatch ): diff --git a/vast.py b/vast.py index 97a7d140..450aa440 100644 --- a/vast.py +++ b/vast.py @@ -9212,6 +9212,10 @@ def progress_print(args, *args_to_print): if not args.raw: print(*args_to_print) +def progress_write(args, *args_to_print): + if not args.raw: + print(*args_to_print, end="", flush=True) + def debug_print(args, *args_to_print): """ Prints debug messages to the console based on the `debugging` and `raw` flags. @@ -9638,7 +9642,7 @@ def check_requirements(machine_id, api_key, args): return False, [f"Unexpected error: {str(e)}"] -def wait_for_instance(instance_id, api_key, args, destroy_args, timeout=900, interval=10): +def wait_for_instance(instance_id, api_key, args, destroy_args, timeout=900, interval=15): """ Waits for an instance to reach a running state and monitors its status for errors. @@ -9648,6 +9652,7 @@ def wait_for_instance(instance_id, api_key, args, destroy_args, timeout=900, int args.debugging = False start_time = time.time() + compact_wait_started = False show_args = argparse.Namespace( id=instance_id, quiet=False, @@ -9662,6 +9667,12 @@ def wait_for_instance(instance_id, api_key, args, destroy_args, timeout=900, int if args.debugging: debug_print(args, "Starting wait_for_instance with ID:", instance_id) + + def finish_compact_wait(suffix=""): + nonlocal compact_wait_started + if compact_wait_started: + progress_write(args, suffix or "\n") + compact_wait_started = False while time.time() - start_time < timeout: try: @@ -9669,6 +9680,7 @@ def wait_for_instance(instance_id, api_key, args, destroy_args, timeout=900, int instance_info = show__instance(show_args) if not instance_info: + finish_compact_wait() progress_print(args, f"No information returned for instance {instance_id}. Retrying...") time.sleep(interval) continue # Retry @@ -9676,6 +9688,7 @@ def wait_for_instance(instance_id, api_key, args, destroy_args, timeout=900, int # Check for error in status_msg status_msg = instance_info.get('status_msg', '') if status_msg and 'Error' in status_msg: + finish_compact_wait() reason = f"Instance {instance_id} encountered an error: {status_msg.strip()}" progress_print(args, reason) @@ -9690,7 +9703,9 @@ def wait_for_instance(instance_id, api_key, args, destroy_args, timeout=900, int # Check if instance went offline actual_status = instance_info.get('actual_status', 'unknown') + intended_status = instance_info.get('intended_status', 'unknown') if actual_status == 'offline': + finish_compact_wait() reason = "Instance offline during testing" progress_print(args, reason) @@ -9704,22 +9719,33 @@ def wait_for_instance(instance_id, api_key, args, destroy_args, timeout=900, int return False, reason # Check if instance is running - if instance_info.get('intended_status') == 'running' and actual_status == 'running': + if intended_status == 'running' and actual_status == 'running': + finish_compact_wait(" ready.\n") if args.debugging: debug_print(args, f"Instance {instance_id} is now running.") return instance_info, None # Return instance_info with None for reason # Print feedback about the current status - progress_print(args, f"Instance {instance_id} status: {actual_status}... waiting for 'running' status.") + if args.debugging: + progress_print(args, f"Instance {instance_id} status: {actual_status} / intended: {intended_status}; waiting for 'running' status.") + if status_msg: + progress_print(args, f"status_msg: {status_msg.strip()}") + elif not compact_wait_started: + progress_write(args, f"Instance {instance_id} is loading; waiting for running status") + compact_wait_started = True + else: + progress_write(args, ".") time.sleep(interval) except Exception as e: + finish_compact_wait() progress_print(args, f"Error retrieving instance info for {instance_id}: {e}. Retrying...") if args.debugging: debug_print(args, f"Exception details: {str(e)}") time.sleep(interval) # Timeout reached without instance running + finish_compact_wait() reason = f"Instance did not become running within {timeout} seconds. Verify network configuration. Use the self-test machine function in vast cli" progress_print(args, reason) return False, reason diff --git a/vastai/cli/commands/machines.py b/vastai/cli/commands/machines.py index cc53045a..2a5f7201 100644 --- a/vastai/cli/commands/machines.py +++ b/vastai/cli/commands/machines.py @@ -815,6 +815,10 @@ def progress_print(*args_to_print): if not args.raw: print(*args_to_print) + def progress_write(*args_to_print): + if not args.raw: + print(*args_to_print, end="", flush=True) + def debug_print(*args_to_print): if args.debugging: cli_output.append(f"DEBUG: {output_line(*args_to_print)}") @@ -1292,14 +1296,22 @@ def destroy_instance_silent(inst_id, collect_logs=False): return {"success": False, "error": "Max retries exceeded"} # ----- wait for instance to start ----- - def wait_for_instance(inst_id, timeout=900, interval=10): + def wait_for_instance(inst_id, timeout=900, interval=15): start_time = time.time() + compact_wait_started = False debug_print("Starting wait_for_instance with ID:", inst_id) + def finish_compact_wait(suffix=""): + nonlocal compact_wait_started + if compact_wait_started: + progress_write(suffix or "\n") + compact_wait_started = False + while time.time() - start_time < timeout: try: instance_info = instances_api.show_instance(client, id=inst_id) if not instance_info: + finish_compact_wait() progress_print(f"No information returned for instance {inst_id}. Retrying...") time.sleep(interval) continue @@ -1320,6 +1332,7 @@ def wait_for_instance(inst_id, timeout=900, interval=10): ) ) if status_msg_clean and status_msg_is_error: + finish_compact_wait() diagnostic = classify_status_msg(status_msg_clean) or make_failure( DAEMON_STARTUP_FAILED, stage="startup", @@ -1338,6 +1351,7 @@ def wait_for_instance(inst_id, timeout=900, interval=10): actual_status = instance_info.get('actual_status', 'unknown') intended_status = instance_info.get('intended_status', 'unknown') if actual_status == 'offline': + finish_compact_wait() reason = "Instance offline during testing" diagnostic = make_failure(INSTANCE_OFFLINE_BEFORE_TEST, stage="startup") progress_print(reason) @@ -1349,6 +1363,7 @@ def wait_for_instance(inst_id, timeout=900, interval=10): return False, reason, diagnostic if intended_status in ('stopped', 'exited') or actual_status in ('stopped', 'exited'): + finish_compact_wait() reason = f"Instance {inst_id} stopped before reaching running status" if status_msg_clean: reason = f"{reason}: {status_msg_clean}" @@ -1371,18 +1386,36 @@ def wait_for_instance(inst_id, timeout=900, interval=10): return False, reason, diagnostic if intended_status == 'running' and actual_status == 'running': + was_compact_waiting = compact_wait_started + finish_compact_wait(" ready.\n") + if was_compact_waiting: + cli_output.append(f"Instance {inst_id} reached running status.") debug_print(f"Instance {inst_id} is now running.") return instance_info, None, None - progress_print(f"Instance {inst_id} status: {actual_status}... waiting for 'running' status.") + if args.debugging: + progress_print( + f"Instance {inst_id} status: {actual_status} / intended: {intended_status}; " + "waiting for 'running' status." + ) + if status_msg_clean: + progress_print(f"status_msg: {status_msg_clean}") + elif not compact_wait_started: + cli_output.append(f"Instance {inst_id} is loading; waiting for running status.") + progress_write(f"Instance {inst_id} is loading; waiting for running status") + compact_wait_started = True + else: + progress_write(".") time.sleep(interval) except Exception as e: error = safe_error(e) + finish_compact_wait() progress_print(f"Error retrieving instance info for {inst_id}: {error}. Retrying...") debug_print(f"Exception details: {error}") time.sleep(interval) + finish_compact_wait() reason = f"Instance did not become running within {timeout} seconds. Verify network configuration. Use the self-test machine function in vast cli" progress_print(reason) return False, reason, make_failure( From 0611be6d1c2a1c258d2342e7475a6bfcc616dfa7 Mon Sep 17 00:00:00 2001 From: Hannes Zietsman Date: Fri, 19 Jun 2026 11:38:24 +0200 Subject: [PATCH 5/8] CON-1516 show self-test loading heartbeat --- tests/cli/test_machines_commands.py | 5 ++++- vast.py | 30 ++++++++------------------ vastai/cli/commands/machines.py | 33 ++++++++--------------------- 3 files changed, 22 insertions(+), 46 deletions(-) diff --git a/tests/cli/test_machines_commands.py b/tests/cli/test_machines_commands.py index d5c15f11..98226bba 100644 --- a/tests/cli/test_machines_commands.py +++ b/tests/cli/test_machines_commands.py @@ -1184,7 +1184,9 @@ def test_wait_for_instance_loading_status_is_compact_without_debugging( captured = capsys.readouterr() assert exc_info.value.code == 0 - assert "Instance 123 is loading; waiting for running status. ready." in captured.out + assert "Instance 123 is loading; waiting for running status..." in captured.out + assert "Still loading... 0s elapsed" in captured.out + assert "Instance 123 is ready after 0s." in captured.out assert "status: loading" not in captured.out assert "status_msg:" not in captured.out assert "Verifying Checksum" not in captured.out @@ -1234,6 +1236,7 @@ def test_wait_for_instance_loading_status_is_verbose_with_debugging( assert "Instance 123 status: loading / intended: running; waiting for 'running' status." in captured.out assert "status_msg: ff81e2caff08: Verifying Checksum" in captured.out assert "Instance 123 is loading; waiting for running status" not in captured.out + assert "Still loading..." not in captured.out assert destroy.call_count >= 1 def test_progress_endpoint_never_reachable_records_endpoint_diagnostic( diff --git a/vast.py b/vast.py index 450aa440..5caf398a 100644 --- a/vast.py +++ b/vast.py @@ -9212,10 +9212,6 @@ def progress_print(args, *args_to_print): if not args.raw: print(*args_to_print) -def progress_write(args, *args_to_print): - if not args.raw: - print(*args_to_print, end="", flush=True) - def debug_print(args, *args_to_print): """ Prints debug messages to the console based on the `debugging` and `raw` flags. @@ -9652,7 +9648,7 @@ def wait_for_instance(instance_id, api_key, args, destroy_args, timeout=900, int args.debugging = False start_time = time.time() - compact_wait_started = False + wait_started_at = None show_args = argparse.Namespace( id=instance_id, quiet=False, @@ -9667,12 +9663,6 @@ def wait_for_instance(instance_id, api_key, args, destroy_args, timeout=900, int if args.debugging: debug_print(args, "Starting wait_for_instance with ID:", instance_id) - - def finish_compact_wait(suffix=""): - nonlocal compact_wait_started - if compact_wait_started: - progress_write(args, suffix or "\n") - compact_wait_started = False while time.time() - start_time < timeout: try: @@ -9680,7 +9670,6 @@ def finish_compact_wait(suffix=""): instance_info = show__instance(show_args) if not instance_info: - finish_compact_wait() progress_print(args, f"No information returned for instance {instance_id}. Retrying...") time.sleep(interval) continue # Retry @@ -9688,7 +9677,6 @@ def finish_compact_wait(suffix=""): # Check for error in status_msg status_msg = instance_info.get('status_msg', '') if status_msg and 'Error' in status_msg: - finish_compact_wait() reason = f"Instance {instance_id} encountered an error: {status_msg.strip()}" progress_print(args, reason) @@ -9705,7 +9693,6 @@ def finish_compact_wait(suffix=""): actual_status = instance_info.get('actual_status', 'unknown') intended_status = instance_info.get('intended_status', 'unknown') if actual_status == 'offline': - finish_compact_wait() reason = "Instance offline during testing" progress_print(args, reason) @@ -9720,7 +9707,9 @@ def finish_compact_wait(suffix=""): # Check if instance is running if intended_status == 'running' and actual_status == 'running': - finish_compact_wait(" ready.\n") + if wait_started_at is not None: + elapsed = int(time.time() - wait_started_at) + progress_print(args, f"Instance {instance_id} is ready after {elapsed}s.") if args.debugging: debug_print(args, f"Instance {instance_id} is now running.") return instance_info, None # Return instance_info with None for reason @@ -9730,22 +9719,21 @@ def finish_compact_wait(suffix=""): progress_print(args, f"Instance {instance_id} status: {actual_status} / intended: {intended_status}; waiting for 'running' status.") if status_msg: progress_print(args, f"status_msg: {status_msg.strip()}") - elif not compact_wait_started: - progress_write(args, f"Instance {instance_id} is loading; waiting for running status") - compact_wait_started = True + elif wait_started_at is None: + wait_started_at = time.time() + progress_print(args, f"Instance {instance_id} is loading; waiting for running status...") else: - progress_write(args, ".") + elapsed = int(time.time() - wait_started_at) + progress_print(args, f"Still loading... {elapsed}s elapsed") time.sleep(interval) except Exception as e: - finish_compact_wait() progress_print(args, f"Error retrieving instance info for {instance_id}: {e}. Retrying...") if args.debugging: debug_print(args, f"Exception details: {str(e)}") time.sleep(interval) # Timeout reached without instance running - finish_compact_wait() reason = f"Instance did not become running within {timeout} seconds. Verify network configuration. Use the self-test machine function in vast cli" progress_print(args, reason) return False, reason diff --git a/vastai/cli/commands/machines.py b/vastai/cli/commands/machines.py index 2a5f7201..943757e0 100644 --- a/vastai/cli/commands/machines.py +++ b/vastai/cli/commands/machines.py @@ -815,10 +815,6 @@ def progress_print(*args_to_print): if not args.raw: print(*args_to_print) - def progress_write(*args_to_print): - if not args.raw: - print(*args_to_print, end="", flush=True) - def debug_print(*args_to_print): if args.debugging: cli_output.append(f"DEBUG: {output_line(*args_to_print)}") @@ -1298,20 +1294,13 @@ def destroy_instance_silent(inst_id, collect_logs=False): # ----- wait for instance to start ----- def wait_for_instance(inst_id, timeout=900, interval=15): start_time = time.time() - compact_wait_started = False + wait_started_at = None debug_print("Starting wait_for_instance with ID:", inst_id) - def finish_compact_wait(suffix=""): - nonlocal compact_wait_started - if compact_wait_started: - progress_write(suffix or "\n") - compact_wait_started = False - while time.time() - start_time < timeout: try: instance_info = instances_api.show_instance(client, id=inst_id) if not instance_info: - finish_compact_wait() progress_print(f"No information returned for instance {inst_id}. Retrying...") time.sleep(interval) continue @@ -1332,7 +1321,6 @@ def finish_compact_wait(suffix=""): ) ) if status_msg_clean and status_msg_is_error: - finish_compact_wait() diagnostic = classify_status_msg(status_msg_clean) or make_failure( DAEMON_STARTUP_FAILED, stage="startup", @@ -1351,7 +1339,6 @@ def finish_compact_wait(suffix=""): actual_status = instance_info.get('actual_status', 'unknown') intended_status = instance_info.get('intended_status', 'unknown') if actual_status == 'offline': - finish_compact_wait() reason = "Instance offline during testing" diagnostic = make_failure(INSTANCE_OFFLINE_BEFORE_TEST, stage="startup") progress_print(reason) @@ -1363,7 +1350,6 @@ def finish_compact_wait(suffix=""): return False, reason, diagnostic if intended_status in ('stopped', 'exited') or actual_status in ('stopped', 'exited'): - finish_compact_wait() reason = f"Instance {inst_id} stopped before reaching running status" if status_msg_clean: reason = f"{reason}: {status_msg_clean}" @@ -1386,9 +1372,9 @@ def finish_compact_wait(suffix=""): return False, reason, diagnostic if intended_status == 'running' and actual_status == 'running': - was_compact_waiting = compact_wait_started - finish_compact_wait(" ready.\n") - if was_compact_waiting: + if wait_started_at is not None: + elapsed = int(time.time() - wait_started_at) + progress_print(f"Instance {inst_id} is ready after {elapsed}s.") cli_output.append(f"Instance {inst_id} reached running status.") debug_print(f"Instance {inst_id} is now running.") return instance_info, None, None @@ -1400,22 +1386,21 @@ def finish_compact_wait(suffix=""): ) if status_msg_clean: progress_print(f"status_msg: {status_msg_clean}") - elif not compact_wait_started: + elif wait_started_at is None: + wait_started_at = time.time() cli_output.append(f"Instance {inst_id} is loading; waiting for running status.") - progress_write(f"Instance {inst_id} is loading; waiting for running status") - compact_wait_started = True + progress_print(f"Instance {inst_id} is loading; waiting for running status...") else: - progress_write(".") + elapsed = int(time.time() - wait_started_at) + progress_print(f"Still loading... {elapsed}s elapsed") time.sleep(interval) except Exception as e: error = safe_error(e) - finish_compact_wait() progress_print(f"Error retrieving instance info for {inst_id}: {error}. Retrying...") debug_print(f"Exception details: {error}") time.sleep(interval) - finish_compact_wait() reason = f"Instance did not become running within {timeout} seconds. Verify network configuration. Use the self-test machine function in vast cli" progress_print(reason) return False, reason, make_failure( From 6fa87f73ffebf4875901ef2e60be892c24966db5 Mon Sep 17 00:00:00 2001 From: Hannes Zietsman Date: Fri, 19 Jun 2026 11:44:27 +0200 Subject: [PATCH 6/8] CON-1516 summarize self-test success by machine --- tests/cli/test_machines_commands.py | 2 +- vast.py | 5 ++--- vastai/cli/commands/machines.py | 6 ++---- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/tests/cli/test_machines_commands.py b/tests/cli/test_machines_commands.py index 98226bba..22b954d7 100644 --- a/tests/cli/test_machines_commands.py +++ b/tests/cli/test_machines_commands.py @@ -216,7 +216,7 @@ def test_ignore_requirements_warns_on_success(self, parse_argv, monkeypatch, cap assert "Requirement checks are skipped as a pass/fail gate" in out assert "does not qualify this machine for verification" in out assert out.count("does not qualify this machine for verification") >= 2 - assert "Test passed." in out + assert "Machine ID 123 passed the self-test." in out def test_ignore_requirements_warning_in_raw_summary(self, parse_argv, monkeypatch, capsys): from vastai.cli.commands import machines diff --git a/vast.py b/vast.py index 5caf398a..1b5c31d0 100644 --- a/vast.py +++ b/vast.py @@ -8739,7 +8739,8 @@ def search_offers_and_get_top(machine_id): if result.get("warning"): print(result["warning"]) if result["success"]: - print("Test completed successfully.") + print("") + print(f"Machine ID {args.machine_id} passed the self-test.") sys.exit(0) else: print(f"Test failed: {result['reason']}") @@ -9424,7 +9425,6 @@ def is_instance(instance_id): progress_print(args, "Test completed successfully.") with open("Pass_testresults.log", "a") as f: f.write(f"{machine_id}\n") - progress_print(args, f"Test passed.") destroy_instance_silent(instance_id, destroy_args) instance_destroyed = True return True, "" @@ -9492,7 +9492,6 @@ def is_instance(instance_id): # Ensure instance cleanup if not instance_destroyed and instance_id and instance_exist(instance_id, api_key, destroy_args): destroy_instance_silent(instance_id, destroy_args) - progress_print(args, f"Machine: {machine_id} Done with testing remote.py results {message}") warnings.simplefilter('default') def safe_float(value): diff --git a/vastai/cli/commands/machines.py b/vastai/cli/commands/machines.py index 943757e0..edd64d56 100644 --- a/vastai/cli/commands/machines.py +++ b/vastai/cli/commands/machines.py @@ -1547,7 +1547,6 @@ def is_instance(iid): if not udp_probe_completed: progress_print("WARNING: Test reached DONE before UDP probe completion was recorded.") progress_print("Test completed successfully.") - progress_print("Test passed.") destroy_instance_silent(inst_id) instance_destroyed = True return True, "", None @@ -1641,7 +1640,6 @@ def is_instance(iid): finally: if not instance_destroyed and inst_id and instance_exist(inst_id): destroy_instance_silent(inst_id, collect_logs=True) - progress_print(f"Machine: {machine_id} Done with testing remote.py results {message}") warnings.simplefilter('default') # ----- main orchestration: wait then test ----- @@ -1790,8 +1788,8 @@ def is_instance(iid): if result.get("diagnostics", {}).get("preflight_failure"): print("Runtime checks passed, but minimum requirement checks were ignored.") print("This run does not qualify the machine for verification.") - else: - print("Test completed successfully.") + print("") + print(f"Machine ID {args.machine_id} passed the self-test.") sys.exit(0) else: render_runtime_failure() From 8578b35dc157239f686e735dd3383c3092199092 Mon Sep 17 00:00:00 2001 From: Hannes Zietsman Date: Wed, 24 Jun 2026 15:33:26 +0200 Subject: [PATCH 7/8] CON-1531 surface self-test support bundle path --- tests/cli/test_self_test_support_bundle.py | 6 +++++- vastai/cli/commands/machines.py | 4 ++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/tests/cli/test_self_test_support_bundle.py b/tests/cli/test_self_test_support_bundle.py index 6d2830ba..fa988266 100644 --- a/tests/cli/test_self_test_support_bundle.py +++ b/tests/cli/test_self_test_support_bundle.py @@ -62,6 +62,8 @@ def test_self_test_failure_creates_support_bundle( ): from vastai.cli.commands import machines + bundle_path = str(tmp_path / "vast_selftest_42_20260602T100000Z.tar.gz") + def fake_bundle(**kwargs): assert kwargs["machine_id"] == "42" assert kwargs["output_dir"] == str(tmp_path) @@ -71,7 +73,7 @@ def fake_bundle(**kwargs): assert kwargs["result"]["failure_code"] == "preflight_requirements_failed" assert any("Preflight diagnostics for machine 42 failed:" in line for line in kwargs["cli_output"]) return { - "path": str(tmp_path / "vast_selftest_42_20260602T100000Z.tar.gz"), + "path": bundle_path, "created_at_utc": "20260602T100000Z", "size_bytes": 123, "files": ["manifest.json", "self-test-result.json", "self-test-output.log"], @@ -108,6 +110,8 @@ def fake_bundle(**kwargs): assert "Self-test diagnostic bundle saved to:" in captured.out assert "self-test-result.json" in captured.out assert "Review this tarball before sharing it with support." in captured.out + assert f"Support bundle: {bundle_path}" in captured.out + assert captured.out.rfind("Test failed:") < captured.out.rfind("Support bundle:") def test_self_test_bundle_creation_error_preserves_original_failure( diff --git a/vastai/cli/commands/machines.py b/vastai/cli/commands/machines.py index edd64d56..f6c073d8 100644 --- a/vastai/cli/commands/machines.py +++ b/vastai/cli/commands/machines.py @@ -878,6 +878,8 @@ def finish_failure(): for line in format_bundle_summary(bundle): print(line) print(f"Test failed: {result['reason']}") + if bundle and bundle.get("path"): + print(f"Support bundle: {bundle.get('path')}") sys.exit(1) def set_runtime_failure(diagnostic, fallback_reason=None): @@ -1798,4 +1800,6 @@ def is_instance(iid): for line in format_bundle_summary(bundle): print(line) print(f"Test failed: {result['reason']}") + if bundle and bundle.get("path"): + print(f"Support bundle: {bundle.get('path')}") sys.exit(1) From f737010e2406192de3aed637967de852b1215f18 Mon Sep 17 00:00:00 2001 From: Hannes Zietsman Date: Thu, 25 Jun 2026 11:21:53 +0200 Subject: [PATCH 8/8] CON-1531 improve self-test diagnostic edges --- tests/cli/test_machines_commands.py | 93 +++++++++++++++++++++ tests/cli/test_runtime_diagnostics.py | 20 +++++ vastai/cli/commands/machines.py | 81 +++++++++++++----- vastai/cli/self_test/runtime_diagnostics.py | 19 +++-- 4 files changed, 185 insertions(+), 28 deletions(-) diff --git a/tests/cli/test_machines_commands.py b/tests/cli/test_machines_commands.py index 22b954d7..70124fd9 100644 --- a/tests/cli/test_machines_commands.py +++ b/tests/cli/test_machines_commands.py @@ -156,6 +156,38 @@ def test_successful_destroy_does_not_warn_when_instance_is_already_gone( assert "Instance 123 destroyed successfully on attempt 1." in captured.out assert "WARNING: failed to destroy test instance 123" not in captured.out + def test_successful_runtime_reports_cleanup_failure_when_destroy_fails( + self, parse_argv, monkeypatch + ): + from vastai.cli.commands import machines + + offer = _self_test_offer() + running_instance = { + "id": 123, + "actual_status": "running", + "intended_status": "running", + "public_ipaddr": "127.0.0.1", + "ports": {"5000/tcp": [{"HostPort": "5000"}], "5001/udp": [{"HostPort": "5001"}]}, + "status_msg": "", + } + + monkeypatch.setattr(machines.offers_api, "search_offers", Mock(return_value=[offer])) + monkeypatch.setattr(machines.instances_api, "create_instance", Mock(return_value={"new_contract": 123})) + monkeypatch.setattr(machines.instances_api, "show_instance", Mock(return_value=running_instance)) + destroy_instance = Mock(side_effect=RuntimeError("api destroy failed")) + monkeypatch.setattr(machines.instances_api, "destroy_instance", destroy_instance) + monkeypatch.setattr(machines.requests, "get", lambda *_, **__: SimpleNamespace(status_code=200, text="DONE")) + monkeypatch.setattr(machines.time, "sleep", lambda *_: None) + + args = parse_argv(["self-test", "machine", "46368", "--raw"]) + result = args.func(args) + + assert result["success"] is False + assert result["failure_code"] == "cleanup_failed" + assert result["stage"] == "cleanup" + assert "failed to destroy test instance 123" in result["reason"] + assert destroy_instance.call_count >= 10 + class TestListMachineEpilogDoesNotPromiseEmail: """Regression: epilogs must not reference the email that no longer carries details.""" @@ -808,6 +840,67 @@ def test_env_test_image_overrides_default_mapping( assert create.call_args.kwargs["image"] == "vastai/test:p3-env" assert create.call_args.kwargs["runtype"] == "ssh_direc ssh_proxy" + def test_status_poll_timeout_reports_status_poll_failure( + self, parse_argv, patch_get_client, monkeypatch + ): + offer = _self_test_offer() + monkeypatch.setattr( + "vastai.cli.commands.machines.offers_api.search_offers", + Mock(return_value=[offer]), + ) + monkeypatch.setattr( + "vastai.cli.commands.machines.instances_api.create_instance", + Mock(return_value={"new_contract": 123}), + ) + show = Mock( + side_effect=[ + RuntimeError("status API unavailable"), + {"id": 123, "actual_status": "running", "intended_status": "running"}, + ] + ) + monkeypatch.setattr("vastai.cli.commands.machines.instances_api.show_instance", show) + destroy = Mock(return_value={"success": True}) + monkeypatch.setattr("vastai.cli.commands.machines.instances_api.destroy_instance", destroy) + monkeypatch.setattr("vastai.cli.commands.machines.time.sleep", lambda *_: None) + monkeypatch.setattr( + "vastai.cli.commands.machines.time.time", + Mock(side_effect=[0, 0, 901]), + ) + + args = parse_argv(["self-test", "machine", "42", "--raw"]) + result = args.func(args) + + assert result["success"] is False + assert result["failure_code"] == "instance_status_poll_failed" + assert result["stage"] == "startup" + assert "status API unavailable" in result["reason"] + assert result["failure"]["underlying_error"] == "RuntimeError: status API unavailable" + destroy.assert_called_once() + + def test_unexpected_self_test_exception_is_structured_and_redacted( + self, parse_argv, patch_get_client, monkeypatch + ): + offer = _self_test_offer() + monkeypatch.setattr( + "vastai.cli.commands.machines.offers_api.search_offers", + Mock(return_value=[offer]), + ) + monkeypatch.setattr( + "vastai.cli.commands.machines.preflight_requirement_checks", + Mock(side_effect=RuntimeError("boom https://console.vast.ai/?api_key=secret")), + ) + + args = parse_argv(["self-test", "machine", "42", "--raw"]) + result = args.func(args) + + assert result["success"] is False + assert result["failure_code"] == "unexpected_error" + assert result["failure"]["code"] == "unexpected_error" + assert result["diagnostics"]["runtime_failure"]["code"] == "unexpected_error" + assert result["failure"]["stage"] == "preflight_checks" + assert "api_key=secret" not in result["reason"] + assert "api_key=REDACTED" in result["reason"] + def test_default_cuda_mapping_still_selects_official_image( self, parse_argv, patch_get_client, monkeypatch ): diff --git a/tests/cli/test_runtime_diagnostics.py b/tests/cli/test_runtime_diagnostics.py index aa9e4e69..4303ef58 100644 --- a/tests/cli/test_runtime_diagnostics.py +++ b/tests/cli/test_runtime_diagnostics.py @@ -74,6 +74,26 @@ def test_legacy_parser_tracks_stage_and_classifies_nccl_error(): assert result["underlying_error"] == "ERROR: NCCL unhandled system error during allreduce" +@pytest.mark.parametrize( + ("line", "stage"), + [ + ("Running ResNet18 test on all GPUs...", diag.STAGE_RESNET), + ("Running ECC test on all GPUs...", diag.STAGE_ECC), + ("Running NCCL distributed test with 2 GPUs...", diag.STAGE_NCCL), + ( + "Running stress-ng and gpu-burn tests simultaneously for 60 seconds...", + diag.STAGE_STRESS_GPU_BURN, + ), + ], +) +def test_legacy_parser_tracks_current_self_test_image_stage_lines(line, stage): + parser = diag.LegacyProgressParser() + + assert parser.process_line(line) is None + + assert parser.stage == stage + + def test_legacy_parser_classifies_unknown_error_as_legacy_progress_error(): result = diag.parse_legacy_progress( "\n".join( diff --git a/vastai/cli/commands/machines.py b/vastai/cli/commands/machines.py index f6c073d8..3ae97596 100644 --- a/vastai/cli/commands/machines.py +++ b/vastai/cli/commands/machines.py @@ -35,11 +35,13 @@ requirement_failure, ) from vastai.cli.self_test.runtime_diagnostics import ( + CLEANUP_FAILED, DAEMON_STARTUP_FAILED, INSTANCE_CREATE_FAILED, INSTANCE_CREATE_MISSING_CONTRACT, INSTANCE_OFFLINE_BEFORE_TEST, INSTANCE_START_TIMEOUT, + INSTANCE_STATUS_POLL_FAILED, INTERRUPTED, LegacyProgressParser, MISSING_PUBLIC_IP, @@ -52,6 +54,7 @@ UDP_CONTAINER_PORT, UDP_PORT_NOT_MAPPED, UDP_PROBE_FAILED, + UNEXPECTED_ERROR, classify_status_msg, make_progress_endpoint_diagnostic, make_failure, @@ -1075,6 +1078,7 @@ def selected_offer_for_self_test(machine_id): return finish_failure() result["offer"] = compact_offer_metadata(selected_offer) + result["stage"] = "preflight_checks" checks = preflight_requirement_checks(selected_offer) result["checks"] = checks unmet_checks = failed_checks(checks) @@ -1297,11 +1301,15 @@ def destroy_instance_silent(inst_id, collect_logs=False): def wait_for_instance(inst_id, timeout=900, interval=15): start_time = time.time() wait_started_at = None + successful_status_poll = False + last_poll_error = None + last_poll_error_type = None debug_print("Starting wait_for_instance with ID:", inst_id) while time.time() - start_time < timeout: try: instance_info = instances_api.show_instance(client, id=inst_id) + successful_status_poll = True if not instance_info: progress_print(f"No information returned for instance {inst_id}. Retrying...") time.sleep(interval) @@ -1399,17 +1407,33 @@ def wait_for_instance(inst_id, timeout=900, interval=15): except Exception as e: error = safe_error(e) + last_poll_error = error + last_poll_error_type = e.__class__.__name__ progress_print(f"Error retrieving instance info for {inst_id}: {error}. Retrying...") debug_print(f"Exception details: {error}") time.sleep(interval) - reason = f"Instance did not become running within {timeout} seconds. Verify network configuration. Use the self-test machine function in vast cli" + if not successful_status_poll and last_poll_error: + reason = f"Could not poll instance status within {timeout} seconds: {last_poll_error}" + diagnostic = make_failure( + INSTANCE_STATUS_POLL_FAILED, + stage="startup", + details=reason, + error=last_poll_error, + underlying_error=f"{last_poll_error_type}: {last_poll_error}", + ) + else: + reason = ( + f"Instance did not become running within {timeout} seconds. " + "Check host capacity, Docker/container startup, and network or port configuration." + ) + diagnostic = make_failure( + INSTANCE_START_TIMEOUT, + stage="startup", + details=reason, + ) progress_print(reason) - return False, reason, make_failure( - INSTANCE_START_TIMEOUT, - stage="startup", - details=reason, - ) + return False, reason, diagnostic # ----- run machine tester ----- def run_machinetester(ip_address, port, udp_port, inst_id, machine_id, delay, mapped_ports=None): @@ -1474,8 +1498,8 @@ def is_instance(iid): if status == 'offline': reason = "Instance offline during testing" progress_print(f"Instance {inst_id} went offline. {reason}") - destroy_instance_silent(inst_id, collect_logs=True) - instance_destroyed = True + cleanup = destroy_instance_silent(inst_id, collect_logs=True) + instance_destroyed = bool(cleanup.get("success")) return False, reason, make_failure(INSTANCE_OFFLINE_BEFORE_TEST, stage="runtime") try: @@ -1515,8 +1539,8 @@ def is_instance(iid): progress_print(" 1. UDP firewall/NAT forwarding is blocking the mapped public port") progress_print(" 2. TCP forwarding works, but UDP forwarding was not configured symmetrically") progress_print(" 3. Router/provider rules, CGNAT, or NAT hairpinning are blocking UDP") - destroy_instance_silent(inst_id, collect_logs=True) - instance_destroyed = True + cleanup = destroy_instance_silent(inst_id, collect_logs=True) + instance_destroyed = bool(cleanup.get("success")) return_reason = "TCP progress endpoint was reachable, but UDP echo probe failed" return False, return_reason, make_failure( UDP_PROBE_FAILED, @@ -1549,14 +1573,22 @@ def is_instance(iid): if not udp_probe_completed: progress_print("WARNING: Test reached DONE before UDP probe completion was recorded.") progress_print("Test completed successfully.") - destroy_instance_silent(inst_id) - instance_destroyed = True + cleanup = destroy_instance_silent(inst_id) + instance_destroyed = bool(cleanup.get("success")) + if not instance_destroyed: + reason = f"Test completed, but failed to destroy test instance {inst_id}." + return False, reason, make_failure( + CLEANUP_FAILED, + stage="cleanup", + details=reason, + error=cleanup.get("error"), + ) return True, "", None elif line.startswith('ERROR'): progress_print(line) progress_print(f"Test failed with error: {line}.") - destroy_instance_silent(inst_id, collect_logs=True) - instance_destroyed = True + cleanup = destroy_instance_silent(inst_id, collect_logs=True) + instance_destroyed = bool(cleanup.get("success")) return False, line, diagnostic else: progress_print(line) @@ -1627,8 +1659,8 @@ def is_instance(iid): details=return_reason, progress_endpoint=endpoint, ) - destroy_instance_silent(inst_id, collect_logs=True) - instance_destroyed = True + cleanup = destroy_instance_silent(inst_id, collect_logs=True) + instance_destroyed = bool(cleanup.get("success")) return False, return_reason, diagnostic debug_print("Waiting for 20 seconds before the next check.") @@ -1732,13 +1764,16 @@ def is_instance(iid): except Exception as e: error = safe_error(e) result["success"] = False - result["reason"] = error - result["failure_code"] = "unexpected_error" - result["failure"] = { - "code": "unexpected_error", - "summary": error, - "remediation": "Retry with --debugging and inspect the error details.", - } + set_runtime_failure( + make_failure( + UNEXPECTED_ERROR, + stage=result.get("stage"), + details=error, + error=error, + underlying_error=error, + ), + error, + ) result["error"] = error finally: diff --git a/vastai/cli/self_test/runtime_diagnostics.py b/vastai/cli/self_test/runtime_diagnostics.py index 9ea0f165..a5d6aed1 100644 --- a/vastai/cli/self_test/runtime_diagnostics.py +++ b/vastai/cli/self_test/runtime_diagnostics.py @@ -36,6 +36,7 @@ STRESS_GPU_BURN_FAILED = "stress_gpu_burn_failed" INTERRUPTED = "interrupted" CLEANUP_FAILED = "cleanup_failed" +UNEXPECTED_ERROR = "unexpected_error" PROGRESS_CONTAINER_PORT = "5000/tcp" UDP_CONTAINER_PORT = "5001/udp" @@ -65,6 +66,7 @@ STRESS_GPU_BURN_FAILED, INTERRUPTED, CLEANUP_FAILED, + UNEXPECTED_ERROR, ) @@ -109,7 +111,7 @@ class FailureCatalogEntry: ), INSTANCE_OFFLINE_BEFORE_TEST: FailureCatalogEntry( INSTANCE_OFFLINE_BEFORE_TEST, - "The instance went offline before the runtime test could run.", + "The instance went offline before or during the runtime test.", "Investigate host availability and instance lifecycle events.", ("Check machine status.", "Review host daemon and container logs."), ), @@ -236,6 +238,12 @@ class FailureCatalogEntry: "Destroy the temporary test instance manually to avoid continued billing.", ("Run destroy instance for the temporary contract.", "Retry cleanup after checking API connectivity."), ), + UNEXPECTED_ERROR: FailureCatalogEntry( + UNEXPECTED_ERROR, + "The self-test command hit an unexpected CLI error.", + "Retry with --debugging and inspect the support bundle or terminal output.", + ("Retry with --debugging enabled.", "Share the support bundle with Vast support if the error repeats."), + ), } @@ -249,10 +257,10 @@ class FailureCatalogEntry: _STAGE_PATTERNS: tuple[tuple[re.Pattern[str], str], ...] = ( (re.compile(r"^\s*Running system requirements test\.\.\.\s*$", re.IGNORECASE), STAGE_SYSTEM_REQUIREMENTS), - (re.compile(r"^\s*Running ResNet50/ResNet18 test\.\.\.\s*$", re.IGNORECASE), STAGE_RESNET), - (re.compile(r"^\s*Running ECC test\.\.\.\s*$", re.IGNORECASE), STAGE_ECC), - (re.compile(r"^\s*Running NCCL distributed test\.\.\.\s*$", re.IGNORECASE), STAGE_NCCL), - (re.compile(r"^\s*Running stress-ng and gpu-burn\.\.\.\s*$", re.IGNORECASE), STAGE_STRESS_GPU_BURN), + (re.compile(r"^\s*Running ResNet(?:50/ResNet18|18)(?: test(?: on all GPUs)?)?\.\.\.\s*$", re.IGNORECASE), STAGE_RESNET), + (re.compile(r"^\s*Running ECC test(?: on all GPUs)?\.\.\.\s*$", re.IGNORECASE), STAGE_ECC), + (re.compile(r"^\s*Running NCCL distributed test(?: with \d+ GPUs)?\.\.\.\s*$", re.IGNORECASE), STAGE_NCCL), + (re.compile(r"^\s*Running stress-ng and gpu-burn(?: tests simultaneously for \d+ seconds)?\.\.\.\s*$", re.IGNORECASE), STAGE_STRESS_GPU_BURN), ) _NVML_RE = re.compile( @@ -543,6 +551,7 @@ def classify_status_msg(status_msg: str | None) -> dict[str, object] | None: "UDP_CONTAINER_PORT", "UDP_PORT_NOT_MAPPED", "UDP_PROBE_FAILED", + "UNEXPECTED_ERROR", "classify_legacy_error_line", "classify_status_msg", "failure_catalog",