diff --git a/gcsfuse-micro-benchmarking/helpers/bucket.py b/gcsfuse-micro-benchmarking/helpers/bucket.py index c2304fd..a3796e5 100644 --- a/gcsfuse-micro-benchmarking/helpers/bucket.py +++ b/gcsfuse-micro-benchmarking/helpers/bucket.py @@ -1,35 +1,16 @@ import subprocess import shlex -import sys -import os -def create_gcs_bucket(location, project,config): - """ - Creates a GCS bucket using the gcloud storage CLI based on a configuration dictionary. - - Args: - config (dict): A dictionary with bucket configuration. - e.g., {'bucket_name': 'my-bucket', 'location': 'us-central1', ...} - - Raises: - ValueError: If a required configuration field is missing. - RuntimeError: If the gcloud command fails to execute. - """ - # 1. Validate required fields +def create_gcs_bucket(location, project, config): bucket_name = config.get('bucket_name') - if not bucket_name: raise ValueError("Bucket configuration failed: 'bucket_name' is missing.") if not location: raise ValueError("Bucket configuration failed: 'location' is missing.") - # 2. Build the base gcloud command cmd = [ 'gcloud', 'storage', 'buckets', 'create', f'gs://{bucket_name}', f'--project={project}', '--quiet' ] - - # Add optional flags if they are present in the config - # GCS uses 'location' for region, multi-region, or dual-region. cmd.append(f'--location={shlex.quote(location)}') if config.get('storage_class'): @@ -43,48 +24,30 @@ def create_gcs_bucket(location, project,config): cmd.append(f'--placement={shlex.quote(config["placement"])}') try: - # 3. Execute the command - # print(f"Executing command: {' '.join(cmd)}") - result = subprocess.run(cmd, check=True, capture_output=True, text=True) print(f"Bucket '{bucket_name}' created successfully.") print(result.stdout) - except FileNotFoundError: raise RuntimeError("The 'gcloud' command was not found. Please ensure Google Cloud SDK is installed and configured in your system's PATH.") - except subprocess.CalledProcessError as e: - raise RuntimeError(f"Failed to create bucket: {e.stderr}") except Exception as e: - raise RuntimeError(f"An unexpected error occurred: {e}") + raise RuntimeError(f"Failed to create bucket. An unexpected error occurred: {e}") return True def delete_gcs_bucket(bucket_name, project): - """ - Deletes a Google Cloud Storage bucket using the gcloud CLI. - - This function requires the gcloud CLI to be installed and authenticated. - - Args: - bucket_name (str): The name of the bucket to delete. - """ try: - # Construct the gcloud command. The --quiet flag suppresses - # the confirmation prompt. command = [ 'gcloud', '--quiet', 'storage', 'rm', '--recursive', f'gs://{bucket_name}', f'--project={project}' ] - # Run the command and capture the output - result = subprocess.run( + subprocess.run( command, - check=True, # This will raise an exception if the command fails + check=True, capture_output=True, text=True ) - print(f"Bucket '{bucket_name}' deleted successfully.") except subprocess.CalledProcessError as e: @@ -92,32 +55,4 @@ def delete_gcs_bucket(bucket_name, project): print("Error Output:", e.stderr) print("Return Code:", e.returncode) except FileNotFoundError: - print("Error: The 'gcloud' command was not found. 
Please ensure the gcloud CLI is installed and in your system's PATH.") - - -# Example Usage: -if __name__ == '__main__': - # Assuming 'is_zonal' and 'benchmark_id' are defined elsewhere - is_zonal = False - benchmark_id = 'my-benchmark-id' - location = 'us-central1' - default_zone = 'us-central1-a' - default_enable_hns = False - project='my-project' - - config = { - 'bucket_name': f"{benchmark_id}-bucket", - 'placement': default_zone if is_zonal else "", - 'storage_class': "RAPID" if is_zonal else "", - 'enable_hns': True if is_zonal else default_enable_hns, - } - - try: - create_gcs_bucket(location, project, config) - except (ValueError, RuntimeError) as e: - print(f"Error: {e}") - - try: - delete_gcs_bucket(config.get('bucket_name'), project) - except Exception as e: - print(f"Could not delete the GCS bucket, Error: {e}") \ No newline at end of file + print("Error: The 'gcloud' command was not found. Please ensure the gcloud CLI is installed and in your system's PATH.") \ No newline at end of file diff --git a/gcsfuse-micro-benchmarking/helpers/environment.py b/gcsfuse-micro-benchmarking/helpers/environment.py index d4e3d9c..974e179 100644 --- a/gcsfuse-micro-benchmarking/helpers/environment.py +++ b/gcsfuse-micro-benchmarking/helpers/environment.py @@ -2,38 +2,16 @@ import shlex import os import time -import json import requests from .constants import * - def contruct_metadata_string_from_config(metadata_config): - """ - Constructs a metadata string from a dictionary for gcloud. - - Args: - metadata_config (dict): A dictionary of key-value pairs. - - Returns: - str: A comma-separated string of key-value pairs. - """ metadata_items = [] for key, value in metadata_config.items(): - # Ensure the value is a string, which is required for gcloud metadata metadata_items.append(f"{key}={str(value)}") return ','.join(metadata_items) - - def create_vm_if_not_exists(vm_details, zone, project): - """ - Creates a VM if it does not already exist. - - Returns: - True: If the VM was newly created in this call. - False: If the VM already existed. - None: If an error occurred. - """ vm_name = vm_details.get('vm_name') if not vm_name: print("Error: 'vm_name' is a required key in vm_details.") @@ -48,7 +26,7 @@ def create_vm_if_not_exists(vm_details, zone, project): try: subprocess.run(check_cmd, check=True, capture_output=True, text=True, timeout=30) print(f"VM '{vm_name}' already exists in zone '{zone}'.") - return False # Already exists + return False except subprocess.CalledProcessError: print(f"VM '{vm_name}' does not exist in zone '{zone}'. 
Attempting to create...")
@@ -66,34 +44,20 @@ def create_vm_if_not_exists(vm_details, zone, project):
         cmd_list.append(f'--service-account={vm_details["service_account"]}')
 
     try:
-        # print(f"Executing creation command: {' '.join(shlex.quote(arg) for arg in cmd_list)}")
-        subprocess.run(cmd_list, check=True, capture_output=True, text=True, timeout=360) # Increased timeout
+        subprocess.run(cmd_list, check=True, capture_output=True, text=True, timeout=360)
         print(f"VM '{vm_name}' creation command finished successfully.")
-        return True # Newly Created
-    except subprocess.CalledProcessError as e:
-        print(f"Error creating VM {vm_name}: {e}")
-        print(f"STDERR: {e.stderr}")
-        return None # Error
-    except subprocess.TimeoutExpired:
-        print(f"Timeout creating VM {vm_name}.")
-        return None # Error
-    except Exception as e:
+        return True
+    except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
+        return None
+    except Exception as e:
         print(f"Unexpected error checking VM existence: {e}")
         return None
 
 def is_running_on_gce():
-    """
-    Checks if the script is running on a GCE VM *on the same network*.
-    Returns False for Cloudtop/Corp workstations to force External IP/IAP usage.
-    """
-    # 1. CRITICAL: Check for Corporate Environment (Cloudtop)
-    # Cloudtops reside on a different VPC and cannot use --internal-ip
-    # to reach project VMs. We detect them by their unique home directory path.
     if os.path.exists("/usr/local/google/home"):
         print("Debug: Detected Cloudtop environment. Forcing External/IAP connection.")
         return False
-    # 2. Standard GCE Metadata Check
     try:
         response = requests.get(
             "http://metadata.google.internal/computeMetadata/v1/instance/",
@@ -105,12 +69,10 @@ def is_running_on_gce():
         return False
 
 def wait_for_ssh(vm_name, zone, project, retries=15, delay=20):
-    """Tries to SSH into the VM until it succeeds or retries are exhausted."""
-
     ssh_cmd = [
         'gcloud', 'compute', 'ssh', vm_name, f'--zone={zone}', f'--project={project}',
-        '--quiet', # Suppress interactive prompts
+        '--quiet',
     ]
     if is_running_on_gce():
         print("Detected environment: GCE VM. Using internal IP.")
@@ -137,18 +99,7 @@ def wait_for_ssh(vm_name, zone, project, retries=15, delay=20):
     return False
 
 def update_vm_metadata_parameter(vm_name, zone, project,metadata_config):
-    """
-    Updates the metadata of a Google Cloud VM instance.
-
-    Args:
-        vm_name (str): The name of the VM instance.
-        zone (str): The zone where the VM instance is located.
-        metadata_config (dict): A dictionary of key-value pairs to set as metadata.
-    """
-    # First, construct the metadata string
     metadata_string = contruct_metadata_string_from_config(metadata_config)
-
-    # Construct the gcloud command
     command = (
         f"gcloud compute instances add-metadata {vm_name} "
         f"--zone={zone} "
@@ -157,13 +108,10 @@ def update_vm_metadata_parameter(vm_name, zone, project,metadata_config):
     )
     try:
-        # Use shlex.split to safely parse the command
         command_list = shlex.split(command)
-
-        # Run the command
         subprocess.run(
             command_list,
-            check=True, # Raises an exception on command failure
+            check=True,
             capture_output=True,
             text=True
         )
@@ -189,7 +137,6 @@ def run_script_remotely(vm_name, zone, project, startup_script, max_retries=max_
     remote_script_path = f"/tmp/startup_script_{script_filename}"
     remote_script_path_quoted = shlex.quote(remote_script_path)
 
-    # Step 1: Upload the script
     scp_cmd = gcloud_base + ['scp', startup_script, f'{vm_name}:{remote_script_path}', f'--zone={zone}', f'--project={project}']
     if is_running_on_gce():
         print("Detected GCE environment for SCP. 
Adding --internal-ip.") @@ -211,7 +158,7 @@ def run_script_remotely(vm_name, zone, project, startup_script, max_retries=max_ time.sleep(retry_delay) else: print(f"Failed to upload script after {max_retries} retries.") - return False # Return False only after all retries are exhausted + return False except subprocess.TimeoutExpired as e: print(f"Timeout expired while uploading script on attempt {i+1}: {e}") if i < max_retries - 1: @@ -223,11 +170,8 @@ def run_script_remotely(vm_name, zone, project, startup_script, max_retries=max_ print(f"An unexpected error occurred during upload: {e}") return False else: - # This else block runs only if the for loop completes without a 'break' - # which means all retries failed. return False - # Step 2: Execute the script inside a detached tmux session ssh_base = gcloud_base + ['ssh', vm_name, f'--zone={zone}', f'--project={project}'] if is_running_on_gce(): print("Detected GCE environment for execution SSH. Adding --internal-ip.") @@ -235,7 +179,6 @@ def run_script_remotely(vm_name, zone, project, startup_script, max_retries=max_ tmux_session_name = f"startup_{vm_name}" tmux_session_name_quoted = shlex.quote(tmux_session_name) - # Command to install tmux non-interactively if not present (for Debian/Ubuntu) install_tmux_cmd = ( "if ! command -v tmux &> /dev/null; then " "echo 'tmux not found, attempting to install...'; " @@ -244,8 +187,6 @@ def run_script_remotely(vm_name, zone, project, startup_script, max_retries=max_ "fi" ) - # Command to kill existing session if it exists, then start a new one - # Export DEBIAN_FRONTEND to affect all commands in the shell remote_exec_cmd = ( f"export DEBIAN_FRONTEND=noninteractive && " f"{install_tmux_cmd} && " @@ -301,13 +242,12 @@ def startup_benchmark_vm(cfg, zone, project,metadata_config): creation_status = create_vm_if_not_exists(cfg, zone, project) - if creation_status is None: # Error during check/create + if creation_status is None: print(f"Failed to ensure VM {vm_name} exists.") return False - elif creation_status is True: # Newly created + elif creation_status is True: if not wait_for_ssh(vm_name, zone, project): return False - # If False, VM already existed, proceed. if not update_vm_metadata_parameter(vm_name, zone, project, metadata_config): return False @@ -317,15 +257,6 @@ def startup_benchmark_vm(cfg, zone, project,metadata_config): def delete_gce_vm(vm_name, zone, project): - """ - Deletes a GCE VM without a confirmation prompt. - - Args: - vm_name (str): The name of the VM to delete. - - Returns: - bool: True if the deletion was successful, False otherwise. 
- """ if not vm_name: print("Error: A VM name is required for deletion.") return False @@ -333,21 +264,16 @@ def delete_gce_vm(vm_name, zone, project): cmd_list = ['gcloud', 'compute', 'instances', 'delete', vm_name, f'--zone={zone}', f'--project={project}', '--quiet'] try: - # print(f"Executing command: {' '.join(shlex.quote(arg) for arg in cmd_list)}") - - # Use subprocess.run to execute the command subprocess.run(cmd_list, check=True, capture_output=True, text=True) print(f"VM '{vm_name}' deletion command executed successfully.") - - # Poll for deletion to confirm it's complete print("Waiting for VM to be fully deleted...") check_cmd_string = f'gcloud compute instances describe {vm_name} --format=value(name)' while True: try: subprocess.run(shlex.split(check_cmd_string), check=True, capture_output=True, text=True) - time.sleep(10) # Wait for 10 seconds before checking again + time.sleep(10) except subprocess.CalledProcessError: print(f"VM '{vm_name}' has been successfully deleted.") return True @@ -360,23 +286,3 @@ def delete_gce_vm(vm_name, zone, project): except FileNotFoundError: print("Error: 'gcloud' command not found. Please ensure Google Cloud SDK is installed and in your PATH.") return False - - -if __name__ == '__main__': - vm_config = { - 'vm_name': 'my-test-vm', - 'machine_type': 'e2-micro', - 'image_family': 'debian-11', - 'image_project': 'debian-cloud', - 'disk_size': '10GB', - 'startup_script': 'test_script.sh' - } - - # Create a dummy script for demonstration - with open('test_script.sh', 'w') as f: - f.write("#!/bin/bash\n") - f.write("echo 'Hello from the script!' > /tmp/hello.txt\n") - f.write("echo 'Script executed successfully.'\n") - f.write("touch /tmp/script_status.txt") - - delete_gce_vm(vm_config.get('vm_name'), default_zone, default_project) diff --git a/gcsfuse-micro-benchmarking/helpers/generate_report.py b/gcsfuse-micro-benchmarking/helpers/generate_report.py index 02e1c65..52cfff0 100644 --- a/gcsfuse-micro-benchmarking/helpers/generate_report.py +++ b/gcsfuse-micro-benchmarking/helpers/generate_report.py @@ -4,17 +4,7 @@ def pretty_print_metrics_table(metrics, output_file=None): - """ - Prints the metrics dictionary in a fancy table format to the console - and optionally appends it to a file. - - Args: - metrics: A dictionary where keys are test case identifiers - and values are dictionaries containing test case parameters - and nested 'fio_metrics', 'vm_metrics', etc. - output_file: (Optional) Path to a file where the table output - will be appended. 
- """ + if not metrics: print("Metrics dictionary is empty.") return @@ -101,71 +91,3 @@ def pretty_print_metrics_table(metrics, output_file=None): print(f"\nBenchmark results saved to: {output_file}") except Exception as e: print(f"\nError writing to file {output_file}: {e}") - - -# Example usage with your new metrics structure: -if __name__ == '__main__': - metrics = { - '4KB_1MB_1_read_1_1': { - 'fio_metrics': { - 'avg_read_throughput_kibps': 3172.0, - 'stdev_read_throughput_kibps': 63.6396, - 'avg_write_throughput_kibps': 0.0, - 'stdev_write_throughput_kibps': 0.0, - 'avg_read_latency_ns': 2705.68, - 'stdev_read_latency_ns': 225.296, - 'avg_write_latency_ns': 0.0, - 'stdev_write_latency_ns': 0.0, - 'avg_read_iops': 793.0, - 'stdev_read_iops': 15.9099 - }, - 'vm_metrics': { - 'avg_cpu_utilization_percent': 12.5, - 'stdev_cpu_utilization_percent': 1.1 - }, - 'cpu_percent_per_gbps': 0.12345, - 'bs': '4KB', - 'file_size': '1MB', - 'iodepth': '1', - 'iotype': 'read', - 'threads': '1', - 'nrfiles': '1' - }, - '8KB_2MB_2_write_1_1': { - 'fio_metrics': { - 'avg_read_throughput_kibps': 0.0, - 'stdev_read_throughput_kibps': 0.0, - 'avg_write_throughput_kibps': 5200.0, - 'stdev_write_throughput_kibps': 150.0, - 'avg_read_latency_ns': 0.0, - 'stdev_read_latency_ns': 0.0, - 'avg_write_latency_ns': 1800.0, - 'stdev_write_latency_ns': 100.0, - 'avg_write_iops': 650.0, - 'stdev_write_iops': 18.75 - }, - 'vm_metrics': { - 'avg_cpu_utilization_percent': 15.8, - 'stdev_cpu_utilization_percent': 2.3 - }, - 'cpu_percent_per_gbps': 0.23456, - 'bs': '8KB', - 'file_size': '2MB', - 'iodepth': '2', - 'iotype': 'write', - 'threads': '1', - 'nrfiles': '1' - } - } - - output_filename = "metrics_summary.txt" - # Clear the file for demonstration purposes - if os.path.exists(output_filename): - os.remove(output_filename) - - print("--- Metrics Table (Trial 1) ---") - pretty_print_metrics_table(metrics, output_file=output_filename) - - # Example of calling it again, appending to the same file - print("\n--- Metrics Table (Trial 2) ---") - pretty_print_metrics_table(metrics, output_file=output_filename) diff --git a/gcsfuse-micro-benchmarking/helpers/helper.py b/gcsfuse-micro-benchmarking/helpers/helper.py index 3ed7c86..a420b85 100644 --- a/gcsfuse-micro-benchmarking/helpers/helper.py +++ b/gcsfuse-micro-benchmarking/helpers/helper.py @@ -4,119 +4,48 @@ import yaml import csv import shutil -import warnings -import re import subprocess -import shlex import time +import itertools from datetime import datetime, timedelta from .constants import * from . import environment -from . import bucket - def generate_random_string(length): - """Generates a random string of fixed length.""" - characters = string.ascii_lowercase + string.digits - return ''.join(random.choice(characters) for i in range(length)) - + return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length)) def generate_artifacts_dir(benchmark_id: str) -> str | None: - """ - Creates a directory named benchmark_id inside /tmp. - - Args: - benchmark_id: The name of the directory to create. - - Returns: - The full path to the created directory, or None if creation failed. - """ - if not benchmark_id: - print("Error: benchmark_id cannot be empty.") - return None - - base_dir = '/tmp' - # Sanitize benchmark_id to avoid path traversal issues, though less critical in /tmp - # For example, ensure benchmark_id doesn't contain '..' 
- safe_benchmark_id = os.path.basename(benchmark_id) - if safe_benchmark_id != benchmark_id: - print(f"Warning: benchmark_id '{benchmark_id}' was sanitized to '{safe_benchmark_id}'") - # Depending on requirements, you might want to raise an error here - - path = os.path.join(base_dir, safe_benchmark_id) - + if not benchmark_id: return None + path = os.path.join('/tmp', os.path.basename(benchmark_id)) try: os.makedirs(path, exist_ok=True) print(f"Artifacts directory path: '{path}'") return path - except OSError as e: - print(f"Error creating directory '{path}': {e}") - return None except Exception as e: - print(f"An unexpected error occurred: {e}") + print(f"Error creating directory '{path}': {e}") return None def copy_to_artifacts_dir(artifacts_dir, oldpath, filename): - # Move file from oldpath to artifacts dir under new name try: shutil.copy(oldpath, os.path.join(artifacts_dir, filename)) - except FileNotFoundError : - print(f"Error: The file '{oldpath}' was not found.") - except Exception as e : - print(f"Error while moving the file: {e}") + except Exception as e: + print(f"Error moving file {oldpath}: {e}") return os.path.join(artifacts_dir, filename) def parse_bench_config(config_filepath): - with open(config_filepath, 'r') as file: - config = yaml.safe_load(file) - return config + with open(config_filepath, 'r') as f: return yaml.safe_load(f) def generate_fio_job_file(job_details): - """Generates a FIO job file based on the provided job details.""" - # Extract job details, checking for empty lists and falling back to defaults - bs_values = job_details.get('bs') - file_size_values = job_details.get('file_size') - iodepth_values = job_details.get('iodepth') - iotype_values = job_details.get('iotype') - threads_values = job_details.get('threads') - nrfiles_values = job_details.get('nrfiles') - - - # Generate combinations of parameters - job_configs = [] - for bs in bs_values: - for file_size in file_size_values: - for iodepth in iodepth_values: - for iotype in iotype_values: - for threads in threads_values: - for nrfiles in nrfiles_values: - job_configs.append({ - 'bs': bs, - 'file_size': file_size, - 'iodepth': iodepth, - 'iotype': iotype, - 'threads': threads, - 'nrfiles': nrfiles, - }) - - filepath = os.path.join("/tmp/fio_job_" + generate_random_string(10) + ".csv") - - with open(filepath, 'w', newline='', encoding='utf-8') as csvfile: - writer = csv.DictWriter(csvfile, fieldnames=['bs', 'file_size', 'iodepth', 'iotype', 'threads','nrfiles'], quoting=csv.QUOTE_MINIMAL) - writer.writeheader() - for case in job_configs: - writer.writerow({ - 'bs': case['bs'], - 'file_size': case['file_size'], - 'iodepth': case['iodepth'], - 'iotype': case['iotype'], - 'threads': case['threads'], - 'nrfiles': case['nrfiles'], - }) - + keys = ['bs', 'file_size', 'iodepth', 'iotype', 'threads', 'nrfiles'] + vals = [job_details.get(k, []) for k in keys] + filepath = f"/tmp/fio_job_{generate_random_string(10)}.csv" + with open(filepath, 'w', newline='') as f: + w = csv.writer(f) + w.writerow(keys) + w.writerows(itertools.product(*vals)) return filepath @@ -130,108 +59,48 @@ def get_jobcases_file(artifacts_dir, config): filepath = copy_to_artifacts_dir(artifacts_dir, filepath, "fio_job_cases.csv") return filepath +def _get_config_file(artifacts_dir, config, key, default, dest): + path = config.get(key) + if not path or not os.path.exists(path): path = default + return copy_to_artifacts_dir(artifacts_dir, path, dest) def get_job_template(artifacts_dir, config): - 
config_path=config.get('fio_jobfile_template') - if not os.path.exists(config_path): - print("The specified fio jobfile template does not exist. Proceeding with default") - config_path = "./resources/jobfile.fio" - # Move to the artifacts_dir - new_filepath = copy_to_artifacts_dir(artifacts_dir, config_path, "jobfile.fio") - return new_filepath - + return _get_config_file(artifacts_dir, config, 'fio_jobfile_template', "./resources/jobfile.fio", "jobfile.fio") def get_gcsfuse_mount_config(artifacts_dir, config): - config_path=config.get('mount_config_file') - if not os.path.exists(config_path): - print("The specified mount config file does not exist. Proceeding with default") - config_path = "./resources/mount_config.yml" - # Move to the artifacts_dir - new_filepath = copy_to_artifacts_dir(artifacts_dir, config_path, "mount_config.yml") - return new_filepath - + return _get_config_file(artifacts_dir, config, 'mount_config_file', "./resources/mount_config.yml", "mount_config.yml") def get_version_details(artifacts_dir, config): - filepath="/tmp/version_details.yml" - version_details = config.get('version_details') - with open(filepath, 'w') as file: - file.write(f"go_version: {version_details.get('go_version')}\n") - file.write(f"fio_version: {version_details.get('fio_version')}\n") - file.write(f"gcsfuse_version_or_commit: {version_details.get('gcsfuse_version_or_commit')}\n") - # Move to the artifacts_dir - new_filepath = copy_to_artifacts_dir(artifacts_dir, filepath, "version_details.yml") - return new_filepath + filepath = "/tmp/version_details.yml" + version_details = config.get('version_details', {}) + with open(filepath, 'w') as f: + for k in ['go_version', 'fio_version', 'gcsfuse_version_or_commit']: + f.write(f"{k}: {version_details.get(k)}\n") + return copy_to_artifacts_dir(artifacts_dir, filepath, "version_details.yml") def generate_benchmarking_resources(artifacts_dir, cfg): - fio_jobcases_filepath= get_jobcases_file(artifacts_dir, cfg) - print(f"Generated testcases for benchmarking at : {fio_jobcases_filepath}") - - fio_job_template = get_job_template(artifacts_dir, cfg) - print(f"Generated job template for benchmarking at : {fio_job_template}") - - mount_config = get_gcsfuse_mount_config(artifacts_dir, cfg) - print(f"Generated mount config for benchmarking at : {mount_config}") - - version_details = get_version_details(artifacts_dir, cfg) - print(f"Generated version details for benchmarking at : {version_details}") + print(f"Testcases: {get_jobcases_file(artifacts_dir, cfg)}") + print(f"Job template: {get_job_template(artifacts_dir, cfg)}") + print(f"Mount config: {get_gcsfuse_mount_config(artifacts_dir, cfg)}") + print(f"Version details: {get_version_details(artifacts_dir, cfg)}") def create_benchmark_vm(cfg): - """ - Creates the GCE VM for benchmarking based on the provided configuration. - - Args: - cfg (dict): The benchmark configuration dictionary. - - Returns: - bool: True if the VM was created successfully, False otherwise. - """ - vm_details = cfg.get('bench_env').get('gce_env') print("--- Creating GCE VM for benchmarking ---") - success = environment.create_and_run_on_gce_vm(vm_details) - if not success: + if not environment.create_and_run_on_gce_vm(cfg.get('bench_env', {}).get('gce_env')): print("--- Failed to create GCE VM. ---") - return success + return False + return True def copy_directory_to_bucket(local_dir, bucket_name): - """ - Copies a local directory to a GCS bucket using the gcloud CLI. - - Args: - local_dir (str): The path to the local directory. 
- bucket_name (str): The name of the GCS bucket. - """ - if not os.path.isdir(local_dir): - print(f"Error: Local directory '{local_dir}' not found.") - return - + if not os.path.isdir(local_dir): return try: - # Construct the gcloud command. The --recursive flag is essential - # for copying entire directories. The destination is gs://bucket_name/local_dir_name. - command = f"gcloud storage cp --recursive {local_dir} gs://{bucket_name}/" - - # Use shlex.split to safely parse the command string into a list of arguments. - command_list = shlex.split(command) - - # Run the command and wait for it to complete. - # check=True will raise an exception if the command returns a non-zero exit code. - subprocess.run( - command_list, - check=True, - capture_output=True, - text=True - ) - + subprocess.run(['gcloud', 'storage', 'cp', '--recursive', local_dir, f'gs://{bucket_name}/'], check=True, capture_output=True, text=True) print(f"Directory '{local_dir}' copied successfully to gs://{bucket_name}/") - except subprocess.CalledProcessError as e: - print(f"Error copying directory '{local_dir}':") - print("Error Output:", e.stderr) - print("Return Code:", e.returncode) - except FileNotFoundError: - print("Error: The 'gcloud' command was not found. Please ensure the gcloud CLI is installed and in your system's PATH.") + print(f"Error copying directory: {e.stderr}") def construct_gcloud_path(bucket_name, bench_id): @@ -239,62 +108,21 @@ def construct_gcloud_path(bucket_name, bench_id): def wait_for_benchmark_to_complete(bucket_name, filepath, timeout=timeout, poll_interval=poll_interval): - """ - Waits for a benchmark to complete by polling for a success or failure file - using the gcloud CLI. - - Args: - bucket_name (str): The name of the GCS bucket to monitor. - timeout (int): The maximum time in seconds to wait. - poll_interval (int): The interval in seconds between each check. - - Returns: - int: 0 if a 'success.txt' file is found. - int: 1 if a 'failure.txt' file is found or the timeout is reached. - """ - print(f"Monitoring bucket '{bucket_name}' for benchmark completion...") - + print(f"Monitoring bucket '{bucket_name}'...") deadline = datetime.now() + timedelta(seconds=timeout) - while datetime.now() < deadline: - print(f"Polling for completion files at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}...") - - # Construct the gcloud command to list files in the bucket - command = f"gcloud storage ls {filepath}" - command_list = shlex.split(command) - try: - # Run the command and capture the output - result = subprocess.run( - command_list, - check=True, - capture_output=True, - text=True - ) - - # Check the command's standard output for the file names - if 'success.txt' in result.stdout: - print(f"Success! Found 'success.txt'. Benchmark completed successfully.") + res = subprocess.run(['gcloud', 'storage', 'ls', filepath], check=True, capture_output=True, text=True) + if 'success.txt' in res.stdout: + print("Benchmark completed successfully.") return True - if 'failure.txt' in result.stdout: + if 'failure.txt' in res.stdout: print(f"Failure! Found 'failure.txt'. Benchmark failed.") return False - - except subprocess.CalledProcessError as e: - # This handles cases where the gcloud command itself fails - print(f"Error during gcloud command: {e.stderr}") - # We can choose to exit or continue based on the error - # For this scenario, we'll continue, as the error might be - # due to an empty bucket, which is a valid state to be in - # while waiting. 
- pass - except FileNotFoundError: - print("Error: The 'gcloud' command was not found.") - return False # Exit with an error if gcloud isn't installed - + except subprocess.CalledProcessError: pass + except FileNotFoundError: return False time.sleep(poll_interval) - # If the loop completes, the timeout was reached print("Timeout reached. Neither success nor failure file was found.") return False diff --git a/gcsfuse-micro-benchmarking/helpers/parse_results.py b/gcsfuse-micro-benchmarking/helpers/parse_results.py index e60827f..d0c4d59 100644 --- a/gcsfuse-micro-benchmarking/helpers/parse_results.py +++ b/gcsfuse-micro-benchmarking/helpers/parse_results.py @@ -2,465 +2,101 @@ import subprocess import os import tempfile -import shlex import shutil import json import fnmatch import statistics import datetime -from datetime import timezone from .vm_metrics import get_vm_cpu_utilization_points def calculate_stats(data): - """Calculates mean and standard deviation, handling single-element lists.""" - if not data: - return None, None - if len(data) == 1: - return data[0], 0.0 - return statistics.fmean(data), statistics.stdev(data) + if not data: return None, None + return (data[0], 0.0) if len(data) == 1 else (statistics.fmean(data), statistics.stdev(data)) def process_fio_metrics_and_vm_metrics(fio_metrics, timestamps, vm_cfg): - """ - Processes FIO and VM metrics to generate a combined performance report. - - Args: - fio_metrics (list): A list of JSON objects, where each object is the result of one FIO iteration. - timestamps (list): A list of tuples, where each tuple contains the (start_time, end_time) for each FIO iteration. - vm_cfg (dict): The configuration dictionary for the VM. - """ - if len(fio_metrics) != len(timestamps): - print("Mismatch in the number of records for fio metrics and timestamps") - exit() - - # --- 1. Process FIO Metrics --- - # FIO throughput (bw) is in KiB/s. We will collect both read and write values. - read_bws = [ - job['read']['bw'] - for metrics in fio_metrics - for job in metrics['jobs'] - if 'read' in job and 'bw' in job['read'] - ] - - write_bws = [ - job['write']['bw'] - for metrics in fio_metrics - for job in metrics['jobs'] - if 'write' in job and 'bw' in job['write'] - ] - - # FIO latency (lat_ns) is in milliseconds. - read_lats = [ - job['read']['lat_ns']['mean']/1000000.0 - for metrics in fio_metrics - for job in metrics['jobs'] - if 'read' in job and 'lat_ns' in job['read'] - ] - - write_lats = [ - job['write']['lat_ns']['mean']/1000000.0 - for metrics in fio_metrics - for job in metrics['jobs'] - if 'write' in job and 'lat_ns' in job['write'] - ] - - read_iops = [ - job['read']['iops'] - for metrics in fio_metrics - for job in metrics['jobs'] - if 'read' in job and 'iops' in job['read'] - ] - - write_iops = [ - job['write']['iops'] - for metrics in fio_metrics - for job in metrics['jobs'] - if 'write' in job and 'iops' in job['write'] - ] - - # Calculate average and standard deviation for FIO metrics. - # Note: statistics.fmean and statistics.stdev are used for floating point data. 
-    fio_report = {}
-    avg_read_bw, stdev_read_bw = calculate_stats(read_bws)
-    if avg_read_bw is not None:
-        # 1 MiB/s = 1024 KiB/s
-        fio_report['avg_read_throughput_mbps'] = avg_read_bw / 1000.0
-        fio_report['stdev_read_throughput_mbps'] = stdev_read_bw / 1000.0
-
-    avg_write_bw, stdev_write_bw = calculate_stats(write_bws)
-    if avg_write_bw is not None:
-        # 1 MiB/s = 1024 KiB/s
-        fio_report['avg_write_throughput_mbps'] = avg_write_bw / 1000.0
-        fio_report['stdev_write_throughput_mbps'] = stdev_write_bw / 1000.0
-
-    avg_read_lat, stdev_read_lat = calculate_stats(read_lats)
-    if avg_read_lat is not None:
-        fio_report['avg_read_latency_ms'] = avg_read_lat
-        fio_report['stdev_read_latency_ms'] = stdev_read_lat
-
-    avg_write_lat, stdev_write_lat = calculate_stats(write_lats)
-    if avg_write_lat is not None:
-        fio_report['avg_write_latency_ms'] = avg_write_lat
-        fio_report['stdev_write_latency_ms'] = stdev_write_lat
+    if len(fio_metrics) != len(timestamps): print("Mismatch in metrics/timestamps"); exit()
-    avg_read_iops, stdev_read_iops = calculate_stats(read_iops)
-    if avg_read_iops is not None:
-        fio_report['avg_read_iops'] = avg_read_iops
-        fio_report['stdev_read_iops'] = stdev_read_iops
-    avg_write_iops, stdev_write_iops = calculate_stats(write_iops)
-    if avg_write_iops is not None:
-        fio_report['avg_write_iops'] = avg_write_iops
-        fio_report['stdev_write_iops'] = stdev_write_iops
-
-    # --- 2. Process VM Metrics ---
-    vm_name = vm_cfg['instance_name']
-    project = vm_cfg['project']
-    zone = vm_cfg['zone']
-
-    all_cpu_utilizations = []
-
-    for i, ts in enumerate(timestamps):
-        start_time = datetime.datetime.strptime(ts['start_time'], "%Y-%m-%dT%H:%M:%S%z")
-        end_time = datetime.datetime.strptime(ts['end_time'], "%Y-%m-%dT%H:%M:%S%z")
+    def get_vals(d, m, scale=1.0, sub=None):
+        # 'lat_ns' is a nested dict in fio output; pass sub='mean' to pull the mean latency.
+        return [(job[d][m][sub] if sub else job[d][m]) / scale
+                for x in fio_metrics for job in x['jobs'] if d in job and m in job[d]]
+
+    rep = {}
+    for d in ['read', 'write']:
+        for m, k, s, sub in [('bw', 'throughput_mbps', 1000.0, None), ('lat_ns', 'latency_ms', 1e6, 'mean'), ('iops', 'iops', 1.0, None)]:
+            avg, std = calculate_stats(get_vals(d, m, s, sub))
+            if avg is not None: rep[f'avg_{d}_{k}'], rep[f'stdev_{d}_{k}'] = avg, std
+
+    vm_name, proj, zone = vm_cfg['instance_name'], vm_cfg['project'], vm_cfg['zone']
+    cpus = []
+    for ts in timestamps:
         try:
-            print(f"Fetching CPU for interval {i+1}: {start_time} to {end_time}")
-            cpu_points = get_vm_cpu_utilization_points(vm_name, project, zone, start_time, end_time)
-            if cpu_points:
-                avg_cpu_for_interval = max(cpu_points)
-                all_cpu_utilizations.append(avg_cpu_for_interval)
-                # print(f"  Interval {i+1}: Found {len(cpu_points)} CPU points, Avg: {avg_cpu_for_interval:.4f}")
-            else:
-                print(f"  Interval {i+1}: No CPU data points found.")
-        except Exception as e:
-            print(f"Error fetching VM metrics for interval {start_time} to {end_time}: {e}")
-            continue
-
-    vm_report = {}
-    avg_cpu, stdev_cpu = calculate_stats(all_cpu_utilizations)
-    if avg_cpu is not None:
-        vm_report['avg_cpu_utilization_percent'] = avg_cpu * 100
-        vm_report['stdev_cpu_utilization_percent'] = stdev_cpu * 100
-        vm_report['cpu_data_point_count'] = len(all_cpu_utilizations)
-    else:
-        vm_report['avg_cpu_utilization_percent'] = None
-        vm_report['stdev_cpu_utilization_percent'] = None
-        vm_report['cpu_data_point_count'] = 0
-
-    # --- 3. 
Generate Final Combined Report --- - final_report = { - 'fio_metrics': fio_report, - 'vm_metrics': vm_report - } + st = datetime.datetime.strptime(ts['start_time'], "%Y-%m-%dT%H:%M:%S%z") + et = datetime.datetime.strptime(ts['end_time'], "%Y-%m-%dT%H:%M:%S%z") + pts = get_vm_cpu_utilization_points(vm_name, proj, zone, st, et) + if pts: cpus.append(max(pts)) + except Exception as e: print(f"VM metrics error: {e}") + + avg_cpu, std_cpu = calculate_stats(cpus) + vm_rep = {'avg_cpu_utilization_percent': avg_cpu * 100 if avg_cpu else None, + 'stdev_cpu_utilization_percent': std_cpu * 100 if std_cpu else None, + 'cpu_data_point_count': len(cpus)} + + final = {'fio_metrics': rep, 'vm_metrics': vm_rep} + total_mbps = rep.get('avg_read_throughput_mbps', 0) + rep.get('avg_write_throughput_mbps', 0) + avg_cpu_pct = vm_rep.get('avg_cpu_utilization_percent') - # Calculate the CPU% per GB/s metric. - # We combine read and write throughput and convert from KiB/s to GB/s. - total_avg_throughput_mbps = ( - fio_report.get('avg_read_throughput_mbps', 0) + - fio_report.get('avg_write_throughput_mbps', 0) - ) - - # Get the average CPU percent, will be None if not calculated - avg_cpu_percent = vm_report.get('avg_cpu_utilization_percent') - - # Check if both throughput and avg_cpu_percent are valid for calculation - if total_avg_throughput_mbps > 0 and avg_cpu_percent is not None: - # Convert MiB/s to decimal GB/s (Gigabytes per second) - # 1 MiB = 1024 * 1024 Bytes - # 1 GB = 1000 * 1000 * 1000 Bytes - bytes_per_mb = 1000.0 * 1000.0 - bytes_per_gb = 1000.0 * 1000.0 * 1000.0 - - total_avg_throughput_gbps = total_avg_throughput_mbps * bytes_per_mb / bytes_per_gb - - if total_avg_throughput_gbps > 1e-9: # Avoid division by zero or near-zero - final_report['cpu_percent_per_gbps'] = avg_cpu_percent / total_avg_throughput_gbps - else: - # Handle cases with very low or zero throughput - final_report['cpu_percent_per_gbps'] = float('inf') - else: - final_report['cpu_percent_per_gbps'] = None - if avg_cpu_percent is None: - print("Cannot calculate cpu_percent_per_gbps: Average CPU utilization is not available.") - if total_avg_throughput_mbps <= 0: - print("Cannot calculate cpu_percent_per_gbps: Total average throughput is zero or less.") - - return final_report + final['cpu_percent_per_gbps'] = None + if total_mbps > 0 and avg_cpu_pct is not None: + gbps = total_mbps / 1000.0 + final['cpu_percent_per_gbps'] = (avg_cpu_pct / gbps) if gbps > 1e-9 else float('inf') + return final def download_artifacts_from_bucket(benchmark_id: str, artifacts_bucket: str): - """ - Downloads artifacts for a given benchmark_id from a GCS bucket. - - Uses 'gcloud storage cp -r' to copy the folder named benchmark_id - from gs://{artifacts_bucket}/ to a local temporary directory. - - Args: - benchmark_id: The name of the folder within the bucket to download. - artifacts_bucket: The GCS bucket name. - - Returns: - The absolute local file path to the downloaded folder - (e.g., /tmp/tmpxyz/{benchmark_id}). - The caller is responsible for cleaning up this temporary directory - when no longer needed (e.g., using shutil.rmtree()). - - Raises: - subprocess.CalledProcessError: If the gcloud command fails. - FileNotFoundError: If the downloaded folder is not found after copy. - """ - if not benchmark_id: - raise ValueError("benchmark_id cannot be empty") - if not artifacts_bucket: - raise ValueError("artifacts_bucket cannot be empty") - - # Create a unique temporary directory - # tempfile.mkdtemp() creates a new directory with a unique name. 
- local_temp_base_dir = tempfile.mkdtemp() - - source_uri = f"gs://{artifacts_bucket}/{benchmark_id}" - - # Command to recursively copy the GCS folder to the local temp directory. - # The benchmark_id folder itself will be created inside local_temp_base_dir. - command = [ - "gcloud", "storage", "cp", - "-r", # Recursive - source_uri, - local_temp_base_dir - ] - - # print(f"Running command: {' '.join(shlex.quote(arg) for arg in command)}") - + if not benchmark_id or not artifacts_bucket: raise ValueError("Missing ID or bucket") + tmp = tempfile.mkdtemp() try: - result = subprocess.run( - command, - check=True, # Raise an exception for non-zero exit codes - capture_output=True, - text=True - ) - # print("gcloud stdout:", result.stdout) - # print("gcloud stderr:", result.stderr) - - # The downloaded folder will be at local_temp_base_dir/benchmark_id - downloaded_folder_path = os.path.join(local_temp_base_dir, benchmark_id) - - if not os.path.isdir(downloaded_folder_path): - # Cleanup the base temp dir if the expected subfolder wasn't created. - shutil.rmtree(local_temp_base_dir) - raise FileNotFoundError( - f"Expected downloaded folder not found at: {downloaded_folder_path}" - f" after running gcloud command. Check stderr for details." - ) - - print(f"Artifacts downloaded to: {downloaded_folder_path}") - return downloaded_folder_path - - except subprocess.CalledProcessError as e: - print(f"gcloud storage cp command failed:") - print(f" Return Code: {e.returncode}") - print(f" Stderr: {e.stderr}") - print(f" Stdout: {e.stdout}") - # Clean up the potentially partially created temp directory - shutil.rmtree(local_temp_base_dir) - raise - except Exception as e: - # Clean up in case of other errors - shutil.rmtree(local_temp_base_dir) - raise - - -def load_csv_to_object(filepath): - """Loads a CSV file into a list of dictionaries.""" - data = [] - with open(filepath, 'r', newline='') as csvfile: - # Use csv.DictReader to automatically map rows to dictionaries - reader = csv.DictReader(csvfile) - for row in reader: - data.append(row) - return data + subprocess.run(["gcloud", "storage", "cp", "-r", f"gs://{artifacts_bucket}/{benchmark_id}", tmp], check=True, capture_output=True, text=True) + path = os.path.join(tmp, benchmark_id) + if not os.path.isdir(path): raise FileNotFoundError(f"Folder not found: {path}") + print(f"Artifacts downloaded to: {path}") + return path + except Exception: shutil.rmtree(tmp); raise def clean_load_json_to_object(filepath: str): - """ - Loads a JSON file into a Python object, skipping any leading - lines that are not part of the JSON content. - - It looks for the first line that, after stripping leading whitespace, - starts with '{' or '['. - - Args: - filepath: The full path to the JSON file. - - Returns: - A Python object (dict, list, etc.) representing the JSON content. - Returns None if the file is not found, no valid JSON start - is found, or if JSON decoding fails. 
- """ - if not os.path.exists(filepath): - print(f"Error: File not found: {filepath}") - return None - + if not os.path.exists(filepath): return None try: - with open(filepath, 'r') as jsonfile: - lines = jsonfile.readlines() - - json_start_line_index = -1 + with open(filepath, 'r') as f: lines = f.readlines() for i, line in enumerate(lines): - stripped_line = line.lstrip() - if stripped_line.startswith('{') or stripped_line.startswith('['): - json_start_line_index = i - break - - if json_start_line_index != -1: - # Join the lines from the start of the JSON content to the end - json_string = "".join(lines[json_start_line_index:]) - try: - data = json.loads(json_string) - return data - except json.JSONDecodeError as e: - print(f"Error decoding JSON from {filepath} starting from line {json_start_line_index + 1}: {e}") - # Optionally print the snippet that failed to parse - # print(f"Attempted to parse:\n{json_string[:500]}...") - return None - else: - print(f"Error: No JSON start character ('{{' or '[') found in {filepath}") - return None - - except Exception as e: - print(f"An unexpected error occurred while processing {filepath}: {e}") - return None + if line.lstrip().startswith(('{', '[')): return json.loads("".join(lines[i:])) + except Exception as e: print(f"JSON error {filepath}: {e}") + return None def process_fio_output_files(file_pattern, directory_path: str): - """ - Finds all files matching 'zyx*.json' in the given directory, - loads them using load_json_to_object, and collects the results. - - Args: - directory_path: The path to the directory to search in. - - Returns: - A list of objects, where each object is the result of - loading a matching JSON file. Contains None for files - that failed to load. - """ - if not os.path.isdir(directory_path): - print(f"Error: Directory not found: {directory_path}") - return [] - - loaded_objects = [] - - # print(f"Searching for files matching '{file_pattern}' in '{directory_path}'...") - - for filename in os.listdir(directory_path): - if fnmatch.fnmatch(filename, file_pattern): - full_filepath = os.path.join(directory_path, filename) - if os.path.isfile(full_filepath): - loaded_object = clean_load_json_to_object(full_filepath) - loaded_objects.append(loaded_object) - else: - print(f"Skipping non-file entry: {filename}") - - return loaded_objects + if not os.path.isdir(directory_path): return [] + objs = [] + for f in os.listdir(directory_path): + if fnmatch.fnmatch(f, file_pattern) and os.path.isfile(os.path.join(directory_path, f)): + o = clean_load_json_to_object(os.path.join(directory_path, f)) + if o is not None: objs.append(o) + return objs def get_avg_perf_metrics_for_job(case, artifacts_dir, vm_cfg): - bs=case['bs'] - file_size=case['file_size'] - iodepth=case['iodepth'] - iotype=case['iotype'] - threads=case['threads'] - nrfiles=case['nrfiles'] - - # Construct relevant filepaths - testcase=f'fio_output_{bs}_{file_size}_{iodepth}_{iotype}_{threads}_{nrfiles}' - raw_data_path=f'{artifacts_dir}/raw-results/{testcase}/' - fio_output_path="fio_output_iter" - timestamps_file=raw_data_path+"timestamps.csv" - - # Get the fio metrics - fio_metrics= process_fio_output_files(f'{fio_output_path}*.json', raw_data_path) - - # Get the VM metrics - timestamps= load_csv_to_object(timestamps_file) - # print(timestamps) - + raw = f"{artifacts_dir}/raw-results/fio_output_{case['bs']}_{case['file_size']}_{case['iodepth']}_{case['iotype']}_{case['threads']}_{case['nrfiles']}" + fio_metrics = process_fio_output_files("fio_output_iter*.json", raw) + with 
open(f"{raw}/timestamps.csv", 'r') as f: timestamps = list(csv.DictReader(f)) metrics = process_fio_metrics_and_vm_metrics(fio_metrics, timestamps, vm_cfg) - - metrics['bs']=bs - metrics['file_size']=file_size - metrics['iodepth']=iodepth - metrics['iotype']=iotype - metrics['threads']=threads - metrics['nrfiles']=nrfiles - + metrics.update({k: case[k] for k in ['bs', 'file_size', 'iodepth', 'iotype', 'threads', 'nrfiles']}) return metrics -def load_testcases_from_csv(filepath): - data = load_csv_to_object(filepath) - return data - - def parse_benchmark_results(benchmark_id, ARTIFACTS_BUCKET, cfg): - vm_cfg={ - 'instance_name': cfg.get('bench_env').get('gce_env').get('vm_name'), - 'zone': cfg.get('bench_env').get('zone'), - 'project': cfg.get('bench_env').get('project'), - } - + vm_cfg = {'instance_name': cfg['bench_env']['gce_env']['vm_name'], 'zone': cfg['bench_env']['zone'], 'project': cfg['bench_env']['project']} artifacts = download_artifacts_from_bucket(benchmark_id, ARTIFACTS_BUCKET) - - testcases = load_testcases_from_csv(f'{artifacts}/fio_job_cases.csv') - metrics={} - for tc in testcases: - tc_metrics=get_avg_perf_metrics_for_job(tc, artifacts, vm_cfg) - key = f"{tc['bs']}_{tc['file_size']}_{tc['iodepth']}_{tc['iotype']}_{tc['threads']}_{tc['nrfiles']}" - metrics[key] = tc_metrics + with open(f'{artifacts}/fio_job_cases.csv', 'r') as f: testcases = list(csv.DictReader(f)) + metrics = {f"{tc['bs']}_{tc['file_size']}_{tc['iodepth']}_{tc['iotype']}_{tc['threads']}_{tc['nrfiles']}": get_avg_perf_metrics_for_job(tc, artifacts, vm_cfg) for tc in testcases} return artifacts, metrics - - -# Example Usage: -if __name__ == '__main__': - try: - # Replace with your bucket and a folder you want to test with - bucket = "non-existent-bucket " # Example bucket - benchmark = "randomid" # Example folder - - print(f"Attempting to download gs://{bucket}/{benchmark}") - - # Create a dummy folder and file in the bucket for testing - # You would need to do this manually or via another script setup - # Example commands to set up for test: - # echo "hello world" > /tmp/dummy.txt - # gcloud storage cp /tmp/dummy.txt gs://gcs-fuse-test/test-artifact-folder/dummy.txt - cfg={ - 'bench_env': { - 'gce_env': { - 'vm_name': 'non-existent-vm', - }, - 'zone': 'us-central1-a', - 'project': 'non-existent-project', - }, - } - - downloaded_path, metrics = parse_benchmark_results(benchmark, bucket, cfg) - # --- Pretty Print the metrics dictionary --- - print("\n--- Pretty Printed Metrics ---") - pretty_metrics = json.dumps(metrics, indent=4, sort_keys=True) - print(pretty_metrics) - - - # IMPORTANT: Clean up the temporary directory - print(f"\nCleaning up temporary directory: {os.path.dirname(downloaded_path)}") - shutil.rmtree(os.path.dirname(downloaded_path)) - print("Cleanup complete.") - - except ValueError as e: - print(f"Input Error: {e}") - except FileNotFoundError as e: - print(f"Error: {e}") - except subprocess.CalledProcessError: - print("Download failed, check logs above.") - except Exception as e: - print(f"An unexpected error occurred: {e}") \ No newline at end of file diff --git a/gcsfuse-micro-benchmarking/helpers/rationalize.py b/gcsfuse-micro-benchmarking/helpers/rationalize.py index d0d7fc2..cb5ef6b 100644 --- a/gcsfuse-micro-benchmarking/helpers/rationalize.py +++ b/gcsfuse-micro-benchmarking/helpers/rationalize.py @@ -80,22 +80,6 @@ def rationalize_gce_vm_config(default_cfg, new_cfg): def check_bucket_storage_class(bucket_name: str): - """ - Checks a GCS bucket's default storage class using 'gcloud 
storage buckets list'. - - - If the bucket does not exist, this function silently returns. - - If the bucket exists and its default storage class is "RAPID", - it raises a BucketStorageClassError. - - Otherwise (bucket exists with a different storage class), it prints a - success message and returns peacefully. - - Args: - bucket_name: The name of the GCS bucket to check. - - Raises: - BucketStorageClassError: If the bucket's default storage class is "RAPID". - RuntimeError: If the 'gcloud' command-line tool is not found. - """ try: # Construct the gcloud command to list bucket details, filtering by the exact bucket name. # We request JSON output containing only the name and storageClass. @@ -104,14 +88,10 @@ def check_bucket_storage_class(bucket_name: str): "storage", "buckets", "list", - f"--filter=name={bucket_name}", # Filter for the specific bucket URI - "--format=json" # Output format + f"--filter=name={bucket_name}", + "--format=json" ] - # Execute the gcloud command. - # capture_output=True: Captures stdout and stderr. - # text=True: Decodes stdout and stderr as strings. - # check=False: Prevents raising an exception for non-zero gcloud exit codes. result = subprocess.run( cmd, capture_output=True, @@ -120,9 +100,6 @@ def check_bucket_storage_class(bucket_name: str): ) if result.returncode != 0: - # Log gcloud command errors to stderr for debugging. - # These could be permission issues, gcloud internal errors, etc. - # The function should still exit peacefully in these cases as per requirements. print( f"Warning: gcloud command exited with code {result.returncode} for bucket '{bucket_name}'.\n" f"Stderr: {result.stderr.strip()}", @@ -131,21 +108,16 @@ def check_bucket_storage_class(bucket_name: str): return "invalid" try: - # The expected output is a JSON array. bucket_list = json.loads(result.stdout) except json.JSONDecodeError: - # Handle cases where gcloud output isn't valid JSON. print( f"Warning: Failed to parse JSON output from gcloud for bucket '{bucket_name}'.\n" f"Stdout: {result.stdout.strip()}", file=sys.stderr ) - # Exit peacefully. return "invalid" if not bucket_list: - # The filter returned an empty list, meaning the bucket does not exist. - # Silently ignore and return as per requirements. return "dne" # Since we filtered by exact name, we expect at most one item. @@ -157,9 +129,7 @@ def check_bucket_storage_class(bucket_name: str): # This error occurs if the gcloud binary is not in the system's PATH. raise RuntimeError("gcloud command not found. Ensure Google Cloud SDK is installed and in your PATH.") from None except Exception as e: - # Catch any other unexpected exceptions. print(f"An unexpected error occurred while checking bucket '{bucket_name}': {e}", file=sys.stderr) - # Exit peacefully for other issues. 
return @@ -290,23 +260,3 @@ def rationalize_config(cfg): cfg['job_details']=rationalize_job_details(cfg.get('job_details')) cfg['bench_env']=rationalize_bench_env(cfg.get('zonal_benchmarking'),cfg.get('bench_env')) return cfg - - -if __name__ == '__main__': - cfg = { - 'zonal_benchmarking':False, - 'fio_jobfile_template': '/path/default', - 'mount_config_file': 'lambda', - 'version_details': { - 'gcsfuse_version_or_commit': 'beta', - }, - 'job_details':{ - 'bs': ["1KB", "4KB"], - }, - 'bench_env':{ - 'gce_env':{ - 'machine_type': "linda", - }, - } - } - print(rationalize_config(cfg)) \ No newline at end of file diff --git a/gcsfuse-micro-benchmarking/helpers/record_bench_id.py b/gcsfuse-micro-benchmarking/helpers/record_bench_id.py index ef3a948..5858f85 100644 --- a/gcsfuse-micro-benchmarking/helpers/record_bench_id.py +++ b/gcsfuse-micro-benchmarking/helpers/record_bench_id.py @@ -53,7 +53,3 @@ def record_benchmark_id_for_user(benchmark_id, bench_type, artifacts_bucket): print(f"Uploaded updated runs.json to {gcs_runs_file}") except Exception as e: print(f"Error uploading updated JSON file to GCS: {e}") - - -if __name__ == '__main__': - record_benchmark_id_for_user("test-benchmark-123", "feature", "random-non-existent-bucket") diff --git a/gcsfuse-micro-benchmarking/helpers/upload.py b/gcsfuse-micro-benchmarking/helpers/upload.py index ee9ead4..ee21455 100644 --- a/gcsfuse-micro-benchmarking/helpers/upload.py +++ b/gcsfuse-micro-benchmarking/helpers/upload.py @@ -2,97 +2,21 @@ import subprocess import tempfile import os -import shlex -def store_metrics_in_artifacts_bucket( - metrics: dict, - benchmark_id: str, - artifacts_bucket_name: str, - project_id: str -): - """ - Stores a metrics dictionary as a JSON file in a Google Cloud Storage bucket - using gcloud storage cp, specifying the project ID. +def store_metrics_in_artifacts_bucket(metrics, benchmark_id, artifacts_bucket_name, project_id): + if not all([benchmark_id, artifacts_bucket_name, project_id]): + print("Input error: Missing parameters"); return - Args: - metrics (dict): A dictionary containing the metrics to store. - benchmark_id (str): The identifier for the benchmark, used as a folder name. - artifacts_bucket_name (str): The name of the GCS bucket. - project_id (str): The Google Cloud Project ID to use for the gcloud command. 
-    """
-    local_file_path = None # Initialize to None
+    tmp = tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=".json")
     try:
-        # Validate inputs to avoid empty paths or IDs
-        if not benchmark_id:
-            raise ValueError("benchmark_id cannot be empty")
-        if not artifacts_bucket_name:
-            raise ValueError("artifacts_bucket_name cannot be empty")
-        if not project_id:
-            raise ValueError("project_id cannot be empty")
-
-        # Create a temporary file to store the JSON data
-        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=".json") as tmp_file:
-            json.dump(metrics, tmp_file, indent=4)
-            local_file_path = tmp_file.name
-
-        # Define the destination GCS path
-        gcs_dest_path = f"gs://{artifacts_bucket_name}/{benchmark_id}/result.json"
-
-        # Construct the gcloud storage cp command as a list
-        command = [
-            'gcloud',
-            '--project', project_id,
-            'storage',
-            'cp',
-            local_file_path,
-            gcs_dest_path
-        ]
-
-        # Execute the command
-        # print(f"Running command: {command}")
-        # Explicitly set shell=False for security and clarity
-        result = subprocess.run(command, check=True, capture_output=True, text=True, shell=False)
-
-        print(f"Successfully stored metrics in {gcs_dest_path} for project {project_id}")
-        if result.stdout:
-            print("gcloud stdout:\n", result.stdout)
-        if result.stderr:
-            # gcloud storage can put progress on stderr
-            print("gcloud stderr:\n", result.stderr)
-
-    except subprocess.CalledProcessError as e:
-        print(f"Error during gcloud command execution:")
-        print(f"Command: {e.cmd}")
-        print(f"Return Code: {e.returncode}")
-        print(f"Stdout:\n{e.stdout}")
-        print(f"Stderr:\n{e.stderr}")
-    except ValueError as e:
-        print(f"Input error: {e}")
-    except Exception as e:
-        print(f"An error occurred: {e}")
+        with tmp: json.dump(metrics, tmp, indent=4)
+        dest = f"gs://{artifacts_bucket_name}/{benchmark_id}/result.json"
+        subprocess.run(['gcloud', '--project', project_id, 'storage', 'cp', tmp.name, dest], check=True, capture_output=True, text=True)
+        print(f"Stored metrics in {dest}")
+    except subprocess.CalledProcessError as e: print(f"Gcloud error: {e.stderr}")
+    except Exception as e: print(f"Error: {e}")
     finally:
         # Clean up the temporary file
-        if local_file_path and os.path.exists(local_file_path):
-            os.remove(local_file_path)
-            print(f"Removed temporary file: {local_file_path}")
+        if os.path.exists(tmp.name):
+            os.remove(tmp.name)
+            print(f"Removed temporary file: {tmp.name}")
-
-# Example Usage:
-if __name__ == '__main__':
-    # Replace with your actual bucket name and benchmark ID
-    MY_ARTIFACTS_BUCKET = "" # Example: "my-ml-artifacts"
-    MY_BENCHMARK_ID = ""
-    PROJECT=""
-
-    metrics = {
-        '4KB_1MB_1_read_1_1': {
-            'fio_metrics': {'avg_read_throughput_kibps': 3172.0, 'stdev_read_throughput_kibps': 63.6396, 'avg_write_throughput_kibps': 0.0, 'stdev_write_throughput_kibps': 0.0, 'avg_read_latency_ns': 2705.68, 'stdev_read_latency_ns': 225.296, 'avg_write_latency_ns': 0.0, 'stdev_write_latency_ns': 0.0},
-            'vm_metrics': {}, 'cpu_percent_per_gbps': 0.12345, 'bs': '4KB', 'file_size': '1MB', 'iodepth': '1', 'iotype': 'read', 'threads': '1', 'nrfiles': '1'
-        },
-        '8KB_2MB_2_write_1_1': {
-            'fio_metrics': {'avg_read_throughput_kibps': 0.0, 'stdev_read_throughput_kibps': 0.0, 'avg_write_throughput_kibps': 5200.0, 'stdev_write_throughput_kibps': 150.0, 'avg_read_latency_ns': 0.0, 'stdev_write_latency_ns': 0.0, 'avg_write_latency_ns': 1800.0, 'stdev_write_latency_ns': 100.0},
-            'vm_metrics': {}, 'cpu_percent_per_gbps': 0.23456, 'bs': '8KB', 'file_size': '2MB', 'iodepth': '2', 'iotype': 'write', 'threads': '1', 'nrfiles': '1'
-        }
-    }
-
-    store_metrics_in_artifacts_bucket(metrics, MY_BENCHMARK_ID, 
MY_ARTIFACTS_BUCKET,PROJECT) - diff --git a/gcsfuse-micro-benchmarking/helpers/validate.py b/gcsfuse-micro-benchmarking/helpers/validate.py index 3236c11..f6d1bbd 100644 --- a/gcsfuse-micro-benchmarking/helpers/validate.py +++ b/gcsfuse-micro-benchmarking/helpers/validate.py @@ -1,139 +1,51 @@ import subprocess -import shlex import json def extract_region_from_zone(zone): - """ - Given a GCS zone string, extracts and returns the region. - - Args: - zone (str): The zone string (e.g., 'eu-west4-a'). - - Returns: - str: The corresponding region (e.g., 'eu-west4'). - """ - # Split the string by the hyphen and rejoin all parts except the last one - return '-'.join(zone.split('-')[:-1]) - + return zone.rsplit('-', 1)[0] def _get_vm_zone(vm_name, project): - """ - Retrieves the zone of a GCE VM. - - Returns: - str: The VM's zone (e.g., 'us-central1-a') or None if not found. - """ try: - # Construct the gcloud command to list instances and filter by name. - # The --format flag is used to get the output as a JSON list, - # which is easier to parse. We specifically select the 'zone' and 'name' fields. - command = [ - 'gcloud', 'compute', 'instances', 'list', - '--project={}'.format(project), - '--filter=name="{}"'.format(vm_name), - '--format=json' - ] - - # Run the command and capture the output. - result = subprocess.run(command, capture_output=True, text=True, check=True) - - # Parse the JSON output. - vm_list = json.loads(result.stdout) - - # If a VM with the given name is found, return its zone. - if vm_list: - # The 'zone' field from gcloud is a full URL, e.g., '.../zones/us-central1-a'. - # We split the string by '/' and take the last part to get the zone name. - zone_url = vm_list[0]['zone'] - return zone_url.split('/')[-1] - else: - return None - - except subprocess.CalledProcessError as e: - print(f"Error executing gcloud command: {e.stderr}") + cmd = ['gcloud', 'compute', 'instances', 'list', f'--project={project}', f'--filter=name="{vm_name}"', '--format=json(zone)'] + res = subprocess.run(cmd, capture_output=True, text=True, check=True) + data = json.loads(res.stdout) + return data[0]['zone'].split('/')[-1] if data else None + except (subprocess.CalledProcessError, json.JSONDecodeError, IndexError) as e: + print(f"Error getting VM zone: {e}") return None - except json.JSONDecodeError: - print("Error parsing JSON output from gcloud.") - return None - def _get_bucket_location(bucket_name, project): - """ - Retrieves the location of a GCS bucket. - - Returns: - str: The bucket's location (e.g., 'US-CENTRAL1') or None if not found. - """ try: - command_string = f"gcloud storage buckets describe gs://{bucket_name} --project={project} --format='value(location)'" - result = subprocess.run(shlex.split(command_string), check=True, capture_output=True, text=True) - return result.stdout.strip() + cmd = ['gcloud', 'storage', 'buckets', 'describe', f'gs://{bucket_name}', f'--project={project}', "--format=value(location)"] + return subprocess.run(cmd, check=True, capture_output=True, text=True).stdout.strip() except subprocess.CalledProcessError: print(f"Error: Bucket '{bucket_name}' not found.") return None - -def validate_if_vm_and_bucket_colocated(zonal, env_cfg,vm_name, bucket_name): - """ - Validates if a GCE VM and a GCS bucket are in the same region. - - Args: - vm_name (str): The name of the GCE VM. - bucket_name (str): The name of the GCS bucket. - - Returns: - bool: True if they are colocated, False otherwise. 
- """ - does_vm_exist=True - does_bkt_exist=True - - vm_zone = _get_vm_zone(vm_name, env_cfg.get('project')) +def validate_if_vm_and_bucket_colocated(zonal, env_cfg, vm_name, bucket_name): + project = env_cfg.get('project') + vm_zone = _get_vm_zone(vm_name, project) + does_vm_exist = bool(vm_zone) if not vm_zone: - does_vm_exist=False vm_zone = env_cfg.get('zone') - bucket_location = _get_bucket_location(bucket_name, env_cfg.get('project')) + bucket_location = _get_bucket_location(bucket_name, project) + does_bkt_exist = bool(bucket_location) if not bucket_location: - does_bkt_exist=False bucket_location = env_cfg.get('zone') if zonal else extract_region_from_zone(env_cfg.get('zone')) - is_colocated=False - if zonal and bucket_location == vm_zone: - is_colocated=True - if not zonal and vm_zone.startswith(bucket_location): - is_colocated=True + is_colocated = (zonal and bucket_location == vm_zone) or (not zonal and vm_zone.startswith(bucket_location)) if is_colocated: print(f"Success: VM '{vm_name}' in region '{vm_zone}' and bucket '{bucket_name}' in location '{bucket_location}' are colocated.") - return does_vm_exist, does_bkt_exist else: print(f"Warning: VM '{vm_name}' in region '{vm_zone}' and bucket '{bucket_name}' in location '{bucket_location}' are NOT colocated. Benchmark numbers might be impacted") - return does_vm_exist, does_bkt_exist - + return does_vm_exist, does_bkt_exist def validate_existing_resources_if_any(zonal, env_cfg): - vm_name= env_cfg.get("gce_env").get("vm_name") - bucket_name=env_cfg.get("gcs_bucket").get("bucket_name") - if vm_name == "" and bucket_name == "": + vm_name = env_cfg.get("gce_env", {}).get("vm_name", "") + bucket_name = env_cfg.get("gcs_bucket", {}).get("bucket_name", "") + if not vm_name and not bucket_name: print("Both VM and GCS bucket do not exist. 
Newly created resources will be colocated") return False, False - - return validate_if_vm_and_bucket_colocated(zonal, env_cfg,vm_name, bucket_name) - - -# Example Usage -if __name__ == '__main__': - # Replace with your actual VM and bucket names - zonal=False - env_cfg={ - 'zone': 'us-central1-a', - 'gce_env': { - 'vm_name': 'my-non-existent-vm', - }, - 'gcs_bucket':{ - 'bucket_name': 'my-non-existent-bkt', - }, - } - - is_vm_exists,is_bkt_exists= validate_existing_resources_if_any(zonal, env_cfg) - print(is_vm_exists,is_bkt_exists) \ No newline at end of file + return validate_if_vm_and_bucket_colocated(zonal, env_cfg, vm_name, bucket_name) diff --git a/gcsfuse-micro-benchmarking/resources/starter_script.sh b/gcsfuse-micro-benchmarking/resources/starter_script.sh index 75016db..31aec9c 100644 --- a/gcsfuse-micro-benchmarking/resources/starter_script.sh +++ b/gcsfuse-micro-benchmarking/resources/starter_script.sh @@ -14,6 +14,24 @@ echo "The value of 'bucket' is: $bucket" echo "The value of 'artifacts_bucket' is: $artifacts_bucket" echo "The value of 'benchmark_id' is: $benchmark_id" +# Global OS/Arch detection +OS_KERNEL=$(uname -s) +ARCH_MACHINE=$(uname -m) +OS_LOWER="" +ARCH_GO="" + +case "$OS_KERNEL" in + Linux*) OS_LOWER="linux";; + MINGW*|MSYS*) OS_LOWER="windows";; + *) echo "Unsupported OS: $OS_KERNEL"; exit 1;; +esac + +case "$ARCH_MACHINE" in + x86_64) ARCH_GO="amd64";; + aarch64|arm64) ARCH_GO="arm64";; + *) echo "Unsupported architecture: $ARCH_MACHINE"; exit 1;; +esac + # A helper function to check if a command exists command_exists () { command -v "$1" >/dev/null 2>&1 @@ -22,51 +40,22 @@ command_exists () { # Platform-agnostic function to install yq by downloading from GitHub install_yq() { echo "Installing yq from GitHub releases..." - local OS=$(uname -s) - local arch=$(uname -m) - local install_path="/usr/local/bin/yq" # Default for Linux/macOS + local install_path="/usr/local/bin/yq" + local filename="yq_${OS_LOWER}_${ARCH_GO}" - # Adjust install_path for Windows/MSYS - if [[ "$OS" == "MINGW"* || "$OS" == "MSYS_NT"* ]]; then + if [[ "$OS_LOWER" == "windows" ]]; then install_path="/usr/bin/yq" + filename="${filename}.exe" fi # Ensure the target directory exists sudo mkdir -p "$(dirname "${install_path}")" - - echo "Detected OS: $OS, Architecture: $arch" - - local download_arch="" - case "$arch" in - x86_64) download_arch="amd64";; - aarch64|arm64) download_arch="arm64";; - *) echo "Unsupported architecture for yq download: $arch"; exit 1;; - esac - - local filename="" - case "$OS" in - "Linux") - filename="yq_linux_${download_arch}" - ;; - "Darwin") # macOS - filename="yq_darwin_${download_arch}" - ;; - "MINGW"*|"MSYS_NT"*) # Windows (Git Bash, MSYS) - OS="Windows" - filename="yq_windows_${download_arch}.exe" - ;; - *) - echo "Unsupported operating system for yq installation: $OS" - exit 1 - ;; - esac - local download_url="https://github.com/mikefarah/yq/releases/latest/download/$filename" echo "Attempting to remove any existing yq at ${install_path}..." sudo rm -f "${install_path}" - echo "Downloading yq for ${OS} ${download_arch} from ${download_url}..." + echo "Downloading yq from ${download_url}..." if ! sudo curl -L -o "${install_path}" "${download_url}"; then echo "Error: Failed to download yq." exit 1 @@ -76,89 +65,42 @@ install_yq() { echo "Verifying yq installation at ${install_path}..." if [[ ! -x "${install_path}" ]]; then echo "Error: yq not found or not executable at ${install_path}." 
- ls -l "${install_path}" exit 1 fi - - echo "yq executable is at: ${install_path}" - echo "yq Version:" "${install_path}" --version - echo "yq installed successfully to ${install_path}." } - - install_dependencies() { - # Get the operating system and store it in a variable - OS=$(uname -s) - - echo "Detected operating system: $OS" - - case "$OS" in + echo "Detected operating system: $OS_KERNEL" + case "$OS_KERNEL" in "Linux") echo "Installing dependencies for Linux..." - # Check for a Debian-based system (apt) if command_exists apt-get; then sudo apt-get update -y - # Note: libaio-dev is a Linux-specific dependency sudo apt-get install wget libaio-dev gcc g++ make git fuse -y - echo "apt-get packages installed." - # Check for a Red Hat-based system (dnf or yum) elif command_exists dnf; then sudo dnf install wget libaio-devel gcc-c++ make git fuse -y - echo "dnf packages installed." elif command_exists yum; then sudo yum install wget libaio-devel gcc-c++ make git fuse -y - echo "yum packages installed." - # Add more package managers as needed (e.g., pacman for Arch) else - echo "Could not find a supported package manager (apt-get, dnf, or yum). Please install dependencies manually." + echo "Could not find a supported package manager." exit 1 fi ;; - "Darwin") - echo "Installing dependencies for macOS..." - # Check for Homebrew - if ! command_exists brew; then - echo "Homebrew not found. Please install it from https://brew.sh and re-run the script." - exit 1 - fi - # Install build tools, git, and fuse - # Note: libaio-dev is not available on macOS, as it is a Linux-specific library. - # The build tools will be installed via Xcode Command Line Tools. - echo "Installing Xcode Command Line Tools..." - xcode-select --install - echo "Installing homebrew packages..." - if !command_exists wget; then - echo "Installing wget via Homebrew..." - brew install wget - else - echo "wget not found. Please install wget, or install Homebrew to install it." - exit 1 - fi - - brew install gcc git coreutils - echo "brew packages installed." - ;; - "MINGW"*|"MSYS_NT"*) - echo "Installing dependencies for Windows (Git Bash/MSYS2)..." - # Check for Chocolatey + echo "Installing dependencies for Windows..." if ! command_exists choco; then echo "Chocolatey not found. Please install it from https://chocolatey.org and re-run the script." exit 1 fi - # Note: libaio-dev is not available on Windows. - # Install build tools and Git using Chocolatey - echo "Installing Chocolatey packages..." choco install git make wget -y echo "choco packages installed." ;; *) - echo "Unsupported operating system: $OS. Please install dependencies manually." + echo "Unsupported operating system: $OS_KERNEL." exit 1 ;; esac @@ -186,137 +128,55 @@ copy_raw_results_to_artifacts_bucket(){ # Install fio on the VM. install_fio_on_vm() { local fio_install_path="/usr/local/bin/fio" - # Check if fio is already installed and executable at the expected path if [[ -x "$fio_install_path" ]]; then - echo "FIO is already installed at $fio_install_path" - "$fio_install_path" --version + echo "FIO is already installed." return 0 fi - # dir contains the path to the directory with version_details.yml local dir=$1 - echo "Fetching fio_version from ${dir}/version_details.yml" - # Use yq to parse the YAML file and get the fio_version. - local fio_version - fio_version=$(/usr/local/bin/yq e '.fio_version' "${dir}/version_details.yml") - - # Check if the version was successfully retrieved. 
- if [ -z "$fio_version" ]; then - echo "Error: Could not find fio_version in the YAML file." - return 1 - fi + local fio_version=$(/usr/local/bin/yq e '.fio_version' "${dir}/version_details.yml") + if [ -z "$fio_version" ]; then echo "Error: fio_version not found."; return 1; fi echo "Preparing to install fio version: ${fio_version}" ( - if [ -d "fio" ]; then - echo "Removing existing fio directory..." - rm -rf fio - fi - - echo "Cloning fio version: ${fio_version}..." - if ! git clone --depth 1 -b "fio-${fio_version}" https://github.com/axboe/fio.git; then - echo "Error: Failed to clone fio repository." - exit 1 - fi - - cd fio || { echo "Error: Failed to cd into fio directory."; exit 1; } - - echo "Configuring fio..." - if ! ./configure; then - echo "Error: fio configuration failed." - exit 1 - fi - - echo "Building fio..." - if ! make -j"$(nproc)"; then - echo "Error: fio build failed." - exit 1 - fi - - echo "Installing fio to $fio_install_path..." - if ! sudo make install; then - echo "Error: fio installation failed." - exit 1 - fi + rm -rf fio + git clone --depth 1 -b "fio-${fio_version}" https://github.com/axboe/fio.git || exit 1 + cd fio || exit 1 + ./configure || exit 1 + make -j"$(nproc)" || exit 1 + sudo make install || exit 1 ) - if [ $? -eq 0 ]; then - echo "Fio installation process complete." - if [[ -x "$fio_install_path" ]]; then + if [[ -x "$fio_install_path" ]]; then echo "fio installed successfully to $fio_install_path" - "$fio_install_path" --version - return 0 - else - echo "Error: fio not found at $fio_install_path after installation." - return 1 - fi + "$fio_install_path" --version + return 0 else echo "Fio installation process failed." return 1 fi } -# Helper function (if not already defined) -check_if_package_installed() { - command -v "$1" >/dev/null 2>&1 -} - - # Install golang on the VM. install_golang_on_vm() { - if check_if_package_installed "go"; then - echo "go is already installed on the VM" + if command_exists go; then + echo "go is already installed." return 0 fi - # dir contains the path to the directory with version_details.yml local dir=$1 - set -e # Exit immediately if a command exits with a non-zero status. - - echo "Fetching go_version from ${dir}/version_details.yml" local go_version=$(/usr/local/bin/yq e '.go_version' "${dir}/version_details.yml") - - if [ -z "$go_version" ]; then - echo "Error: Could not find go_version in the YAML file." - return 1 - fi + if [ -z "$go_version" ]; then echo "Error: go_version not found."; return 1; fi echo "Installing Go version: ${go_version}" + if [[ "$OS_LOWER" == "windows" ]]; then echo "Go install not supported on Windows via script"; return 1; fi - # --- Automated OS and Architecture Detection --- - local os="" - case "$(uname -s)" in - Linux*) os=linux;; - Darwin*) os=darwin;; - *) echo "Unsupported OS: $(uname -s)"; return 1;; - esac - - local arch="" - case "$(uname -m)" in - x86_64) arch=amd64;; - arm64) arch=arm64;; - aarch64) arch=arm64;; - *) echo "Unsupported architecture: $(uname -m)"; return 1;; - esac - - # --- Installation Logic --- - local go_file="go${go_version}.${os}-${arch}.tar.gz" + local go_file="go${go_version}.${OS_LOWER}-${ARCH_GO}.tar.gz" local download_url="https://go.dev/dl/${go_file}" local download_path="/tmp/${go_file}" - echo "Downloading ${download_url}..." wget "$download_url" -O "$download_path" - - # Remove any previous Go installation to avoid conflicts sudo rm -rf /usr/local/go - - # Extract the new Go tarball to /usr/local - echo "Extracting Go to /usr/local/..." 
sudo tar -C /usr/local -xzf "$download_path" - - # Set up the environment PATH if it's not already configured - local go_bin_path='/usr/local/go/bin' - export PATH="$PATH:$go_bin_path" - - # Clean up the downloaded file + export PATH="$PATH:/usr/local/go/bin" rm "$download_path" echo "Go installation of version ${go_version} complete." @@ -324,49 +184,30 @@ install_golang_on_vm() { } build_gcsfuse() { - # dir contains the path to the directory with version_details.yml local dir=$1 local gcsfuse_install_path="/usr/local/bin/gcsfuse" - set -e - - echo "Fetching gcsfuse version from ${dir}/version_details.yml" - local gcsfuse_version - gcsfuse_version=$(/usr/local/bin/yq e '.gcsfuse_version_or_commit' "${dir}/version_details.yml") - - if [ -z "$gcsfuse_version" ]; then - echo "Error: Could not find gcsfuse_version in the YAML file." - return 1 - fi + local gcsfuse_version=$(/usr/local/bin/yq e '.gcsfuse_version_or_commit' "${dir}/version_details.yml") + if [ -z "$gcsfuse_version" ]; then echo "Error: gcsfuse_version not found."; return 1; fi echo "Building gcsfuse binary with version: ${gcsfuse_version}" # Use a subshell to avoid changing the script's main working directory ( - # Remove existing gcsfuse directory if it exists echo "Removing existing gcsfuse directory..." rm -rf gcsfuse - # Clone the gcsfuse repository echo "Cloning gcsfuse repository..." git clone https://github.com/GoogleCloudPlatform/gcsfuse.git - - # Navigate into the cloned directory cd gcsfuse - # Check out the specific version or commit echo "Checking out version/commit: ${gcsfuse_version}" git checkout "${gcsfuse_version}" - - # Build the gcsfuse binary - echo "Building the binary..." go build - # Install the binary to a system path echo "Installing gcsfuse to ${gcsfuse_install_path}" sudo cp -f gcsfuse "${gcsfuse_install_path}" ) - echo "Verifying gcsfuse installation..." if [[ -x "${gcsfuse_install_path}" ]]; then echo "gcsfuse installed successfully to ${gcsfuse_install_path}" "${gcsfuse_install_path}" --version @@ -383,66 +224,26 @@ mount_gcsfuse() { local mount_config="$3" local gcsfuse_binary="/usr/local/bin/gcsfuse" - # Create the mount directory if it doesn't exist - if [ ! -d "$mntdir" ]; then - mkdir -p "$mntdir" - fi + mkdir -p "$mntdir" + if mountpoint -q "$mntdir"; then echo "Directory '$mntdir' is already a mount point. Skipping."; return 0; fi + if [ ! -f "$mount_config" ]; then echo "Error: Config not found."; return 1; fi - # Check if the directory is already mounted - if mountpoint -q "$mntdir"; then - echo "Directory '$mntdir' is already a mount point. Skipping." - return 0 - fi - - # Check if the mount config file exists - if [ ! -f "$mount_config" ]; then - echo "Error: Mount config file not found at '$mount_config'." - return 1 - fi - - # Mount the GCS bucket using the config file echo "Mounting bucket '$bucketname' to '$mntdir' with config '$mount_config'..." "${gcsfuse_binary}" --config-file="$mount_config" "$bucketname" "$mntdir" - - # Verify the mount was successful - if mountpoint -q "$mntdir"; then - echo "Successfully mounted '$bucketname' to '$mntdir'." - return 0 - else - echo "Error: Failed to mount '$bucketname' to '$mntdir'." - return 1 - fi + if mountpoint -q "$mntdir"; then return 0; else return 1; fi } unmount_gcsfuse() { local mntdir="$1" - - # Check if the directory is a mount point - if ! mountpoint -q "$mntdir"; then - echo "Directory '$mntdir' is not a mount point. Skipping." - return 0 - fi + if ! mountpoint -q "$mntdir"; then return 0; fi - echo "Unmounting '$mntdir'..." 
- - # Use the correct unmount command based on the OS - if [[ "$(uname)" == "Linux" ]]; then + if [[ "$OS_KERNEL" == "Linux" ]]; then fusermount -uz "$mntdir" - elif [[ "$(uname)" == "Darwin" ]]; then - umount "$mntdir" else echo "Warning: Unsupported OS for unmounting." return 1 fi - - # Verify the unmount was successful - if ! mountpoint -q "$mntdir"; then - echo "Successfully unmounted '$mntdir'." - return 0 - else - echo "Error: Failed to unmount '$mntdir'." - return 1 - fi + if ! mountpoint -q "$mntdir"; then return 0; else return 1; fi } start_benchmarking_runs() { @@ -451,47 +252,26 @@ start_benchmarking_runs() { local fio_binary="/usr/local/bin/fio" mkdir -p "${dir}/raw-results/" - fio_job_cases="${dir}/fio_job_cases.csv" fio_job_file="${dir}/jobfile.fio" - - # Check if the job cases file exists - if [ ! -f "$fio_job_cases" ]; then - echo "Error: FIO job cases file not found at ${fio_job_cases}" - return 1 - fi - - # Check if the job file exists - if [ ! -f "$fio_job_file" ]; then - echo "Error: FIO job file not found at ${fio_job_file}" - return 1 - fi - mntdir="${dir}/mntdir/" mount_config="${dir}/mount_config.yml" - # Check if the mount config file exists - if [ ! -f "$mount_config" ]; then - echo "Error: mount config file not found at ${mount_config}" + if [ ! -f "$fio_job_cases" ] || [ ! -f "$fio_job_file" ] || [ ! -f "$mount_config" ]; then + echo "Error: Missing configuration files." return 1 fi - # Read the CSV file line by line, skipping the header while IFS=, read -r bs file_size iodepth iotype threads nrfiles || [[ -n "$nrfiles" ]]; do - # Iterate for the specified number of runs for this job case - # Mount the bucket once before the loop if reuse_same_mount is 'true' if [[ "$reuse_same_mount" == "true" ]]; then mount_gcsfuse "$mntdir" "$bucket" "$mount_config" fi nrfiles="${nrfiles%$'\r'}" - - echo "Experiment config: ${bs}, ${file_size}, ${iodepth}, ${iotype}, ${threads}, ${nrfiles}" + echo "Config: ${bs}, ${file_size}, ${iodepth}, ${iotype}, ${threads}, ${nrfiles}" testdir="${dir}/raw-results/fio_output_${bs}_${file_size}_${iodepth}_${iotype}_${threads}_${nrfiles}" mkdir -p "$testdir" - - timestamps_file="${testdir}/timestamps.csv" - echo "iteration,start_time,end_time" > "$timestamps_file" + echo "iteration,start_time,end_time" > "${testdir}/timestamps.csv" for ((i = 1; i <= iterations; i++)); do echo "Starting FIO run ${i} of ${iterations} for case: bs=${bs}, file_size=${file_size}, iodepth=${iodepth}, iotype=${iotype}, threads=${threads}, nrfiles=${nrfiles}" @@ -499,56 +279,37 @@ start_benchmarking_runs() { if [[ "$reuse_same_mount" != "true" ]]; then mount_gcsfuse "$mntdir" "$bucket" "$mount_config" fi - start_time=$(date -u +"%Y-%m-%dT%H:%M:%S%z") - filename_format="${iotype}-\$jobnum/\$filenum" output_file="${testdir}/fio_output_iter${i}.json" - MNTDIR=${mntdir} IODEPTH=${iodepth} IOTYPE=${iotype} BLOCKSIZE=${bs} FILESIZE=${file_size} NRFILES=${nrfiles} NUMJOBS=${threads} FILENAME_FORMAT=${filename_format} ${fio_binary} $fio_job_file --output-format=json > "$output_file" 2>&1 + MNTDIR=${mntdir} IODEPTH=${iodepth} IOTYPE=${iotype} BLOCKSIZE=${bs} FILESIZE=${file_size} NRFILES=${nrfiles} NUMJOBS=${threads} FILENAME_FORMAT="${iotype}-\$jobnum/\$filenum" ${fio_binary} $fio_job_file --output-format=json > "$output_file" 2>&1 - end_time=$(date -u +"%Y-%m-%dT%H:%M:%S%z") - echo "${i},${start_time},${end_time}" >> "$timestamps_file" + echo "${i},${start_time},$(date -u +"%Y-%m-%dT%H:%M:%S%z")" >> "${testdir}/timestamps.csv" - # If reuse_same_mount is 'false', unmount 
after this run if [[ "$reuse_same_mount" != "true" ]]; then unmount_gcsfuse "$mntdir" fi echo "Sleeping for 20 seconds to keep VM metrics independent for each iteration...." sleep 20 - done - # Unmount the bucket once after the loop if reuse_same_mount is 'true' if [[ "$reuse_same_mount" == "true" ]]; then unmount_gcsfuse "$mntdir" fi done < <(tail -n +2 "$fio_job_cases") - - } # Check if the values were retrieved successfully if [ -n "$bucket" ] && [ -n "$artifacts_bucket" ] && [ -n "$benchmark_id" ]; then - echo "Metadata parameters are accessible and were retrieved successfully." dir="$(pwd)" - - # Install yq install_yq - - # Install other dependencies install_dependencies - - # Copy the resources necessary for running the benchmark copy_resources_from_artifact_bucket "$artifacts_bucket" "$benchmark_id" "$dir" - install_fio_on_vm "$dir" install_golang_on_vm "$dir" build_gcsfuse "$dir" - start_benchmarking_runs "$dir" "$iterations" - copy_raw_results_to_artifacts_bucket "$artifacts_bucket" "$benchmark_id" "$dir" - touch /tmp/success.txt gcloud storage cp /tmp/success.txt gs://$artifacts_bucket/$benchmark_id/success.txt exit 0
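
The final block above only runs when bucket, artifacts_bucket, and benchmark_id are all non-empty, and the loop also relies on an iterations value; the code that populates these variables sits above the diffed hunks and is not shown here. As a hedged sketch (the attribute keys are assumed to match the shell variable names), such values are typically read from the GCE instance metadata server along these lines:

    # Sketch only: the real retrieval logic is outside this diff and may differ.
    METADATA_URL="http://metadata.google.internal/computeMetadata/v1/instance/attributes"
    fetch_attr() {
      # The Metadata-Flavor header is required by the GCE metadata server.
      curl -s -f -H "Metadata-Flavor: Google" "${METADATA_URL}/$1"
    }
    bucket=$(fetch_attr bucket)                      # assumed attribute key
    artifacts_bucket=$(fetch_attr artifacts_bucket)  # assumed attribute key
    benchmark_id=$(fetch_attr benchmark_id)          # assumed attribute key
    iterations=$(fetch_attr iterations)              # assumed attribute key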
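
For a quick manual check of the benchmarking loop, the helpers defined in starter_script.sh can also be exercised directly for a single job case. This is a sketch using the same invocation pattern as start_benchmarking_runs; the block size, file size, and other parameter values are illustrative only:

    # Sketch: one manual iteration with illustrative parameters.
    dir="$(pwd)"
    mntdir="${dir}/mntdir/"
    mount_gcsfuse "$mntdir" "$bucket" "${dir}/mount_config.yml"
    MNTDIR=${mntdir} IODEPTH=1 IOTYPE=read BLOCKSIZE=4KB FILESIZE=1MB NRFILES=1 NUMJOBS=1 \
      FILENAME_FORMAT='read-$jobnum/$filenum' \
      /usr/local/bin/fio "${dir}/jobfile.fio" --output-format=json > fio_output.json 2>&1
    unmount_gcsfuse "$mntdir"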