
Conversation

@raj-prince
Collaborator

No description provided.

@raj-prince
Collaborator Author

/gemini review

@gemini-code-assist
Contributor

Summary of Changes

Hello @raj-prince, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a new set of Python helper scripts that simplify and automate running distributed micro-benchmarks on Google Cloud. The new utilities provide a structured approach to managing gcloud interactions, handling GCS data, generating and distributing test workloads, aggregating results, and monitoring the lifecycle of benchmark VMs, improving the efficiency and reliability of distributed benchmarking.

Highlights

  • New Distributed Benchmarking Helpers: Introduced a new distributed-micro-benchmarking/helpers directory containing a suite of Python scripts to streamline distributed benchmarking workflows on Google Cloud Platform.
  • GCloud Utilities: Added gcloud_utils.py for robust execution of gcloud commands, including storage operations (copy, list), compute instance SSH/SCP, and listing VMs in managed instance groups, with built-in retry logic (a sketch of this retry pattern follows this list).
  • Google Cloud Storage (GCS) Operations: Implemented gcs.py to provide higher-level abstractions for GCS interactions, such as uploading/downloading JSON data, test case CSVs, FIO job files, listing benchmark manifests, and checking for cancellation flags.
  • Job Generation and Distribution: Developed job_generator.py to handle the loading of test cases and configurations from CSV, generating a comprehensive test matrix (cartesian product), distributing tests evenly across available VMs, and creating VM-specific job specifications.
  • Result Aggregation and Reporting: Created result_aggregator.py to download and parse benchmark results from distributed VMs, extract FIO metrics (including detailed latency percentiles), and aggregate them. report_generator.py was added to format these aggregated results into human-readable tables and CSV reports, supporting both combined and per-configuration outputs.
  • VM Management and Monitoring: Included vm_manager.py for managing benchmark VMs, which covers identifying running instances, securely executing worker scripts via SSH, fetching remote logs from GCS, and a robust wait_for_completion mechanism with timeout and cancellation handling.
  • Comprehensive Unit Tests: Provided dedicated unit tests for all new helper modules (gcloud_utils, gcs, job_generator, report_generator, result_aggregator) to ensure reliability and correctness of the new functionalities.
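
Of the modules above, gcloud_utils.py is the foundation the others build on. Below is a minimal sketch of its retry pattern, reconstructed from the parameters visible in the review excerpts further down (retries, retry_delay, check, capture_output, text). The default values are assumptions, and the raise follows the reviewer's CalledProcessError suggestion rather than the PR's current code:

import subprocess
import time

def run_gcloud_command(cmd, retries=3, retry_delay=5.0, check=True,
                       capture_output=True, text=True, **kwargs):
    # Re-run the command until it succeeds or attempts are exhausted.
    for attempt in range(retries):
        result = subprocess.run(cmd, capture_output=capture_output,
                                text=text, **kwargs)
        if result.returncode == 0:
            return result
        if attempt < retries - 1:
            time.sleep(retry_delay)
    if check:
        raise subprocess.CalledProcessError(result.returncode, cmd,
                                            output=result.stdout,
                                            stderr=result.stderr)
    return result

# Example: copy a local file to GCS with up to three attempts.
# run_gcloud_command(['gcloud', 'storage', 'cp', 'job.json', 'gs://my-bucket/job.json'])
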
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature               | Command             | Description
Code Review           | /gemini review      | Performs a code review for the current pull request in its current state.
Pull Request Summary  | /gemini summary     | Provides a summary of the current pull request in its current state.
Comment               | @gemini-code-assist | Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help                  | /gemini help        | Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a .gemini/ folder in the root of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in sharing feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, and code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  ¹ Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double-check it and use code with caution.


@gemini-code-assist bot left a comment


Code Review

The pull request introduces helper scripts for distributed benchmarking, covering gcloud utilities, GCS operations, job generation, report generation, result aggregation, and VM management. The code is generally well-structured and includes unit tests for several modules. However, there are several areas for improvement related to error handling, resource management, potential command injection vulnerabilities, and code clarity. Specifically, some exception types are too generic, temporary files are not always cleaned up, and there are instances of import os inside functions which should be moved to the top of the file. Additionally, the docstring for aggregate_results is misplaced, and some metric formatting logic could be more robust.

Comment on lines +61 to +72
"""Aggregate results from all VMs.

Downloads results from gs://<artifacts_bucket>/<benchmark_id>/results/<vm>/ for each VM.
Each VM's results directory contains:
- manifest.json: List of tests with status and metadata
- test-<id>/: Directory per test with FIO JSON outputs and resource metrics

In multi-config mode, test_key is matrix_id (unique across config×test combinations).
In single-config mode, test_key is test_id (can be same across VMs if distributed).

Returns dict mapping test_key -> aggregated metrics (bandwidth, CPU, memory, etc).
"""

high

This block of text appears to be a docstring for the aggregate_results function, but it's incorrectly placed inside the _extract_latency_metrics function and commented out. Please move this docstring to the aggregate_results function to properly document its purpose and functionality.
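
For illustration, a minimal sketch of the corrected placement; the signatures are assumptions based on the review comments, not confirmed code from the PR:

def _extract_latency_metrics(lat_data):
    """Extract latency metrics from FIO output."""  # keeps only its own docstring
    ...

def aggregate_results(benchmark_id, artifacts_bucket, vms, mode="single-config"):
    """Aggregate results from all VMs.

    Returns dict mapping test_key -> aggregated metrics (bandwidth, CPU, memory, etc).
    """
    ...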

Comment on lines +76 to +78
    cmd.extend(['--command', command])

    return run_gcloud_command(cmd, retries=1, check=check, **kwargs)

high

Passing the command directly into the exec_command string for gcloud compute ssh --command could be a command injection vulnerability if the command string originates from an untrusted source. While gcloud itself might offer some protection, it's a good practice to be cautious. If possible, ensure that the command argument is sanitized or that its source is always trusted. For subprocess.run, passing arguments as a list is the standard way to prevent this, but gcloud compute ssh --command expects a single string.
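
Where the command parts are assembled locally, one hedged option (assuming Python ≥ 3.8) is shlex.join, which quotes each element before producing the single string that --command expects; a small standalone demonstration:

import shlex

# Each element is quoted individually, so shell metacharacters in an
# untrusted value cannot terminate the command and start a new one.
cmd_str = shlex.join(['bash', 'worker.sh', 'benchmark-01; rm -rf /'])
print(cmd_str)  # bash worker.sh 'benchmark-01; rm -rf /'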


def upload_json(data, gcs_path):
    """Upload JSON data to GCS with retry on failure"""
    with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:

high

The tempfile.NamedTemporaryFile is created with delete=False. This means the temporary file will not be automatically deleted after the with block exits. This can lead to an accumulation of temporary files on the disk, potentially causing disk space issues over time. Consider explicitly deleting the file using os.remove(f.name) after the try-except block, or set delete=True if the file is not needed after the with block.

Suggested change
    with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
    with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=True) as f:

    # Execute script with benchmark_id and artifacts_bucket as arguments
    # Logs will be written to /tmp/worker_<benchmark_id>.log and uploaded to GCS by worker.sh
    log_file = f"/tmp/worker_{benchmark_id}.log"
    exec_command = f'nohup bash {remote_script} {benchmark_id} {artifacts_bucket} > {log_file} 2>&1 &'

high

The exec_command string is constructed using f-string interpolation with benchmark_id and artifacts_bucket. If these variables were ever to contain malicious input (e.g., from an untrusted user), this could lead to a command injection vulnerability on the remote VM. While these values are likely controlled internally, it's a security best practice to sanitize any external input used in shell commands or use a method that passes arguments safely (e.g., shlex.quote if constructing a shell command, though gcloud compute ssh --command expects a single string).
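
A sketch of the hardening this comment suggests, with shlex.quote applied to every interpolated value; the variable names mirror the excerpt above, but the helper itself is hypothetical:

import shlex

def build_exec_command(remote_script, benchmark_id, artifacts_bucket):
    # Quote each value so shell metacharacters in the inputs cannot
    # alter the remote command line.
    log_file = f"/tmp/worker_{benchmark_id}.log"
    return (
        f'nohup bash {shlex.quote(remote_script)} {shlex.quote(benchmark_id)} '
        f'{shlex.quote(artifacts_bucket)} > {shlex.quote(log_file)} 2>&1 &'
    )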


def download_json(gcs_path):
    """Download and parse JSON from GCS"""
    with tempfile.NamedTemporaryFile(mode='r', suffix='.json', delete=False) as f:

high

The tempfile.NamedTemporaryFile is created with delete=False. This means the temporary file will not be automatically deleted after the with block exits. This can lead to an accumulation of temporary files on the disk, potentially causing disk space issues over time. Consider explicitly deleting the file using os.remove(f.name) after the with block, or set delete=True if the file is not needed after the with block.

Suggested change
    with tempfile.NamedTemporaryFile(mode='r', suffix='.json', delete=False) as f:
    with tempfile.NamedTemporaryFile(mode='r', suffix='.json', delete=True) as f:

    Returns dict mapping test_key -> aggregated metrics (bandwidth, CPU, memory, etc).
    """
    all_metrics = {}
    successful_vms = 0

medium

The successful_vms counter is incremented for each test result processed (line 121), not for each unique VM that successfully reported results. If a single VM runs multiple tests, this counter will be inflated. It should probably count the number of unique VMs from which results were successfully aggregated.
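
A minimal sketch of the fix, assuming a results_by_vm mapping of {vm_name: {test_key: metrics}}; the variable names are illustrative, not from the PR:

# Example data: vm-1 ran two tests, vm-2 ran one.
results_by_vm = {"vm-1": {"t1": {}, "t2": {}}, "vm-2": {"t3": {}}}

all_metrics = {}
vms_with_results = set()
for vm_name, test_results in results_by_vm.items():
    all_metrics.update(test_results)
    if test_results:
        vms_with_results.add(vm_name)

# Counts unique VMs (2 here), not individual test results (3 here).
successful_vms = len(vms_with_results)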


    read_bws = []
    write_bws = []
    lat_lists = {key: [] for key in ['min', 'max', 'mean', 'stddev', 'p50', 'p90', 'p99']}

medium

The lat_lists dictionary is initialized with keys for various latency metrics. However, _extract_latency_metrics might not always return all these keys (e.g., if percentile data is missing). When _avg is called on an empty list (e.g., lat_lists['p50'] if no percentile data was found), it will return 0. This might mask the fact that the data was actually missing, rather than being a true zero. Consider returning None or explicitly checking for empty lists before averaging to indicate missing data more clearly.
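
One way to make the distinction explicit, sketched under the assumption that _avg is a small local helper:

def _avg(values):
    # Return None rather than 0 for an empty list, so callers can tell
    # "no latency samples were reported" apart from a measured zero.
    return sum(values) / len(values) if values else None

assert _avg([1.0, 3.0]) == 2.0
assert _avg([]) is None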


def run_worker_script(vm_name, zone, project, script_path, benchmark_id, artifacts_bucket):
    """Execute worker script on VM via gcloud ssh"""
    import os

medium

The import os statement is placed inside the run_worker_script function. It's generally considered best practice in Python to place all imports at the top of the file, after the module docstring and any future imports. This improves readability and ensures that dependencies are clear at a glance.


        try:
            gcloud_utils.gcloud_storage_cp(f.name, gcs_path, retries=3, check=True)
        except Exception as e:
            raise Exception(f"Failed to upload to {gcs_path} after 3 attempts: {e}")

medium

Similar to gcloud_utils.py, raising a generic Exception here is not ideal. It makes it difficult for callers to distinguish between different types of upload failures. Consider raising a more specific exception, or a custom exception, that provides more context about the GCS upload failure.

Suggested change
            raise Exception(f"Failed to upload to {gcs_path} after 3 attempts: {e}")
            raise RuntimeError(f"Failed to upload to {gcs_path} after 3 attempts: {e}")
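
Going a step further than RuntimeError, a custom exception type (hypothetical, not in the PR) lets callers catch GCS upload failures specifically while preserving the original cause:

class GcsUploadError(RuntimeError):
    """Raised when an upload to GCS fails after all retry attempts."""

# In upload_json's except block, chaining with `from` keeps the
# underlying gcloud error visible in the traceback:
#     raise GcsUploadError(
#         f"Failed to upload to {gcs_path} after 3 attempts") from e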


    try:
        cmd = ['gsutil', 'cat', log_path]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)

medium

The timeout for gsutil cat is hardcoded to 10 seconds. If the log file is very large or network conditions are poor, this timeout might be too short, leading to incomplete log fetching or frequent failures. Consider making this timeout configurable or dynamically adjusting it based on expected log size or network conditions.
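
A sketch of one remedy, exposing the timeout as a parameter whose default preserves the current behavior (the real fetch_worker_logs signature may differ):

import subprocess

def fetch_worker_logs(log_path, timeout=10):
    # Callers with large logs or slow links can raise the limit, e.g.
    # fetch_worker_logs('gs://bucket/run-1/worker.log', timeout=60).
    result = subprocess.run(['gsutil', 'cat', log_path],
                            capture_output=True, text=True, timeout=timeout)
    return result.stdout if result.returncode == 0 else None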

@gemini-code-assist bot left a comment


Code Review

This pull request introduces a comprehensive set of helper scripts for distributed micro-benchmarking, including utilities for interacting with gcloud/GCS, generating test jobs, aggregating results, and generating reports. The code is well-structured and includes a good suite of unit tests. My review focuses on improving correctness, resource management, and consistency. Key findings include a critical bug in the result aggregator, a resource leak from unclosed temporary files, and several opportunities to improve code clarity and adhere to Python best practices.

Comment on lines +61 to +72
"""Aggregate results from all VMs.

Downloads results from gs://<artifacts_bucket>/<benchmark_id>/results/<vm>/ for each VM.
Each VM's results directory contains:
- manifest.json: List of tests with status and metadata
- test-<id>/: Directory per test with FIO JSON outputs and resource metrics

In multi-config mode, test_key is matrix_id (unique across config×test combinations).
In single-config mode, test_key is test_id (can be same across VMs if distributed).

Returns dict mapping test_key -> aggregated metrics (bandwidth, CPU, memory, etc).
"""

critical

There is a docstring here that seems to describe a function, but it's not attached to any function definition. The code that follows from line 73 onwards is at the module's top level, which will cause it to execute on import and fail due to undefined variables. This entire block of code should be wrapped in a function, likely def aggregate_results(benchmark_id, artifacts_bucket, vms, mode="single-config"): based on the docstring and usage in the tests. The code from line 73 to 128 needs to be indented accordingly.
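
Structurally, the fix looks like the following sketch: the docstring and the formerly module-level statements move inside the function so they run on call rather than on import (the body is elided; the signature is the one quoted in this comment):

def aggregate_results(benchmark_id, artifacts_bucket, vms, mode="single-config"):
    """Aggregate results from all VMs."""
    all_metrics = {}
    successful_vms = 0
    # ... the statements from lines 73-128 of the module, indented one
    # level so they execute only when the function is called ...
    return all_metrics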

Comment on lines +24 to +31
    with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
        json.dump(data, f, indent=2)
        f.flush()

        try:
            gcloud_utils.gcloud_storage_cp(f.name, gcs_path, retries=3, check=True)
        except Exception as e:
            raise Exception(f"Failed to upload to {gcs_path} after 3 attempts: {e}")

high

The function upload_json creates a temporary file using tempfile.NamedTemporaryFile(delete=False) but never cleans it up. This will lead to an accumulation of temporary files on the filesystem, causing a resource leak. The same issue is present in download_json (lines 36-43). You should ensure the file is deleted in a finally block to guarantee cleanup even if errors occur.

Here is a recommended pattern:

# Note: `import os` is needed at the top of the file.
def upload_json(data, gcs_path):
    """Upload JSON data to GCS with retry on failure"""
    f = tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False)
    try:
        with f:
            json.dump(data, f, indent=2)
        
        gcloud_utils.gcloud_storage_cp(f.name, gcs_path, retries=3, check=True)
    except Exception as e:
        raise Exception(f"Failed to upload to {gcs_path} after 3 attempts: {e}")
    finally:
        os.remove(f.name)

    base_dir = os.path.dirname(base_output_file)
    base_name = os.path.splitext(os.path.basename(base_output_file))[0]

    headers = ["Test ID", "BS|FSize|IOD|IOType|Jobs|NrFiles", "Read BW (MB/s)", "Write BW (MB/s)",

high

The order of parameters in the header for generate_separate_reports ("BS|FSize|IOD|IOType|Jobs|NrFiles") is inconsistent with the order produced by format_params and the header used in generate_combined_report ("IOType|Jobs|FSize|BS|IOD|NrFiles"). This will lead to mismatched columns and data in the generated CSV file, producing an incorrect report. The headers should be consistent across all generated reports.

Suggested change
    headers = ["Test ID", "BS|FSize|IOD|IOType|Jobs|NrFiles", "Read BW (MB/s)", "Write BW (MB/s)",
    headers = ["Test ID", "IOType|Jobs|FSize|BS|IOD|NrFiles", "Read BW (MB/s)", "Write BW (MB/s)",
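
One low-risk way to enforce the consistency, sketched with an illustrative module-level constant (PARAM_COLUMN is not a name from the PR):

# Defined once, reused by both report generators so they cannot drift.
PARAM_COLUMN = "IOType|Jobs|FSize|BS|IOD|NrFiles"

headers = ["Test ID", PARAM_COLUMN, "Read BW (MB/s)", "Write BW (MB/s)"]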

Comment on lines +28 to +49
        check: If True, raise CalledProcessError on non-zero exit code
        capture_output: If True, capture stdout/stderr
        text: If True, decode output as text
        **kwargs: Additional arguments passed to subprocess.run()

    Returns:
        subprocess.CompletedProcess object

    Raises:
        Exception: If command fails after all retries and check=True
    """
    for attempt in range(retries):
        result = subprocess.run(cmd, capture_output=capture_output, text=text, **kwargs)

        if result.returncode == 0:
            return result

        if attempt < retries - 1:
            time.sleep(retry_delay)

    if check:
        raise Exception(f"Command failed after {retries} attempt(s): {' '.join(cmd)}\nError: {result.stderr}")

medium

The docstring for the check parameter on line 28 states that CalledProcessError will be raised on failure, but the implementation on line 49 raises a generic Exception. This is inconsistent and loses valuable information that subprocess.CalledProcessError provides (like the return code and command). It's better to raise subprocess.CalledProcessError to be consistent with the subprocess module's behavior and the function's documentation.

Suggested change
        raise Exception(f"Command failed after {retries} attempt(s): {' '.join(cmd)}\nError: {result.stderr}")
        raise subprocess.CalledProcessError(result.returncode, cmd, output=result.stdout, stderr=result.stderr)
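
With the suggested change applied, call sites can catch the standard exception and still read the exit code and stderr it carries; a brief usage sketch (assuming run_gcloud_command from this module is in scope):

import subprocess

try:
    run_gcloud_command(['gcloud', 'storage', 'ls', 'gs://some-bucket'])
except subprocess.CalledProcessError as e:
    print(f"gcloud exited with {e.returncode}: {e.stderr}")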

Comment on lines +48 to +58
import os
if not os.path.exists(csv_path):
raise FileNotFoundError(f"Test cases file not found: {csv_path}")

dest = f"{base_path}/test-cases.csv"
gcloud_utils.gcloud_storage_cp(csv_path, dest, retries=1, check=True)


def upload_fio_job_file(fio_path, base_path):
"""Upload FIO job template to GCS"""
import os

medium

Imports should be at the top of the file, not inside functions. This is a standard Python convention (PEP 8) that improves readability and makes dependencies clear at a glance. The import os statements in upload_test_cases and upload_fio_job_file should be moved to the top of gcs.py.

    return test_matrix


def distribute_tests(test_cases, vms, is_matrix=False):

medium

The parameter is_matrix is defined but not used within the distribute_tests function. It should be removed to simplify the function signature and avoid confusion.

Suggested change
def distribute_tests(test_cases, vms, is_matrix=False):
def distribute_tests(test_cases, vms):

Comment on lines +86 to +98
    job_spec = {
        "vm_name": vm_name,
        "benchmark_id": benchmark_id,
        "bucket": bucket,
        "artifacts_bucket": artifacts_bucket,
        "iterations": iterations,
        "total_tests": total_tests,
        "total_runs": total_tests * iterations,
        "test_ids" if mode == "single-config" else "test_entries":
            [entry['test_id'] for entry in test_entries] if mode == "single-config" else test_entries
    }

    return job_spec

medium

The creation of the job_spec dictionary uses a complex conditional expression for both a key and its value. While functional, this is difficult to read and maintain. Refactoring this into a standard if/else block to populate the dictionary would make the code more explicit and easier to understand.

Suggested change
    job_spec = {
        "vm_name": vm_name,
        "benchmark_id": benchmark_id,
        "bucket": bucket,
        "artifacts_bucket": artifacts_bucket,
        "iterations": iterations,
        "total_tests": total_tests,
        "total_runs": total_tests * iterations,
        "test_ids" if mode == "single-config" else "test_entries":
            [entry['test_id'] for entry in test_entries] if mode == "single-config" else test_entries
    }
    return job_spec

    job_spec = {
        "vm_name": vm_name,
        "benchmark_id": benchmark_id,
        "bucket": bucket,
        "artifacts_bucket": artifacts_bucket,
        "iterations": iterations,
        "total_tests": total_tests,
        "total_runs": total_tests * iterations,
    }
    if mode == "single-config":
        job_spec['test_ids'] = [entry['test_id'] for entry in test_entries]
    else:
        job_spec['test_entries'] = test_entries
    return job_spec


def run_worker_script(vm_name, zone, project, script_path, benchmark_id, artifacts_bucket):
    """Execute worker script on VM via gcloud ssh"""
    import os

medium

Imports should be at the top of the file, not inside functions. This is a standard Python convention (PEP 8) that improves readability and makes dependencies clear. The import os statement should be moved to the top of vm_manager.py.

Comment on lines +81 to +82
        cmd = ['gsutil', 'cat', log_path]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)

medium

The functions fetch_worker_logs and wait_for_completion (line 170) use subprocess.run to execute gsutil commands. For consistency with the rest of the codebase, use the run_gcloud_command helper from gcloud_utils.py; this centralizes command execution, improves maintainability, and provides retry logic where configured.
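
A sketch of the call site after such a refactor; it assumes run_gcloud_command forwards extra keyword arguments (like timeout) to subprocess.run via **kwargs, as its excerpt above suggests:

import gcloud_utils

def fetch_worker_logs(log_path, timeout=10):
    # Route the gsutil call through the shared helper so retries and
    # error handling stay centralized.
    result = gcloud_utils.run_gcloud_command(
        ['gsutil', 'cat', log_path], retries=1, check=False, timeout=timeout)
    return result.stdout if result.returncode == 0 else None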
