diff --git a/distributed-micro-benchmarking/.gitignore b/distributed-micro-benchmarking/.gitignore new file mode 100644 index 0000000..87acd68 --- /dev/null +++ b/distributed-micro-benchmarking/.gitignore @@ -0,0 +1,6 @@ +# Python cache +__pycache__/ +*.pyc +*.pyo +*.pyd +.Python diff --git a/distributed-micro-benchmarking/helpers/__init__.py b/distributed-micro-benchmarking/helpers/__init__.py new file mode 100644 index 0000000..5453b9c --- /dev/null +++ b/distributed-micro-benchmarking/helpers/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Distributed micro-benchmarking helpers diff --git a/distributed-micro-benchmarking/helpers/gcloud_utils.py b/distributed-micro-benchmarking/helpers/gcloud_utils.py new file mode 100644 index 0000000..2da2cba --- /dev/null +++ b/distributed-micro-benchmarking/helpers/gcloud_utils.py @@ -0,0 +1,103 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unified gcloud command execution utilities""" + +import subprocess +import time + + +def run_gcloud_command(cmd, retries=1, retry_delay=2, check=False, capture_output=True, text=True, **kwargs): + """Execute a gcloud command with optional retry logic. 
+ + Args: + cmd: List of command components (e.g., ['gcloud', 'storage', 'cp', ...]) + retries: Number of retry attempts (1 = no retry, 3 = try 3 times) + retry_delay: Seconds to wait between retries + check: If True, raise CalledProcessError on non-zero exit code + capture_output: If True, capture stdout/stderr + text: If True, decode output as text + **kwargs: Additional arguments passed to subprocess.run() + + Returns: + subprocess.CompletedProcess object + + Raises: + Exception: If command fails after all retries and check=True + """ + for attempt in range(retries): + result = subprocess.run(cmd, capture_output=capture_output, text=text, **kwargs) + + if result.returncode == 0: + return result + + if attempt < retries - 1: + time.sleep(retry_delay) + + if check: + raise Exception(f"Command failed after {retries} attempt(s): {' '.join(cmd)}\nError: {result.stderr}") + + return result + + +def gcloud_storage_cp(source, dest, recursive=False, retries=3, check=True): + """Copy files to/from GCS""" + cmd = ['gcloud', 'storage', 'cp'] + if recursive: + cmd.append('-r') + cmd.extend([source, dest]) + + return run_gcloud_command(cmd, retries=retries, check=check) + + +def gcloud_storage_ls(pattern, check=False): + """List GCS objects matching a pattern""" + cmd = ['gcloud', 'storage', 'ls', pattern] + return run_gcloud_command(cmd, retries=1, check=check) + + +def gcloud_compute_ssh(vm_name, zone, project, command=None, internal_ip=True, check=True, **kwargs): + """SSH to a compute instance""" + cmd = ['gcloud', 'compute', 'ssh', vm_name, f'--zone={zone}', f'--project={project}'] + if internal_ip: + cmd.append('--internal-ip') + if command: + cmd.extend(['--command', command]) + + return run_gcloud_command(cmd, retries=1, check=check, **kwargs) + + +def gcloud_compute_scp(source, dest, zone, project, internal_ip=True, check=True): + """Copy files to/from a compute instance""" + cmd = ['gcloud', 'compute', 'scp', source, dest, f'--zone={zone}', f'--project={project}'] + if internal_ip: + cmd.append('--internal-ip') + + return run_gcloud_command(cmd, retries=1, check=check) + + +def gcloud_compute_instance_group_list(instance_group, zone, project, filter_status='RUNNING'): + """List VM names in a managed instance group""" + cmd = [ + 'gcloud', 'compute', 'instance-groups', 'managed', 'list-instances', + instance_group, + f'--zone={zone}', + f'--project={project}', + f'--filter=STATUS={filter_status}', + '--format=value(NAME)' + ] + + result = run_gcloud_command(cmd, retries=1, check=True) + vms = [vm.strip() for vm in result.stdout.strip().split('\n') if vm.strip()] + return vms diff --git a/distributed-micro-benchmarking/helpers/gcs.py b/distributed-micro-benchmarking/helpers/gcs.py new file mode 100644 index 0000000..39cf3f2 --- /dev/null +++ b/distributed-micro-benchmarking/helpers/gcs.py @@ -0,0 +1,86 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""GCS operations for distributed benchmarking""" + +import json +import tempfile +from . 
import gcloud_utils + + +def upload_json(data, gcs_path): + """Upload JSON data to GCS with retry on failure""" + with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: + json.dump(data, f, indent=2) + f.flush() + + try: + gcloud_utils.gcloud_storage_cp(f.name, gcs_path, retries=3, check=True) + except Exception as e: + raise Exception(f"Failed to upload to {gcs_path} after 3 attempts: {e}") + + +def download_json(gcs_path): + """Download and parse JSON from GCS""" + with tempfile.NamedTemporaryFile(mode='r', suffix='.json', delete=False) as f: + result = gcloud_utils.gcloud_storage_cp(gcs_path, f.name, retries=1, check=False) + + if result.returncode != 0: + return None + + with open(f.name, 'r') as rf: + return json.load(rf) + + +def upload_test_cases(csv_path, base_path): + """Upload test cases CSV to GCS""" + import os + if not os.path.exists(csv_path): + raise FileNotFoundError(f"Test cases file not found: {csv_path}") + + dest = f"{base_path}/test-cases.csv" + gcloud_utils.gcloud_storage_cp(csv_path, dest, retries=1, check=True) + + +def upload_fio_job_file(fio_path, base_path): + """Upload FIO job template to GCS""" + import os + if not os.path.exists(fio_path): + raise FileNotFoundError(f"FIO job file not found: {fio_path}") + + dest = f"{base_path}/jobfile.fio" + gcloud_utils.gcloud_storage_cp(fio_path, dest, retries=1, check=True) + + +def list_manifests(benchmark_id, artifacts_bucket): + """List all manifest files for a benchmark""" + pattern = f"gs://{artifacts_bucket}/{benchmark_id}/results/*/manifest.json" + result = gcloud_utils.gcloud_storage_ls(pattern, check=False) + + if result.returncode != 0: + return [] + + return [line.strip() for line in result.stdout.strip().split('\n') if line.strip()] + + +def download_directory(gcs_path, local_path): + """Download a directory from GCS""" + gcloud_utils.gcloud_storage_cp(gcs_path, local_path, recursive=True, retries=1, check=True) + + +def check_cancellation(benchmark_id, artifacts_bucket): + """Check if cancellation flag exists in GCS""" + cancel_path = f"gs://{artifacts_bucket}/{benchmark_id}/cancel" + result = gcloud_utils.gcloud_storage_ls(cancel_path, check=False) + return result.returncode == 0 diff --git a/distributed-micro-benchmarking/helpers/job_generator.py b/distributed-micro-benchmarking/helpers/job_generator.py new file mode 100644 index 0000000..40b108a --- /dev/null +++ b/distributed-micro-benchmarking/helpers/job_generator.py @@ -0,0 +1,98 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
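The `gcs.py` helpers above wrap `gcloud storage` for the small JSON/CSV artifacts that the controller and workers exchange. A minimal usage sketch, assuming the `helpers` package is importable and the gcloud CLI is authenticated; the bucket name, benchmark ID, and job-spec path are illustrative placeholders, not values defined by this patch:

```python
# Illustrative only: assumes the `helpers` package is on the path and gcloud
# is authenticated; bucket/benchmark names below are placeholders.
from helpers import gcs

artifacts_bucket = "my-artifacts-bucket"        # hypothetical bucket
benchmark_id = "bench-20260101-120000"          # hypothetical benchmark ID
spec_path = f"gs://{artifacts_bucket}/{benchmark_id}/jobs/vm-1.json"  # hypothetical layout

# Publish a small JSON artifact and read it back (returns None if missing).
gcs.upload_json({"vm_name": "vm-1", "iterations": 3}, spec_path)
spec = gcs.download_json(spec_path)

# Poll for per-VM manifests and for a user-set cancellation flag.
manifests = gcs.list_manifests(benchmark_id, artifacts_bucket)
if gcs.check_cancellation(benchmark_id, artifacts_bucket):
    print("Cancellation flag present; aggregate partial results only.")
```

Note that `upload_json` retries up to three times via `run_gcloud_command`, while `download_json` deliberately returns `None` when the object is missing rather than raising.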
+ +"""Job specification generation and test distribution""" + +import csv + + +def _load_csv_with_id(csv_path, id_field): + """Generic CSV loader that adds sequential ID field""" + items = [] + with open(csv_path, 'r') as f: + reader = csv.DictReader(f) + for i, row in enumerate(reader, start=1): + row[id_field] = i + items.append(row) + return items + + +def load_test_cases(csv_path): + """Load test cases from CSV file""" + return _load_csv_with_id(csv_path, 'test_id') + + +def load_configs(csv_path): + """Load config variations from CSV file""" + return _load_csv_with_id(csv_path, 'config_id') + + +def generate_test_matrix(test_cases, configs): + """Generate cartesian product of configs × test_cases""" + test_matrix = [] + matrix_id = 1 + + for config in configs: + for test_case in test_cases: + # Spread test_case first, then override with matrix-specific IDs + matrix_entry = { + **test_case, + 'matrix_id': matrix_id, + 'config_id': config['config_id'], + 'test_id': test_case['test_id'], + 'commit': config['commit'], + 'mount_args': config['mount_args'], + 'config_label': config['label'] + } + test_matrix.append(matrix_entry) + matrix_id += 1 + + return test_matrix + + +def distribute_tests(test_cases, vms, is_matrix=False): + """Distribute test cases evenly across VMs""" + num_vms = len(vms) + tests_per_vm = len(test_cases) // num_vms + remaining = len(test_cases) % num_vms + + distribution = {} + start_idx = 0 + + for i, vm in enumerate(vms): + count = tests_per_vm + (1 if i < remaining else 0) + end_idx = start_idx + count + distribution[vm] = test_cases[start_idx:end_idx] + start_idx = end_idx + + return distribution + + +def create_job_spec(vm_name, benchmark_id, test_entries, bucket, artifacts_bucket, iterations, mode="single-config"): + """Create job specification for a VM""" + total_tests = len(test_entries) + + job_spec = { + "vm_name": vm_name, + "benchmark_id": benchmark_id, + "bucket": bucket, + "artifacts_bucket": artifacts_bucket, + "iterations": iterations, + "total_tests": total_tests, + "total_runs": total_tests * iterations, + "test_ids" if mode == "single-config" else "test_entries": + [entry['test_id'] for entry in test_entries] if mode == "single-config" else test_entries + } + + return job_spec diff --git a/distributed-micro-benchmarking/helpers/report_generator.py b/distributed-micro-benchmarking/helpers/report_generator.py new file mode 100644 index 0000000..e114f94 --- /dev/null +++ b/distributed-micro-benchmarking/helpers/report_generator.py @@ -0,0 +1,192 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
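`job_generator.py` above is the controller-side planning step. A hedged sketch of the intended flow follows (CSV file names, VM names, and bucket names are placeholders; the configs CSV is assumed to carry the `commit`, `mount_args`, and `label` columns that the matrix builder reads):

```python
# Illustrative planning flow; file names, VM names, and buckets are placeholders.
from helpers import job_generator

test_cases = job_generator.load_test_cases("test-cases.csv")  # adds test_id 1..N
configs = job_generator.load_configs("configs.csv")           # adds config_id 1..M

# Multi-config run: every config is paired with every test case (N x M entries).
matrix = job_generator.generate_test_matrix(test_cases, configs)

# Even split with the remainder going to the first VMs, e.g. 7 entries over
# 3 VMs -> 3/2/2.
vms = ["vm-1", "vm-2", "vm-3"]
distribution = job_generator.distribute_tests(matrix, vms)

for vm, entries in distribution.items():
    spec = job_generator.create_job_spec(
        vm_name=vm,
        benchmark_id="bench-123",
        test_entries=entries,
        bucket="data-bucket",
        artifacts_bucket="artifacts-bucket",
        iterations=3,
        mode="multi-config",
    )
    # In multi-config mode the spec carries full matrix rows under
    # "test_entries"; in single-config mode it carries "test_ids" instead.
    print(f"{vm}: {spec['total_runs']} runs planned")
```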
+ +"""Report generation for benchmark results""" + +import os +import csv +from tabulate import tabulate + + +def _extract_resource_metrics(params): + """Extract resource metrics from params dict""" + return { + 'avg_cpu': params.get('avg_cpu', '-'), + 'peak_cpu': params.get('peak_cpu', '-'), + 'avg_mem': params.get('avg_mem_mb', '-'), + 'peak_mem': params.get('peak_mem_mb', '-'), + 'avg_page_cache': params.get('avg_page_cache_gb', '-'), + 'peak_page_cache': params.get('peak_page_cache_gb', '-'), + 'avg_sys_cpu': params.get('avg_sys_cpu', '-'), + 'peak_sys_cpu': params.get('peak_sys_cpu', '-'), + 'avg_net_rx': params.get('avg_net_rx_mbps', '-'), + 'peak_net_rx': params.get('peak_net_rx_mbps', '-'), + 'avg_net_tx': params.get('avg_net_tx_mbps', '-'), + 'peak_net_tx': params.get('peak_net_tx_mbps', '-'), + } + + +def _format_metric(value, default="-"): + """Format a metric value with 2 decimal places or return default""" + return f"{value:.2f}" if value > 0 else default + + +def generate_report(metrics, output_file, mode="single-config", separate_configs=False): + """Generate benchmark report in CSV format and print as table""" + os.makedirs(os.path.dirname(output_file) if os.path.dirname(output_file) else '.', exist_ok=True) + + if mode == "multi-config" and separate_configs: + # Generate separate reports per config + generate_separate_reports(metrics, output_file) + else: + # Generate combined report + generate_combined_report(metrics, output_file, mode) + + +def generate_combined_report(metrics, output_file, mode): + """Generate a combined report with optional config columns""" + + # Determine headers based on mode + base_headers = ["IOType|Jobs|FSize|BS|IOD|NrFiles", "Read BW (MB/s)", "Write BW (MB/s)", + "Read Min (ms)", "Read Max (ms)", "Read Avg (ms)", "Read StdDev (ms)", + "Read P50 (ms)", "Read P90 (ms)", "Read P99 (ms)", + "Avg CPU (%)", "Peak CPU (%)", "Avg Mem (MB)", "Peak Mem (MB)", + "Avg PgCache (GB)", "Peak PgCache (GB)", "Avg Sys CPU (%)", "Peak Sys CPU (%)", + "Avg Net RX (MB/s)", "Peak Net RX (MB/s)", "Avg Net TX (MB/s)", "Peak Net TX (MB/s)", "Iter"] + + if mode == "multi-config": + headers = ["Matrix ID", "Test ID", "Config", "Commit"] + base_headers + else: + headers = ["Test ID"] + base_headers + + rows = [] + for test_key in sorted(metrics.keys()): + m = metrics[test_key] + params = m.get('test_params', {}) + resources = _extract_resource_metrics(params) + + # Build metric values + metric_values = [ + format_params(params), + _format_metric(m['read_bw_mbps']), + _format_metric(m['write_bw_mbps']), + _format_metric(m.get('read_lat_min_ms', 0)), + _format_metric(m.get('read_lat_max_ms', 0)), + _format_metric(m.get('read_lat_avg_ms', 0)), + _format_metric(m.get('read_lat_stddev_ms', 0)), + _format_metric(m.get('read_lat_p50_ms', 0)), + _format_metric(m.get('read_lat_p90_ms', 0)), + _format_metric(m.get('read_lat_p99_ms', 0)), + resources['avg_cpu'], resources['peak_cpu'], + resources['avg_mem'], resources['peak_mem'], + resources['avg_page_cache'], resources['peak_page_cache'], + resources['avg_sys_cpu'], resources['peak_sys_cpu'], + resources['avg_net_rx'], resources['peak_net_rx'], + resources['avg_net_tx'], resources['peak_net_tx'], + m['iterations'] + ] + + if mode == "multi-config": + config_label = params.get('config_label', '-') + commit = params.get('commit', '-') + matrix_id = m.get('matrix_id', test_key) + test_id = m.get('test_id') or params.get('test_id', '-') + rows.append([matrix_id, test_id, config_label, commit] + metric_values) + else: + 
            rows.append([test_key] + metric_values)
+
+    # Write to CSV file
+    with open(output_file, 'w', newline='') as f:
+        writer = csv.writer(f)
+        writer.writerow(headers)
+        writer.writerows(rows)
+
+    # Print table to console
+    table = tabulate(rows, headers=headers, tablefmt="grid")
+    print("\n" + table)
+    print(f"\nReport saved to: {output_file}")
+
+
+def generate_separate_reports(metrics, base_output_file):
+    """Generate separate CSV reports per config"""
+
+    # Group metrics by config
+    config_groups = {}
+    for test_key, m in metrics.items():
+        params = m.get('test_params', {})
+        config_label = params.get('config_label', 'unknown')
+        if config_label not in config_groups:
+            config_groups[config_label] = {}
+        config_groups[config_label][test_key] = m
+
+    # Generate report for each config
+    base_dir = os.path.dirname(base_output_file)
+    base_name = os.path.splitext(os.path.basename(base_output_file))[0]
+
+    # Column order matches format_params(): io_type, threads, file_size, bs, io_depth, nrfiles
+    headers = ["Test ID", "IOType|Jobs|FSize|BS|IOD|NrFiles", "Read BW (MB/s)", "Write BW (MB/s)",
+               "Read P50 (ms)", "Read P90 (ms)", "Read P99 (ms)", "Read Max (ms)",
+               "Avg CPU (%)", "Peak CPU (%)", "Avg Mem (MB)", "Peak Mem (MB)",
+               "Avg PgCache (GB)", "Peak PgCache (GB)", "Avg Sys CPU (%)", "Peak Sys CPU (%)",
+               "Avg Net RX (MB/s)", "Peak Net RX (MB/s)", "Avg Net TX (MB/s)", "Peak Net TX (MB/s)", "Iter"]
+
+    for config_label, config_metrics in config_groups.items():
+        output_file = os.path.join(base_dir, f"{base_name}_{config_label}.csv")
+        rows = []
+
+        for test_key in sorted(config_metrics.keys()):
+            m = config_metrics[test_key]
+            params = m.get('test_params', {})
+            resources = _extract_resource_metrics(params)
+            test_id = params.get('test_id', test_key)
+
+            rows.append([
+                test_id,
+                format_params(params),
+                _format_metric(m['read_bw_mbps']),
+                _format_metric(m['write_bw_mbps']),
+                _format_metric(m.get('read_lat_p50_ms', 0)),
+                _format_metric(m.get('read_lat_p90_ms', 0)),
+                _format_metric(m.get('read_lat_p99_ms', 0)),
+                _format_metric(m.get('read_lat_max_ms', 0)),
+                resources['avg_cpu'], resources['peak_cpu'],
+                resources['avg_mem'], resources['peak_mem'],
+                resources['avg_page_cache'], resources['peak_page_cache'],
+                resources['avg_sys_cpu'], resources['peak_sys_cpu'],
+                resources['avg_net_rx'], resources['peak_net_rx'],
+                resources['avg_net_tx'], resources['peak_net_tx'],
+                m['iterations']
+            ])
+
+        # Write to CSV file
+        with open(output_file, 'w', newline='') as f:
+            writer = csv.writer(f)
+            writer.writerow(headers)
+            writer.writerows(rows)
+
+        print(f"Report for config '{config_label}' saved to: {output_file}")
+
+
+def format_params(params):
+    """Format test parameters into compact string"""
+    if not params:
+        return "-"
+
+    # Extract common FIO parameters (excluding resource metrics and config info)
+    # Order: io_type, threads, file_size, block_size, io_depth, nr_files
+    parts = []
+    for key in ['io_type', 'threads', 'file_size', 'bs', 'io_depth', 'nrfiles']:
+        if key in params:
+            parts.append(f"{params[key]}")
+
+    return "|".join(parts) if parts else str(params)
diff --git a/distributed-micro-benchmarking/helpers/result_aggregator.py b/distributed-micro-benchmarking/helpers/result_aggregator.py
new file mode 100644
index 0000000..327fa5e
--- /dev/null
+++ b/distributed-micro-benchmarking/helpers/result_aggregator.py
@@ -0,0 +1,176 @@
+# Copyright 2026 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Result aggregation from distributed VMs"""
+
+import json
+import glob
+import tempfile
+import os
+from . import gcs, gcloud_utils
+
+
+def _avg(values):
+    """Calculate average of non-empty list, return 0 if empty"""
+    return sum(values) / len(values) if values else 0
+
+
+def _extract_latency_metrics(job):
+    """Extract read latency metrics from FIO job data"""
+    if 'read' not in job or not job['read'].get('bw'):
+        return None
+
+    lat_metrics = {'bw': job['read']['bw']}
+
+    # clat_ns stores values in MICROSECONDS (despite the name)
+    # lat_ns stores values in NANOSECONDS
+    # We prefer clat_ns (completion latency) over lat_ns (total latency)
+    if 'clat_ns' in job['read']:
+        lat_data = job['read']['clat_ns']
+        divisor = 1000.0  # µs to ms
+    elif 'lat_ns' in job['read']:
+        lat_data = job['read']['lat_ns']
+        divisor = 1000000.0  # ns to ms
+    else:
+        return lat_metrics
+
+    # Extract basic stats
+    for key in ['min', 'max', 'mean', 'stddev']:
+        if key in lat_data:
+            lat_metrics[key] = lat_data[key] / divisor
+
+    # Extract percentiles
+    if 'percentile' in lat_data:
+        percentiles = lat_data['percentile']
+        for pct, pct_key in [('50.000000', 'p50'), ('90.000000', 'p90'), ('99.000000', 'p99')]:
+            if pct in percentiles:
+                lat_metrics[pct_key] = percentiles[pct] / divisor
+
+    return lat_metrics
+
+
+def aggregate_results(benchmark_id, artifacts_bucket, vms, mode="single-config"):
+    """Aggregate results from all VMs.
+
+    Downloads results from gs://<artifacts_bucket>/<benchmark_id>/results/<vm>/ for each VM.
+    Each VM's results directory contains:
+      - manifest.json: List of tests with status and metadata
+      - test-<test_key>/: Directory per test with FIO JSON outputs and resource metrics
+
+    In multi-config mode, test_key is matrix_id (unique across config×test combinations).
+    In single-config mode, test_key is test_id (can be same across VMs if distributed).
+
+    Returns dict mapping test_key -> aggregated metrics (bandwidth, CPU, memory, etc).
+ """ + all_metrics = {} + successful_vms = 0 + failed_vms = [] + + with tempfile.TemporaryDirectory() as tmpdir: + for vm in vms: + # Download VM results + vm_path = f"gs://{artifacts_bucket}/{benchmark_id}/results/{vm}" + local_vm_dir = os.path.join(tmpdir, vm) + os.makedirs(local_vm_dir, exist_ok=True) + + try: + # Download with wildcard to get contents + gcloud_utils.gcloud_storage_cp(f"{vm_path}/*", local_vm_dir, recursive=True, retries=1, check=True) + except Exception as e: + print(f"Warning: Could not download results for {vm}: {e}") + failed_vms.append(vm) + continue + + # Load manifest + manifest_path = os.path.join(local_vm_dir, "manifest.json") + if not os.path.exists(manifest_path): + print(f"Warning: No manifest found for {vm} at {manifest_path}") + # List what we got + print(f" Contents: {os.listdir(local_vm_dir) if os.path.exists(local_vm_dir) else 'directory does not exist'}") + continue + + with open(manifest_path, 'r') as f: + manifest = json.load(f) + + # Process each test result + for test_info in manifest.get('tests', []): + if test_info['status'] != 'success': + continue + + # In multi-config mode, use matrix_id as key; in single-config, use test_id + if mode == "multi-config": + test_key = test_info.get('matrix_id', test_info['test_id']) + test_dir_name = f"test-{test_key}" + else: + test_key = test_info['test_id'] + test_dir_name = f"test-{test_key}" + + # Parse FIO results for this test + test_dir = os.path.join(local_vm_dir, test_dir_name) + if os.path.exists(test_dir): + metrics = parse_test_results(test_dir, test_info, mode) + all_metrics[test_key] = metrics + successful_vms += 1 + + # Print summary + if failed_vms: + print(f"\nWarning: Failed to get results from {len(failed_vms)} VM(s): {', '.join(failed_vms)}") + print(f"Successfully aggregated results from {successful_vms}/{len(vms)} VMs") + + return all_metrics + + +def parse_test_results(test_dir, test_info, mode="single-config"): + """Parse FIO results from a test directory""" + fio_files = glob.glob(os.path.join(test_dir, "fio_output_*.json")) + + read_bws = [] + write_bws = [] + lat_lists = {key: [] for key in ['min', 'max', 'mean', 'stddev', 'p50', 'p90', 'p99']} + + for fio_file in fio_files: + with open(fio_file, 'r') as f: + data = json.load(f) + + for job in data.get('jobs', []): + # Extract read metrics + lat_metrics = _extract_latency_metrics(job) + if lat_metrics: + read_bws.append(lat_metrics['bw']) + for key in lat_lists: + if key in lat_metrics: + lat_lists[key].append(lat_metrics[key]) + + # Extract write metrics + if 'write' in job and job['write'].get('bw'): + write_bws.append(job['write']['bw']) + + # Build result dict + result = { + 'test_params': test_info.get('params', {}), + 'read_bw_mbps': _avg(read_bws) / 1000.0, + 'write_bw_mbps': _avg(write_bws) / 1000.0, + 'read_lat_min_ms': _avg(lat_lists['min']), + 'read_lat_max_ms': _avg(lat_lists['max']), + 'read_lat_avg_ms': _avg(lat_lists['mean']), + 'read_lat_stddev_ms': _avg(lat_lists['stddev']), + 'read_lat_p50_ms': _avg(lat_lists['p50']), + 'read_lat_p90_ms': _avg(lat_lists['p90']), + 'read_lat_p99_ms': _avg(lat_lists['p99']), + 'iterations': len(fio_files) + } + + # In multi-config mode, include matrix_id and test_id + if mode == "multi-config": + result['matrix_id'] = test_info.get('matrix_id', test_info['test_id']) + result['test_id'] = test_info.get('test_id') + + return result diff --git a/distributed-micro-benchmarking/helpers/test_gcloud_utils.py b/distributed-micro-benchmarking/helpers/test_gcloud_utils.py new file mode 100644 
index 0000000..8916a33 --- /dev/null +++ b/distributed-micro-benchmarking/helpers/test_gcloud_utils.py @@ -0,0 +1,229 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for gcloud_utils module""" + +import unittest +from unittest.mock import patch, MagicMock +import subprocess +import gcloud_utils + + +class TestRunGcloudCommand(unittest.TestCase): + """Test run_gcloud_command function""" + + @patch('gcloud_utils.subprocess.run') + def test_successful_command(self, mock_run): + """Test successful command execution""" + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = "success" + mock_run.return_value = mock_result + + cmd = ['gcloud', 'storage', 'ls', 'gs://bucket/'] + result = gcloud_utils.run_gcloud_command(cmd, retries=1) + + self.assertEqual(result.returncode, 0) + self.assertEqual(result.stdout, "success") + mock_run.assert_called_once() + + @patch('gcloud_utils.subprocess.run') + @patch('gcloud_utils.time.sleep') + def test_retry_logic(self, mock_sleep, mock_run): + """Test retry logic on failures""" + mock_result_fail = MagicMock() + mock_result_fail.returncode = 1 + mock_result_fail.stderr = "error" + + mock_result_success = MagicMock() + mock_result_success.returncode = 0 + + # Fail first, then succeed + mock_run.side_effect = [mock_result_fail, mock_result_success] + + cmd = ['gcloud', 'storage', 'ls', 'gs://bucket/'] + result = gcloud_utils.run_gcloud_command(cmd, retries=2, retry_delay=1) + + self.assertEqual(result.returncode, 0) + self.assertEqual(mock_run.call_count, 2) + mock_sleep.assert_called_once_with(1) + + @patch('gcloud_utils.subprocess.run') + def test_check_raises_exception(self, mock_run): + """Test that check=True raises exception on failure""" + mock_result = MagicMock() + mock_result.returncode = 1 + mock_result.stderr = "command failed" + mock_run.return_value = mock_result + + cmd = ['gcloud', 'storage', 'ls', 'gs://bucket/'] + + with self.assertRaises(Exception) as ctx: + gcloud_utils.run_gcloud_command(cmd, retries=1, check=True) + + self.assertIn("Command failed", str(ctx.exception)) + + +class TestGcloudStorageCp(unittest.TestCase): + """Test gcloud_storage_cp function""" + + @patch('gcloud_utils.run_gcloud_command') + def test_simple_copy(self, mock_run): + """Test simple file copy""" + mock_result = MagicMock() + mock_result.returncode = 0 + mock_run.return_value = mock_result + + result = gcloud_utils.gcloud_storage_cp('local.txt', 'gs://bucket/remote.txt') + + expected_cmd = ['gcloud', 'storage', 'cp', 'local.txt', 'gs://bucket/remote.txt'] + mock_run.assert_called_once_with(expected_cmd, retries=3, check=True) + self.assertEqual(result.returncode, 0) + + @patch('gcloud_utils.run_gcloud_command') + def test_recursive_copy(self, mock_run): + """Test recursive directory copy""" + mock_result = MagicMock() + mock_result.returncode = 0 + mock_run.return_value = mock_result + + gcloud_utils.gcloud_storage_cp('local_dir/', 'gs://bucket/remote_dir/', recursive=True) + + expected_cmd = 
['gcloud', 'storage', 'cp', '-r', 'local_dir/', 'gs://bucket/remote_dir/'] + mock_run.assert_called_once_with(expected_cmd, retries=3, check=True) + + +class TestGcloudStorageLs(unittest.TestCase): + """Test gcloud_storage_ls function""" + + @patch('gcloud_utils.run_gcloud_command') + def test_list_objects(self, mock_run): + """Test listing GCS objects""" + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = "gs://bucket/file1.txt\ngs://bucket/file2.txt\n" + mock_run.return_value = mock_result + + result = gcloud_utils.gcloud_storage_ls('gs://bucket/*') + + expected_cmd = ['gcloud', 'storage', 'ls', 'gs://bucket/*'] + mock_run.assert_called_once_with(expected_cmd, retries=1, check=False) + self.assertEqual(result.returncode, 0) + + +class TestGcloudComputeSsh(unittest.TestCase): + """Test gcloud_compute_ssh function""" + + @patch('gcloud_utils.run_gcloud_command') + def test_ssh_no_command(self, mock_run): + """Test SSH without command""" + mock_result = MagicMock() + mock_result.returncode = 0 + mock_run.return_value = mock_result + + gcloud_utils.gcloud_compute_ssh('vm-1', 'us-central1-a', 'my-project') + + expected_cmd = [ + 'gcloud', 'compute', 'ssh', 'vm-1', + '--zone=us-central1-a', + '--project=my-project', + '--internal-ip' + ] + mock_run.assert_called_once_with(expected_cmd, retries=1, check=True) + + @patch('gcloud_utils.run_gcloud_command') + def test_ssh_with_command(self, mock_run): + """Test SSH with command execution""" + mock_result = MagicMock() + mock_result.returncode = 0 + mock_run.return_value = mock_result + + gcloud_utils.gcloud_compute_ssh('vm-1', 'us-central1-a', 'my-project', command='ls -la') + + expected_cmd = [ + 'gcloud', 'compute', 'ssh', 'vm-1', + '--zone=us-central1-a', + '--project=my-project', + '--internal-ip', + '--command', 'ls -la' + ] + mock_run.assert_called_once() + actual_cmd = mock_run.call_args[0][0] + self.assertEqual(actual_cmd, expected_cmd) + + +class TestGcloudComputeScp(unittest.TestCase): + """Test gcloud_compute_scp function""" + + @patch('gcloud_utils.run_gcloud_command') + def test_scp_to_vm(self, mock_run): + """Test copying file to VM""" + mock_result = MagicMock() + mock_result.returncode = 0 + mock_run.return_value = mock_result + + gcloud_utils.gcloud_compute_scp('local.txt', 'vm-1:/tmp/remote.txt', 'us-central1-a', 'my-project') + + expected_cmd = [ + 'gcloud', 'compute', 'scp', 'local.txt', 'vm-1:/tmp/remote.txt', + '--zone=us-central1-a', + '--project=my-project', + '--internal-ip' + ] + mock_run.assert_called_once_with(expected_cmd, retries=1, check=True) + + +class TestGcloudComputeInstanceGroupList(unittest.TestCase): + """Test gcloud_compute_instance_group_list function""" + + @patch('gcloud_utils.run_gcloud_command') + def test_list_running_vms(self, mock_run): + """Test listing running VMs""" + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = "vm-1\nvm-2\nvm-3\n" + mock_run.return_value = mock_result + + vms = gcloud_utils.gcloud_compute_instance_group_list('my-ig', 'us-central1-a', 'my-project') + + expected_cmd = [ + 'gcloud', 'compute', 'instance-groups', 'managed', 'list-instances', + 'my-ig', + '--zone=us-central1-a', + '--project=my-project', + '--filter=STATUS=RUNNING', + '--format=value(NAME)' + ] + mock_run.assert_called_once_with(expected_cmd, retries=1, check=True) + self.assertEqual(vms, ['vm-1', 'vm-2', 'vm-3']) + + @patch('gcloud_utils.run_gcloud_command') + def test_list_with_custom_filter(self, mock_run): + """Test listing VMs with custom status 
filter""" + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = "vm-1\n" + mock_run.return_value = mock_result + + vms = gcloud_utils.gcloud_compute_instance_group_list( + 'my-ig', 'us-central1-a', 'my-project', filter_status='STOPPED' + ) + + call_args = mock_run.call_args[0][0] + self.assertIn('--filter=STATUS=STOPPED', call_args) + self.assertEqual(vms, ['vm-1']) + + +if __name__ == '__main__': + unittest.main() diff --git a/distributed-micro-benchmarking/helpers/test_gcs.py b/distributed-micro-benchmarking/helpers/test_gcs.py new file mode 100644 index 0000000..31977c4 --- /dev/null +++ b/distributed-micro-benchmarking/helpers/test_gcs.py @@ -0,0 +1,119 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for gcs module""" + +import sys +import os +sys.path.insert(0, os.path.dirname(__file__)) + +import unittest +from unittest.mock import patch, MagicMock, mock_open +import json + +# Import as module to avoid relative import issues +import gcs + + +class TestGcsOperations(unittest.TestCase): + """Test GCS operations""" + + @patch('gcs.gcloud_utils.gcloud_storage_cp') + @patch('builtins.open', new_callable=mock_open) + @patch('gcs.tempfile.NamedTemporaryFile') + def test_upload_json(self, mock_temp, mock_file, mock_gcloud_cp): + """Test JSON upload to GCS""" + mock_temp_file = MagicMock() + mock_temp_file.name = '/tmp/test.json' + mock_temp_file.__enter__.return_value = mock_temp_file + mock_temp.return_value = mock_temp_file + + data = {'test': 'data', 'value': 123} + gcs.upload_json(data, 'gs://bucket/file.json') + + mock_gcloud_cp.assert_called_once_with('/tmp/test.json', 'gs://bucket/file.json', retries=3, check=True) + + @patch('gcs.gcloud_utils.gcloud_storage_cp') + @patch('builtins.open', mock_open(read_data='{"test": "data"}')) + @patch('gcs.tempfile.NamedTemporaryFile') + def test_download_json(self, mock_temp, mock_gcloud_cp): + """Test JSON download from GCS""" + mock_temp_file = MagicMock() + mock_temp_file.name = '/tmp/test.json' + mock_temp_file.__enter__.return_value = mock_temp_file + mock_temp.return_value = mock_temp_file + + mock_result = MagicMock() + mock_result.returncode = 0 + mock_gcloud_cp.return_value = mock_result + + result = gcs.download_json('gs://bucket/file.json') + + self.assertEqual(result, {'test': 'data'}) + mock_gcloud_cp.assert_called_once() + + @patch('gcs.gcloud_utils.gcloud_storage_cp') + @patch('gcs.os.path.exists', return_value=True) + def test_upload_test_cases(self, mock_exists, mock_gcloud_cp): + """Test uploading test cases CSV""" + gcs.upload_test_cases('/local/test-cases.csv', 'gs://bucket/base') + + mock_gcloud_cp.assert_called_once_with( + '/local/test-cases.csv', + 'gs://bucket/base/test-cases.csv', + retries=1, + check=True + ) + + @patch('gcs.gcloud_utils.gcloud_storage_ls') + def test_list_manifests(self, mock_gcloud_ls): + """Test listing manifest files""" + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = ( + 
"gs://bucket/bench-123/results/vm-1/manifest.json\n" + "gs://bucket/bench-123/results/vm-2/manifest.json\n" + ) + mock_gcloud_ls.return_value = mock_result + + manifests = gcs.list_manifests('bench-123', 'bucket') + + self.assertEqual(len(manifests), 2) + self.assertIn('gs://bucket/bench-123/results/vm-1/manifest.json', manifests) + + @patch('gcs.gcloud_utils.gcloud_storage_ls') + def test_check_cancellation_exists(self, mock_gcloud_ls): + """Test checking for cancellation flag""" + mock_result = MagicMock() + mock_result.returncode = 0 + mock_gcloud_ls.return_value = mock_result + + result = gcs.check_cancellation('bench-123', 'bucket') + + self.assertTrue(result) + + @patch('gcs.gcloud_utils.gcloud_storage_ls') + def test_check_cancellation_not_exists(self, mock_gcloud_ls): + """Test checking for non-existent cancellation flag""" + mock_result = MagicMock() + mock_result.returncode = 1 + mock_gcloud_ls.return_value = mock_result + + result = gcs.check_cancellation('bench-123', 'bucket') + + self.assertFalse(result) + + +if __name__ == '__main__': + unittest.main() diff --git a/distributed-micro-benchmarking/helpers/test_job_generator.py b/distributed-micro-benchmarking/helpers/test_job_generator.py new file mode 100644 index 0000000..693474b --- /dev/null +++ b/distributed-micro-benchmarking/helpers/test_job_generator.py @@ -0,0 +1,191 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Unit tests for job_generator module""" + +import unittest +from unittest.mock import mock_open, patch +import job_generator + + +class TestLoadCSVWithId(unittest.TestCase): + """Test _load_csv_with_id helper function""" + + @patch('builtins.open', mock_open(read_data='col1,col2\nval1,val2\nval3,val4\n')) + def test_load_csv_with_id(self): + """Test loading CSV and adding ID field""" + result = job_generator._load_csv_with_id('test.csv', 'my_id') + + self.assertEqual(len(result), 2) + self.assertEqual(result[0]['my_id'], 1) + self.assertEqual(result[1]['my_id'], 2) + self.assertEqual(result[0]['col1'], 'val1') + + +class TestLoadTestCases(unittest.TestCase): + """Test load_test_cases function""" + + @patch('builtins.open', mock_open(read_data='io_type,threads,file_size\nread,4,1GB\nwrite,8,2GB\n')) + def test_load_test_cases(self): + """Test loading test cases with auto-generated test_id""" + test_cases = job_generator.load_test_cases('test_cases.csv') + + self.assertEqual(len(test_cases), 2) + self.assertEqual(test_cases[0]['test_id'], 1) + self.assertEqual(test_cases[1]['test_id'], 2) + self.assertEqual(test_cases[0]['io_type'], 'read') + self.assertEqual(test_cases[1]['threads'], '8') + + +class TestLoadConfigs(unittest.TestCase): + """Test load_configs function""" + + @patch('builtins.open', mock_open(read_data='commit,mount_args,label\nabc123,--arg1,config1\ndef456,--arg2,config2\n')) + def test_load_configs(self): + """Test loading configs with auto-generated config_id""" + configs = job_generator.load_configs('configs.csv') + + self.assertEqual(len(configs), 2) + self.assertEqual(configs[0]['config_id'], 1) + self.assertEqual(configs[1]['config_id'], 2) + self.assertEqual(configs[0]['commit'], 'abc123') + self.assertEqual(configs[1]['label'], 'config2') + + +class TestGenerateTestMatrix(unittest.TestCase): + """Test generate_test_matrix function""" + + def test_generate_test_matrix(self): + """Test cartesian product generation""" + test_cases = [ + {'test_id': 1, 'io_type': 'read'}, + {'test_id': 2, 'io_type': 'write'} + ] + configs = [ + {'config_id': 1, 'commit': 'abc', 'mount_args': '--arg1', 'label': 'cfg1'}, + {'config_id': 2, 'commit': 'def', 'mount_args': '--arg2', 'label': 'cfg2'} + ] + + matrix = job_generator.generate_test_matrix(test_cases, configs) + + # Should be 2 configs × 2 tests = 4 entries + self.assertEqual(len(matrix), 4) + + # Check first entry + self.assertEqual(matrix[0]['matrix_id'], 1) + self.assertEqual(matrix[0]['config_id'], 1) + self.assertEqual(matrix[0]['test_id'], 1) + self.assertEqual(matrix[0]['commit'], 'abc') + self.assertEqual(matrix[0]['io_type'], 'read') + + # Check last entry + self.assertEqual(matrix[3]['matrix_id'], 4) + self.assertEqual(matrix[3]['config_id'], 2) + self.assertEqual(matrix[3]['test_id'], 2) + self.assertEqual(matrix[3]['commit'], 'def') + self.assertEqual(matrix[3]['io_type'], 'write') + + +class TestDistributeTests(unittest.TestCase): + """Test distribute_tests function""" + + def test_distribute_evenly(self): + """Test even distribution of tests across VMs""" + test_cases = [{'test_id': i} for i in range(1, 11)] # 10 tests + vms = ['vm-1', 'vm-2', 'vm-3', 'vm-4', 'vm-5'] # 5 VMs + + distribution = job_generator.distribute_tests(test_cases, vms) + + self.assertEqual(len(distribution), 5) + # Each VM should get 2 tests (10 / 5) + for vm in vms: + self.assertEqual(len(distribution[vm]), 2) + + def test_distribute_with_remainder(self): + """Test distribution when tests don't divide evenly""" + test_cases = [{'test_id': i} 
for i in range(1, 8)] # 7 tests + vms = ['vm-1', 'vm-2', 'vm-3'] # 3 VMs + + distribution = job_generator.distribute_tests(test_cases, vms) + + # First 1 VM gets 3 tests (7 % 3 = 1), others get 2 + self.assertEqual(len(distribution['vm-1']), 3) + self.assertEqual(len(distribution['vm-2']), 2) + self.assertEqual(len(distribution['vm-3']), 2) + + def test_distribute_all_tests_assigned(self): + """Test that all tests are assigned to VMs""" + test_cases = [{'test_id': i} for i in range(1, 13)] # 12 tests + vms = ['vm-1', 'vm-2', 'vm-3', 'vm-4'] # 4 VMs + + distribution = job_generator.distribute_tests(test_cases, vms) + + total_assigned = sum(len(tests) for tests in distribution.values()) + self.assertEqual(total_assigned, 12) + + +class TestCreateJobSpec(unittest.TestCase): + """Test create_job_spec function""" + + def test_single_config_mode(self): + """Test job spec generation in single-config mode""" + test_entries = [ + {'test_id': 1, 'io_type': 'read'}, + {'test_id': 2, 'io_type': 'write'} + ] + + job_spec = job_generator.create_job_spec( + vm_name='vm-1', + benchmark_id='bench-123', + test_entries=test_entries, + bucket='test-bucket', + artifacts_bucket='artifacts-bucket', + iterations=3, + mode='single-config' + ) + + self.assertEqual(job_spec['vm_name'], 'vm-1') + self.assertEqual(job_spec['benchmark_id'], 'bench-123') + self.assertEqual(job_spec['total_tests'], 2) + self.assertEqual(job_spec['total_runs'], 6) # 2 tests × 3 iterations + self.assertEqual(job_spec['iterations'], 3) + self.assertIn('test_ids', job_spec) + self.assertEqual(job_spec['test_ids'], [1, 2]) + + def test_multi_config_mode(self): + """Test job spec generation in multi-config mode""" + test_entries = [ + {'test_id': 1, 'matrix_id': 5, 'config_id': 1}, + {'test_id': 2, 'matrix_id': 6, 'config_id': 1} + ] + + job_spec = job_generator.create_job_spec( + vm_name='vm-1', + benchmark_id='bench-456', + test_entries=test_entries, + bucket='test-bucket', + artifacts_bucket='artifacts-bucket', + iterations=2, + mode='multi-config' + ) + + self.assertEqual(job_spec['total_tests'], 2) + self.assertEqual(job_spec['total_runs'], 4) # 2 tests × 2 iterations + self.assertIn('test_entries', job_spec) + self.assertNotIn('test_ids', job_spec) + self.assertEqual(len(job_spec['test_entries']), 2) + + +if __name__ == '__main__': + unittest.main() diff --git a/distributed-micro-benchmarking/helpers/test_report_generator.py b/distributed-micro-benchmarking/helpers/test_report_generator.py new file mode 100644 index 0000000..5748c32 --- /dev/null +++ b/distributed-micro-benchmarking/helpers/test_report_generator.py @@ -0,0 +1,241 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
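On the controller, the aggregation and reporting helpers defined earlier (`result_aggregator.py`, `report_generator.py`) are meant to be chained once all workers have finished. A minimal sketch of that hand-off, assuming the `tabulate` package is installed and using placeholder bucket, benchmark, and VM names:

```python
# Illustrative hand-off from aggregation to reporting; names are placeholders.
from helpers import result_aggregator, report_generator

vms = ["vm-1", "vm-2"]
metrics = result_aggregator.aggregate_results(
    benchmark_id="bench-123",
    artifacts_bucket="artifacts-bucket",
    vms=vms,
    mode="multi-config",
)

# Without separate_configs this writes one combined CSV and prints a table;
# with separate_configs=True it writes one CSV per config label instead.
report_generator.generate_report(
    metrics, "report.csv", mode="multi-config", separate_configs=True
)
```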
+ +"""Unit tests for report_generator module""" + +import sys +import unittest +from unittest.mock import patch, mock_open, MagicMock + +# Mock tabulate module before importing report_generator +sys.modules['tabulate'] = MagicMock() + +import report_generator + + +class TestExtractResourceMetrics(unittest.TestCase): + """Test _extract_resource_metrics helper function""" + + def test_extract_all_metrics(self): + """Test extracting all resource metrics""" + params = { + 'avg_cpu': 45.2, + 'peak_cpu': 89.1, + 'avg_mem_mb': 1024, + 'peak_mem_mb': 2048, + 'avg_page_cache_gb': 5.5, + 'peak_page_cache_gb': 8.2, + 'avg_sys_cpu': 12.3, + 'peak_sys_cpu': 25.6, + 'avg_net_rx_mbps': 100.5, + 'peak_net_rx_mbps': 250.0, + 'avg_net_tx_mbps': 50.2, + 'peak_net_tx_mbps': 120.8 + } + + result = report_generator._extract_resource_metrics(params) + + self.assertEqual(result['avg_cpu'], 45.2) + self.assertEqual(result['peak_mem'], 2048) + self.assertEqual(result['avg_page_cache'], 5.5) + + def test_extract_missing_metrics(self): + """Test extracting metrics with missing values""" + params = {'avg_cpu': 50.0} + + result = report_generator._extract_resource_metrics(params) + + self.assertEqual(result['avg_cpu'], 50.0) + self.assertEqual(result['peak_cpu'], '-') + self.assertEqual(result['avg_mem'], '-') + + +class TestFormatMetric(unittest.TestCase): + """Test _format_metric helper function""" + + def test_format_positive_value(self): + """Test formatting positive metric value""" + result = report_generator._format_metric(123.456) + self.assertEqual(result, '123.46') + + def test_format_zero_value(self): + """Test formatting zero returns default""" + result = report_generator._format_metric(0) + self.assertEqual(result, '-') + + def test_format_negative_value(self): + """Test formatting negative value returns default""" + result = report_generator._format_metric(-5.0) + self.assertEqual(result, '-') + + def test_custom_default(self): + """Test custom default value""" + result = report_generator._format_metric(0, default='N/A') + self.assertEqual(result, 'N/A') + + +class TestFormatParams(unittest.TestCase): + """Test format_params function""" + + def test_format_all_params(self): + """Test formatting complete parameter set""" + params = { + 'io_type': 'read', + 'threads': '4', + 'file_size': '1GB', + 'bs': '1M', + 'io_depth': '64', + 'nrfiles': '100' + } + + result = report_generator.format_params(params) + self.assertEqual(result, 'read|4|1GB|1M|64|100') + + def test_format_partial_params(self): + """Test formatting with missing parameters""" + params = { + 'io_type': 'write', + 'threads': '8', + 'bs': '4M' + } + + result = report_generator.format_params(params) + self.assertEqual(result, 'write|8|4M') + + def test_format_empty_params(self): + """Test formatting empty parameters""" + result = report_generator.format_params({}) + self.assertEqual(result, '-') + + def test_format_none_params(self): + """Test formatting None parameters""" + result = report_generator.format_params(None) + self.assertEqual(result, '-') + + +class TestGenerateCombinedReport(unittest.TestCase): + """Test generate_combined_report function""" + + @patch('report_generator.tabulate') + @patch('builtins.open', new_callable=mock_open) + def test_single_config_report(self, mock_file, mock_tabulate): + """Test generating single-config report""" + metrics = { + 1: { + 'test_params': {'io_type': 'read', 'threads': '4'}, + 'read_bw_mbps': 100.5, + 'write_bw_mbps': 0, + 'read_lat_min_ms': 1.5, + 'read_lat_max_ms': 50.0, + 'read_lat_avg_ms': 10.2, 
+ 'read_lat_stddev_ms': 5.1, + 'read_lat_p50_ms': 9.8, + 'read_lat_p90_ms': 20.5, + 'read_lat_p99_ms': 45.2, + 'iterations': 3 + } + } + + report_generator.generate_combined_report(metrics, 'output.csv', 'single-config') + + # Verify file was written + mock_file.assert_called_once_with('output.csv', 'w', newline='') + mock_tabulate.assert_called_once() + + @patch('report_generator.tabulate') + @patch('builtins.open', new_callable=mock_open) + def test_multi_config_report(self, mock_file, mock_tabulate): + """Test generating multi-config report""" + metrics = { + 5: { + 'matrix_id': 5, + 'test_id': 1, + 'test_params': { + 'config_label': 'cfg1', + 'commit': 'abc123', + 'io_type': 'read' + }, + 'read_bw_mbps': 150.0, + 'write_bw_mbps': 0, + 'iterations': 2 + } + } + + report_generator.generate_combined_report(metrics, 'output.csv', 'multi-config') + + mock_file.assert_called_once_with('output.csv', 'w', newline='') + mock_tabulate.assert_called_once() + + +class TestGenerateSeparateReports(unittest.TestCase): + """Test generate_separate_reports function""" + + @patch('builtins.open', new_callable=mock_open) + @patch('report_generator.os.path.dirname', return_value='/tmp') + @patch('report_generator.os.path.basename', return_value='report.csv') + def test_separate_reports_by_config(self, mock_basename, mock_dirname, mock_file): + """Test generating separate reports per config""" + metrics = { + 1: { + 'test_params': {'config_label': 'cfg1', 'io_type': 'read'}, + 'read_bw_mbps': 100.0, + 'write_bw_mbps': 50.0, + 'iterations': 2 + }, + 2: { + 'test_params': {'config_label': 'cfg2', 'io_type': 'write'}, + 'read_bw_mbps': 120.0, + 'write_bw_mbps': 60.0, + 'iterations': 2 + }, + 3: { + 'test_params': {'config_label': 'cfg1', 'io_type': 'write'}, + 'read_bw_mbps': 110.0, + 'write_bw_mbps': 55.0, + 'iterations': 2 + } + } + + report_generator.generate_separate_reports(metrics, '/tmp/report.csv') + + # Should create 2 files (cfg1 and cfg2) + self.assertEqual(mock_file.call_count, 2) + + +class TestGenerateReport(unittest.TestCase): + """Test main generate_report function""" + + @patch('report_generator.generate_combined_report') + @patch('report_generator.os.makedirs') + def test_generate_combined(self, mock_makedirs, mock_combined): + """Test routing to combined report""" + metrics = {1: {'test_params': {}, 'read_bw_mbps': 100}} + + report_generator.generate_report(metrics, 'output.csv', mode='single-config') + + mock_combined.assert_called_once_with(metrics, 'output.csv', 'single-config') + + @patch('report_generator.generate_separate_reports') + @patch('report_generator.os.makedirs') + def test_generate_separate(self, mock_makedirs, mock_separate): + """Test routing to separate reports""" + metrics = {1: {'test_params': {'config_label': 'cfg1'}, 'read_bw_mbps': 100}} + + report_generator.generate_report(metrics, 'output.csv', mode='multi-config', separate_configs=True) + + mock_separate.assert_called_once_with(metrics, 'output.csv') + + +if __name__ == '__main__': + unittest.main() diff --git a/distributed-micro-benchmarking/helpers/test_result_aggregator.py b/distributed-micro-benchmarking/helpers/test_result_aggregator.py new file mode 100644 index 0000000..6136330 --- /dev/null +++ b/distributed-micro-benchmarking/helpers/test_result_aggregator.py @@ -0,0 +1,277 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for result_aggregator module""" + +import sys +import os +import unittest +from unittest.mock import patch, MagicMock, mock_open +import json + +# Mock dependencies before importing +sys.path.insert(0, os.path.dirname(__file__)) + +# Create mock modules +mock_gcs = MagicMock() +mock_gcloud_utils = MagicMock() +sys.modules['gcs'] = mock_gcs +sys.modules['gcloud_utils'] = mock_gcloud_utils + +# Patch the imports in result_aggregator before importing it +with patch.dict('sys.modules', {'gcs': mock_gcs, 'gcloud_utils': mock_gcloud_utils}): + # Temporarily replace relative imports with absolute + import importlib.util + spec = importlib.util.spec_from_file_location("result_aggregator", + os.path.join(os.path.dirname(__file__), "result_aggregator.py")) + result_aggregator = importlib.util.module_from_spec(spec) + + # Mock the relative imports + result_aggregator.gcs = mock_gcs + result_aggregator.gcloud_utils = mock_gcloud_utils + + # Execute the module + spec.loader.exec_module(result_aggregator) + + +class TestAvg(unittest.TestCase): + """Test _avg helper function""" + + def test_avg_normal_list(self): + """Test average of normal list""" + result = result_aggregator._avg([10, 20, 30, 40]) + self.assertEqual(result, 25.0) + + def test_avg_single_value(self): + """Test average of single value""" + result = result_aggregator._avg([42]) + self.assertEqual(result, 42.0) + + def test_avg_empty_list(self): + """Test average of empty list returns 0""" + result = result_aggregator._avg([]) + self.assertEqual(result, 0) + + def test_avg_floats(self): + """Test average with float values""" + result = result_aggregator._avg([1.5, 2.5, 3.0]) + self.assertAlmostEqual(result, 2.333, places=3) + + +class TestExtractLatencyMetrics(unittest.TestCase): + """Test _extract_latency_metrics helper function""" + + def test_extract_clat_ns_metrics(self): + """Test extracting latency from clat_ns (microseconds)""" + job = { + 'read': { + 'bw': 100000, + 'clat_ns': { + 'min': 1000, # 1000 µs = 1 ms + 'max': 50000, # 50000 µs = 50 ms + 'mean': 10000, # 10000 µs = 10 ms + 'stddev': 5000, # 5000 µs = 5 ms + 'percentile': { + '50.000000': 9000, # 9 ms + '90.000000': 20000, # 20 ms + '99.000000': 45000 # 45 ms + } + } + } + } + + result = result_aggregator._extract_latency_metrics(job) + + self.assertEqual(result['bw'], 100000) + self.assertAlmostEqual(result['min'], 1.0, places=2) + self.assertAlmostEqual(result['max'], 50.0, places=2) + self.assertAlmostEqual(result['mean'], 10.0, places=2) + self.assertAlmostEqual(result['stddev'], 5.0, places=2) + self.assertAlmostEqual(result['p50'], 9.0, places=2) + self.assertAlmostEqual(result['p90'], 20.0, places=2) + self.assertAlmostEqual(result['p99'], 45.0, places=2) + + def test_extract_lat_ns_metrics(self): + """Test extracting latency from lat_ns (nanoseconds)""" + job = { + 'read': { + 'bw': 100000, + 'lat_ns': { + 'min': 1000000, # 1,000,000 ns = 1 ms + 'max': 50000000, # 50,000,000 ns = 50 ms + 'mean': 10000000, # 10,000,000 ns = 10 ms + 'stddev': 5000000 # 5,000,000 ns = 5 ms + } + } + } + + result = result_aggregator._extract_latency_metrics(job) 
+ + self.assertEqual(result['bw'], 100000) + self.assertAlmostEqual(result['min'], 1.0, places=2) + self.assertAlmostEqual(result['max'], 50.0, places=2) + self.assertAlmostEqual(result['mean'], 10.0, places=2) + self.assertAlmostEqual(result['stddev'], 5.0, places=2) + + def test_extract_no_read_data(self): + """Test extracting from job without read data""" + job = {'write': {'bw': 50000}} + + result = result_aggregator._extract_latency_metrics(job) + + self.assertIsNone(result) + + def test_extract_no_latency_data(self): + """Test extracting when only bandwidth is present""" + job = {'read': {'bw': 100000}} + + result = result_aggregator._extract_latency_metrics(job) + + self.assertEqual(result['bw'], 100000) + self.assertNotIn('min', result) + self.assertNotIn('max', result) + + +class TestParseTestResults(unittest.TestCase): + """Test parse_test_results function""" + + @patch('result_aggregator.glob.glob') + @patch('builtins.open', new_callable=mock_open) + def test_parse_single_config_results(self, mock_file, mock_glob): + """Test parsing results in single-config mode""" + mock_glob.return_value = ['/tmp/test-1/fio_output_1.json', '/tmp/test-1/fio_output_2.json'] + + fio_data = { + 'jobs': [{ + 'read': { + 'bw': 100000, + 'clat_ns': { + 'min': 1000, 'max': 50000, 'mean': 10000, 'stddev': 5000, + 'percentile': {'50.000000': 9000, '90.000000': 20000, '99.000000': 45000} + } + }, + 'write': {'bw': 50000} + }] + } + + mock_file.return_value.read.return_value = json.dumps(fio_data) + + test_info = { + 'test_id': 1, + 'status': 'success', + 'params': {'io_type': 'read', 'threads': '4'} + } + + result = result_aggregator.parse_test_results('/tmp/test-1', test_info, mode='single-config') + + self.assertEqual(result['test_params'], test_info['params']) + self.assertGreater(result['read_bw_mbps'], 0) + self.assertGreater(result['write_bw_mbps'], 0) + self.assertGreater(result['read_lat_min_ms'], 0) + self.assertEqual(result['iterations'], 2) + self.assertNotIn('matrix_id', result) + + @patch('result_aggregator.glob.glob') + @patch('builtins.open', new_callable=mock_open) + def test_parse_multi_config_results(self, mock_file, mock_glob): + """Test parsing results in multi-config mode""" + mock_glob.return_value = ['/tmp/test-5/fio_output_1.json'] + + fio_data = { + 'jobs': [{ + 'read': {'bw': 100000, 'clat_ns': {'min': 1000, 'max': 50000, 'mean': 10000}}, + 'write': {'bw': 50000} + }] + } + + mock_file.return_value.read.return_value = json.dumps(fio_data) + + test_info = { + 'test_id': 1, + 'matrix_id': 5, + 'status': 'success', + 'params': {'config_label': 'cfg1', 'io_type': 'read'} + } + + result = result_aggregator.parse_test_results('/tmp/test-5', test_info, mode='multi-config') + + self.assertEqual(result['matrix_id'], 5) + self.assertEqual(result['test_id'], 1) + self.assertEqual(result['iterations'], 1) + + @patch('result_aggregator.glob.glob') + def test_parse_no_fio_files(self, mock_glob): + """Test parsing when no FIO files found""" + mock_glob.return_value = [] + + test_info = {'test_id': 1, 'params': {}} + result = result_aggregator.parse_test_results('/tmp/test-1', test_info) + + self.assertEqual(result['read_bw_mbps'], 0) + self.assertEqual(result['write_bw_mbps'], 0) + self.assertEqual(result['iterations'], 0) + + +class TestAggregateResults(unittest.TestCase): + """Test aggregate_results function""" + + @patch('result_aggregator.gcloud_utils.gcloud_storage_cp') + @patch('result_aggregator.tempfile.TemporaryDirectory') + @patch('result_aggregator.os.path.exists') + 
@patch('result_aggregator.os.makedirs') + @patch('builtins.open', new_callable=mock_open) + @patch('result_aggregator.parse_test_results') + def test_aggregate_from_multiple_vms(self, mock_parse, mock_file, mock_makedirs, + mock_exists, mock_tmpdir, mock_gcloud_cp): + """Test aggregating results from multiple VMs""" + # Setup mocks + mock_tmpdir.return_value.__enter__.return_value = '/tmp/test' + mock_exists.return_value = True + + manifest = { + 'status': 'completed', + 'tests': [ + {'test_id': 1, 'status': 'success', 'params': {'io_type': 'read'}}, + {'test_id': 2, 'status': 'success', 'params': {'io_type': 'write'}} + ] + } + mock_file.return_value.read.return_value = json.dumps(manifest) + + mock_parse.side_effect = [ + {'test_params': {}, 'read_bw_mbps': 100, 'write_bw_mbps': 50, 'iterations': 2}, + {'test_params': {}, 'read_bw_mbps': 120, 'write_bw_mbps': 60, 'iterations': 2} + ] + + vms = ['vm-1', 'vm-2'] + result = result_aggregator.aggregate_results('bench-123', 'artifacts', vms, mode='single-config') + + # Should have 2 test results (one from each VM, but same test_id could be on both) + self.assertGreater(len(result), 0) + + @patch('result_aggregator.gcloud_utils.gcloud_storage_cp') + @patch('result_aggregator.tempfile.TemporaryDirectory') + def test_aggregate_handles_failures(self, mock_tmpdir, mock_gcloud_cp): + """Test aggregation handles VM download failures""" + mock_tmpdir.return_value.__enter__.return_value = '/tmp/test' + mock_gcloud_cp.side_effect = Exception('Download failed') + + vms = ['vm-1', 'vm-2'] + result = result_aggregator.aggregate_results('bench-123', 'artifacts', vms) + + # Should return empty dict when all downloads fail + self.assertEqual(len(result), 0) + + +if __name__ == '__main__': + unittest.main() diff --git a/distributed-micro-benchmarking/helpers/vm_manager.py b/distributed-micro-benchmarking/helpers/vm_manager.py new file mode 100644 index 0000000..dc87197 --- /dev/null +++ b/distributed-micro-benchmarking/helpers/vm_manager.py @@ -0,0 +1,177 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""VM management operations""" + +import subprocess +import time +from datetime import datetime, timedelta +from . 
import gcs, gcloud_utils + + +def get_running_vms(instance_group, zone, project): + """Get list of RUNNING VMs from instance group""" + try: + return gcloud_utils.gcloud_compute_instance_group_list(instance_group, zone, project, filter_status='RUNNING') + except Exception as e: + raise Exception(f"Failed to list VMs in instance group '{instance_group}': {e}") + + +def run_worker_script(vm_name, zone, project, script_path, benchmark_id, artifacts_bucket): + """Execute worker script on VM via gcloud ssh""" + import os + + # Convert to absolute path + script_path = os.path.abspath(script_path) + + # Create command to upload and execute script + remote_script = f"/tmp/worker_{benchmark_id}.sh" + + # Upload script + try: + gcloud_utils.gcloud_compute_scp( + script_path, + f'{vm_name}:{remote_script}', + zone=zone, + project=project, + internal_ip=True, + check=True + ) + except Exception as e: + print(f"Failed to upload script to {vm_name}: {e}") + raise + + # Execute script with benchmark_id and artifacts_bucket as arguments + # Logs will be written to /tmp/worker_.log and uploaded to GCS by worker.sh + log_file = f"/tmp/worker_{benchmark_id}.log" + exec_command = f'nohup bash {remote_script} {benchmark_id} {artifacts_bucket} > {log_file} 2>&1 &' + + try: + gcloud_utils.gcloud_compute_ssh( + vm_name, + zone=zone, + project=project, + command=exec_command, + internal_ip=True, + check=True, + capture_output=True, + text=True + ) + except Exception as e: + print(f"Failed to execute script on {vm_name}: {e}") + raise + + +def fetch_worker_logs(vm_name, benchmark_id, artifacts_bucket, lines=50): + """Fetch and display worker logs from GCS""" + log_path = f"gs://{artifacts_bucket}/{benchmark_id}/logs/{vm_name}/worker.log" + + try: + cmd = ['gsutil', 'cat', log_path] + result = subprocess.run(cmd, capture_output=True, text=True, timeout=10) + + if result.returncode == 0: + log_lines = result.stdout.split('\n') + if lines and len(log_lines) > lines: + # Show first 10 and last (lines-10) lines + print('\n'.join(log_lines[:10])) + print(f"\n... [{len(log_lines) - lines} lines omitted] ...\n") + print('\n'.join(log_lines[-(lines-10):])) + else: + print(result.stdout) + else: + print(f"Log not yet available for {vm_name}") + except Exception as e: + print(f"Could not fetch logs for {vm_name}: {e}") + + +def wait_for_completion(vms, benchmark_id, artifacts_bucket, poll_interval=30, timeout=7200): + """Wait for all VMs to complete by monitoring manifests. + + Polls GCS for manifest.json files from each VM. Manifests indicate completion status: + - 'completed': VM finished all assigned tests successfully + - 'cancelled': User cancelled via cancel.py, partial results available + - 'failed': VM encountered errors, logs fetched automatically + + Returns True if all VMs completed/cancelled, False if any failed or timeout occurred. 
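+
+    Args:
+        vms: VM names to monitor.
+        benchmark_id: Benchmark run ID; manifests are read from
+            gs://<artifacts_bucket>/<benchmark_id>/results/<vm>/manifest.json.
+        artifacts_bucket: GCS bucket holding manifests, worker logs, and the
+            cancellation flag.
+        poll_interval: Seconds to wait between polling passes.
+        timeout: Overall deadline in seconds; on expiry a cancellation flag is
+            written so in-progress workers can stop gracefully.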
+ """ + print(f"Waiting for {len(vms)} VMs to complete...") + + deadline = datetime.now() + timedelta(seconds=timeout) + completed_vms = set() + failed_vms = set() + + while datetime.now() < deadline: + # Check for manifests + for vm in vms: + if vm in completed_vms or vm in failed_vms: + continue + + manifest_path = f"gs://{artifacts_bucket}/{benchmark_id}/results/{vm}/manifest.json" + manifest = gcs.download_json(manifest_path) + + if manifest: + if manifest.get('status') == 'completed': + completed_vms.add(vm) + print(f" ✓ {vm} completed - {manifest.get('total_tests', 0)} tests") + elif manifest.get('status') == 'cancelled': + completed_vms.add(vm) + print(f" ⚠ {vm} cancelled - {len(manifest.get('tests', []))} tests completed") + elif manifest.get('status') == 'failed': + failed_vms.add(vm) + print(f" ✗ {vm} failed") + print(f"\nLogs from {vm}:") + print("=" * 80) + fetch_worker_logs(vm, benchmark_id, artifacts_bucket, lines=100) + print("=" * 80) + + # Check if done + if len(completed_vms) + len(failed_vms) == len(vms): + if failed_vms: + print(f"\nCompleted: {completed_vms}") + print(f"Failed: {failed_vms}") + return False + return True + + # Calculate in-progress VMs + in_progress_vms = set(vms) - completed_vms - failed_vms + + # Build status message + status_msg = f" Progress: {len(completed_vms)}/{len(vms)} completed" + if failed_vms: + status_msg += f", {len(failed_vms)} failed" + if in_progress_vms: + status_msg += f" | In-progress: {', '.join(sorted(in_progress_vms))}" + + print(status_msg) + time.sleep(poll_interval) + + # Timeout reached - trigger cancellation + print(f"\n⚠ Timeout reached after {timeout}s. Completed: {len(completed_vms)}/{len(vms)}") + + in_progress_vms = set(vms) - completed_vms - failed_vms + if in_progress_vms: + print(f"Triggering cancellation for in-progress VMs: {', '.join(sorted(in_progress_vms))}") + + # Create cancellation flag + cancel_path = f"gs://{artifacts_bucket}/{benchmark_id}/cancel" + cmd = ['gsutil', 'cp', '-', cancel_path] + subprocess.run(cmd, input=b'timeout', capture_output=True) + + print(f"Cancellation flag created. Waiting 30s for workers to detect and shutdown...") + time.sleep(30) + + print("Workers should have detected cancellation and stopped gracefully.") + + return False