6 changes: 6 additions & 0 deletions distributed-micro-benchmarking/.gitignore
@@ -0,0 +1,6 @@
# Python cache
__pycache__/
*.pyc
*.pyo
*.pyd
.Python
15 changes: 15 additions & 0 deletions distributed-micro-benchmarking/helpers/__init__.py
@@ -0,0 +1,15 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Distributed micro-benchmarking helpers
103 changes: 103 additions & 0 deletions distributed-micro-benchmarking/helpers/gcloud_utils.py
@@ -0,0 +1,103 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unified gcloud command execution utilities"""

import subprocess
import time


def run_gcloud_command(cmd, retries=1, retry_delay=2, check=False, capture_output=True, text=True, **kwargs):
    """Execute a gcloud command with optional retry logic.

    Args:
        cmd: List of command components (e.g., ['gcloud', 'storage', 'cp', ...])
        retries: Number of retry attempts (1 = no retry, 3 = try 3 times)
        retry_delay: Seconds to wait between retries
        check: If True, raise CalledProcessError on non-zero exit code
        capture_output: If True, capture stdout/stderr
        text: If True, decode output as text
        **kwargs: Additional arguments passed to subprocess.run()

    Returns:
        subprocess.CompletedProcess object

    Raises:
        Exception: If command fails after all retries and check=True
    """
    for attempt in range(retries):
        result = subprocess.run(cmd, capture_output=capture_output, text=text, **kwargs)

        if result.returncode == 0:
            return result

        if attempt < retries - 1:
            time.sleep(retry_delay)

    if check:
        raise Exception(f"Command failed after {retries} attempt(s): {' '.join(cmd)}\nError: {result.stderr}")
Contributor (medium):

The Exception raised here is too generic. It's better to raise a more specific exception, such as subprocess.CalledProcessError (if check=True was passed to subprocess.run directly) or a custom exception, to allow callers to handle specific failure scenarios more gracefully. Raising a generic Exception makes it harder to differentiate between different types of errors.

Suggested change:
-        raise Exception(f"Command failed after {retries} attempt(s): {' '.join(cmd)}\nError: {result.stderr}")
+        raise RuntimeError(f"Command failed after {retries} attempt(s): {' '.join(cmd)}\nError: {result.stderr}")

Contributor (medium), on lines +28 to +49:

The docstring for the check parameter on line 28 states that CalledProcessError will be raised on failure, but the implementation on line 49 raises a generic Exception. This is inconsistent and loses valuable information that subprocess.CalledProcessError provides (like the return code and command). It's better to raise subprocess.CalledProcessError to be consistent with the subprocess module's behavior and the function's documentation.

Suggested change:
-        raise Exception(f"Command failed after {retries} attempt(s): {' '.join(cmd)}\nError: {result.stderr}")
+        raise subprocess.CalledProcessError(result.returncode, cmd, output=result.stdout, stderr=result.stderr)


    return result
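
If the subprocess.CalledProcessError suggestion above were adopted, callers could branch on failure details instead of catching everything. A minimal caller-side sketch; copy_with_fallback, both destinations, and the fallback behavior are hypothetical, and it assumes the suggested exception change is applied:

import subprocess

def copy_with_fallback(source, primary_dest, fallback_dest):
    """Hypothetical caller: fall back to a second destination on failure."""
    try:
        return gcloud_storage_cp(source, primary_dest, retries=3, check=True)
    except subprocess.CalledProcessError as e:
        # CalledProcessError carries returncode, cmd, and stderr,
        # which a generic Exception string loses.
        print(f"copy to {primary_dest} failed (exit {e.returncode}): {e.stderr}")
        return gcloud_storage_cp(source, fallback_dest, retries=1, check=True)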


def gcloud_storage_cp(source, dest, recursive=False, retries=3, check=True):
    """Copy files to/from GCS"""
    cmd = ['gcloud', 'storage', 'cp']
    if recursive:
        cmd.append('-r')
    cmd.extend([source, dest])

    return run_gcloud_command(cmd, retries=retries, check=check)


def gcloud_storage_ls(pattern, check=False):
    """List GCS objects matching a pattern"""
    cmd = ['gcloud', 'storage', 'ls', pattern]
    return run_gcloud_command(cmd, retries=1, check=check)


def gcloud_compute_ssh(vm_name, zone, project, command=None, internal_ip=True, check=True, **kwargs):
    """SSH to a compute instance"""
    cmd = ['gcloud', 'compute', 'ssh', vm_name, f'--zone={zone}', f'--project={project}']
    if internal_ip:
        cmd.append('--internal-ip')
    if command:
        cmd.extend(['--command', command])

    return run_gcloud_command(cmd, retries=1, check=check, **kwargs)
Comment on lines +76 to +78
Contributor (high):

Passing the command argument straight through to gcloud compute ssh --command could be a command injection vulnerability if the command string originates from an untrusted source. While gcloud itself might offer some protection, it's good practice to be cautious: if possible, ensure that the command argument is sanitized or that its source is always trusted. For subprocess.run, passing arguments as a list is the standard way to prevent this, but gcloud compute ssh --command expects a single string.
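
One common mitigation, sketched here as a suggestion rather than a required fix: quote any untrusted value with shlex.quote before interpolating it into the --command string. The run_fio_remote wrapper and its arguments are hypothetical:

import shlex

def run_fio_remote(vm_name, zone, project, jobfile_path):
    """Hypothetical wrapper: quote an untrusted path before remote execution."""
    # shlex.quote makes the remote shell treat jobfile_path as one literal
    # argument, so metacharacters like ';' or '$(...)' cannot inject commands.
    remote_cmd = f"fio {shlex.quote(jobfile_path)} --output-format=json"
    return gcloud_compute_ssh(vm_name, zone, project, command=remote_cmd)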



def gcloud_compute_scp(source, dest, zone, project, internal_ip=True, check=True):
    """Copy files to/from a compute instance"""
    cmd = ['gcloud', 'compute', 'scp', source, dest, f'--zone={zone}', f'--project={project}']
    if internal_ip:
        cmd.append('--internal-ip')

    return run_gcloud_command(cmd, retries=1, check=check)


def gcloud_compute_instance_group_list(instance_group, zone, project, filter_status='RUNNING'):
    """List VM names in a managed instance group"""
    cmd = [
        'gcloud', 'compute', 'instance-groups', 'managed', 'list-instances',
        instance_group,
        f'--zone={zone}',
        f'--project={project}',
        f'--filter=STATUS={filter_status}',
        '--format=value(NAME)'
    ]

    result = run_gcloud_command(cmd, retries=1, check=True)
    vms = [vm.strip() for vm in result.stdout.strip().split('\n') if vm.strip()]
    return vms
86 changes: 86 additions & 0 deletions distributed-micro-benchmarking/helpers/gcs.py
@@ -0,0 +1,86 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""GCS operations for distributed benchmarking"""

import json
import tempfile
from . import gcloud_utils


def upload_json(data, gcs_path):
    """Upload JSON data to GCS with retry on failure"""
    with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
Contributor (high):

The tempfile.NamedTemporaryFile is created with delete=False. This means the temporary file will not be automatically deleted after the with block exits. This can lead to an accumulation of temporary files on the disk, potentially causing disk space issues over time. Consider explicitly deleting the file using os.remove(f.name) after the try-except block, or set delete=True if the file is not needed after the with block.

Suggested change:
-    with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
+    with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=True) as f:

        json.dump(data, f, indent=2)
        f.flush()

        try:
            gcloud_utils.gcloud_storage_cp(f.name, gcs_path, retries=3, check=True)
        except Exception as e:
            raise Exception(f"Failed to upload to {gcs_path} after 3 attempts: {e}")
Contributor (medium):

Similar to gcloud_utils.py, raising a generic Exception here is not ideal. It makes it difficult for callers to distinguish between different types of upload failures. Consider raising a more specific exception, or a custom exception, that provides more context about the GCS upload failure.

Suggested change:
-            raise Exception(f"Failed to upload to {gcs_path} after 3 attempts: {e}")
+            raise RuntimeError(f"Failed to upload to {gcs_path} after 3 attempts: {e}")

Comment on lines +24 to +31
Contributor (high):

The function upload_json creates a temporary file using tempfile.NamedTemporaryFile(delete=False) but never cleans it up. This will lead to an accumulation of temporary files on the filesystem, causing a resource leak. The same issue is present in download_json (lines 36-43). You should ensure the file is deleted in a finally block to guarantee cleanup even if errors occur.

Here is a recommended pattern:

# Note: `import os` is needed at the top of the file.
def upload_json(data, gcs_path):
    """Upload JSON data to GCS with retry on failure"""
    f = tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False)
    try:
        with f:
            json.dump(data, f, indent=2)

        gcloud_utils.gcloud_storage_cp(f.name, gcs_path, retries=3, check=True)
    except Exception as e:
        raise Exception(f"Failed to upload to {gcs_path} after 3 attempts: {e}")
    finally:
        os.remove(f.name)



def download_json(gcs_path):
    """Download and parse JSON from GCS"""
    with tempfile.NamedTemporaryFile(mode='r', suffix='.json', delete=False) as f:
Contributor (high):

The tempfile.NamedTemporaryFile is created with delete=False. This means the temporary file will not be automatically deleted after the with block exits. This can lead to an accumulation of temporary files on the disk, potentially causing disk space issues over time. Consider explicitly deleting the file using os.remove(f.name) after the with block, or set delete=True if the file is not needed after the with block.

Suggested change:
-    with tempfile.NamedTemporaryFile(mode='r', suffix='.json', delete=False) as f:
+    with tempfile.NamedTemporaryFile(mode='r', suffix='.json', delete=True) as f:

        result = gcloud_utils.gcloud_storage_cp(gcs_path, f.name, retries=1, check=False)

        if result.returncode != 0:
            return None

        with open(f.name, 'r') as rf:
            return json.load(rf)
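
Applying the reviewer's cleanup pattern to the download side as well, a sketch; download_json_cleaned is a hypothetical variant, assuming removal of the temp file in a finally block is acceptable:

import json
import os
import tempfile

def download_json_cleaned(gcs_path):
    """Hypothetical variant of download_json that removes its temp file."""
    f = tempfile.NamedTemporaryFile(mode='r', suffix='.json', delete=False)
    f.close()  # only the name is needed; gcloud writes the file itself
    try:
        result = gcloud_utils.gcloud_storage_cp(gcs_path, f.name, retries=1, check=False)
        if result.returncode != 0:
            return None
        with open(f.name, 'r') as rf:
            return json.load(rf)
    finally:
        os.remove(f.name)  # guaranteed cleanup, even on parse errors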


def upload_test_cases(csv_path, base_path):
    """Upload test cases CSV to GCS"""
    import os
Contributor (medium):

The import os statement is placed inside the upload_test_cases function. It's generally considered best practice in Python to place all imports at the top of the file, after the module docstring and any future imports. This improves readability and ensures that dependencies are clear at a glance.

    if not os.path.exists(csv_path):
        raise FileNotFoundError(f"Test cases file not found: {csv_path}")

    dest = f"{base_path}/test-cases.csv"
    gcloud_utils.gcloud_storage_cp(csv_path, dest, retries=1, check=True)


def upload_fio_job_file(fio_path, base_path):
    """Upload FIO job template to GCS"""
    import os
Contributor (medium):

The import os statement is placed inside the upload_fio_job_file function. It's generally considered best practice in Python to place all imports at the top of the file, after the module docstring and any future imports. This improves readability and ensures that dependencies are clear at a glance.

Comment on lines +48 to +58
Contributor (medium):

Imports should be at the top of the file, not inside functions. This is a standard Python convention (PEP 8) that improves readability and makes dependencies clear at a glance. The import os statements in upload_test_cases and upload_fio_job_file should be moved to the top of gcs.py.

    if not os.path.exists(fio_path):
        raise FileNotFoundError(f"FIO job file not found: {fio_path}")

    dest = f"{base_path}/jobfile.fio"
    gcloud_utils.gcloud_storage_cp(fio_path, dest, retries=1, check=True)


def list_manifests(benchmark_id, artifacts_bucket):
    """List all manifest files for a benchmark"""
    pattern = f"gs://{artifacts_bucket}/{benchmark_id}/results/*/manifest.json"
    result = gcloud_utils.gcloud_storage_ls(pattern, check=False)

    if result.returncode != 0:
        return []

    return [line.strip() for line in result.stdout.strip().split('\n') if line.strip()]


def download_directory(gcs_path, local_path):
    """Download a directory from GCS"""
    gcloud_utils.gcloud_storage_cp(gcs_path, local_path, recursive=True, retries=1, check=True)


def check_cancellation(benchmark_id, artifacts_bucket):
    """Check if cancellation flag exists in GCS"""
    cancel_path = f"gs://{artifacts_bucket}/{benchmark_id}/cancel"
    result = gcloud_utils.gcloud_storage_ls(cancel_path, check=False)
    return result.returncode == 0
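
As a usage sketch, a worker loop might poll this flag between tests and stop early. The run_tests_until_cancelled wrapper and run_one callback are hypothetical names, not part of this module:

def run_tests_until_cancelled(tests, benchmark_id, artifacts_bucket, run_one):
    """Hypothetical worker loop: stop early once the cancel flag appears."""
    for test in tests:
        # One gcloud storage ls per test; cheap relative to a benchmark run.
        if check_cancellation(benchmark_id, artifacts_bucket):
            print(f"Cancel flag set for {benchmark_id}; stopping early.")
            break
        run_one(test)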
98 changes: 98 additions & 0 deletions distributed-micro-benchmarking/helpers/job_generator.py
@@ -0,0 +1,98 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Job specification generation and test distribution"""

import csv


def _load_csv_with_id(csv_path, id_field):
    """Generic CSV loader that adds sequential ID field"""
    items = []
    with open(csv_path, 'r') as f:
        reader = csv.DictReader(f)
        for i, row in enumerate(reader, start=1):
            row[id_field] = i
Contributor (medium):

If the input CSV already contains a column named id_field, its value will be silently overwritten by the sequential ID. While this might be the intended behavior, it could lead to unexpected data loss if the original id_field was important. Consider adding a check or a warning if id_field already exists in the row, or rename the generated ID field to avoid potential conflicts.

            items.append(row)
    return items
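
If the overwrite is not intended, a defensive variant could warn first. A sketch; the warning behavior is an assumption about desired semantics, not something the PR implements:

import csv
import warnings

def _load_csv_with_id_checked(csv_path, id_field):
    """Hypothetical variant that warns before overwriting an existing ID column."""
    items = []
    with open(csv_path, 'r') as f:
        reader = csv.DictReader(f)
        if reader.fieldnames and id_field in reader.fieldnames:
            warnings.warn(f"column '{id_field}' in {csv_path} is overwritten with sequential IDs")
        for i, row in enumerate(reader, start=1):
            row[id_field] = i  # sequential ID wins over any CSV value
            items.append(row)
    return items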


def load_test_cases(csv_path):
    """Load test cases from CSV file"""
    return _load_csv_with_id(csv_path, 'test_id')


def load_configs(csv_path):
    """Load config variations from CSV file"""
    return _load_csv_with_id(csv_path, 'config_id')


def generate_test_matrix(test_cases, configs):
    """Generate cartesian product of configs × test_cases"""
    test_matrix = []
    matrix_id = 1

    for config in configs:
        for test_case in test_cases:
            # Spread test_case first, then override with matrix-specific IDs
            matrix_entry = {
                **test_case,
                'matrix_id': matrix_id,
                'config_id': config['config_id'],
                'test_id': test_case['test_id'],
                'commit': config['commit'],
                'mount_args': config['mount_args'],
                'config_label': config['label']
            }
            test_matrix.append(matrix_entry)
            matrix_id += 1

    return test_matrix


def distribute_tests(test_cases, vms, is_matrix=False):
Contributor (medium):

The is_matrix parameter is defined in the function signature but is not used anywhere within the function body. This indicates dead code or an incomplete feature. Unused parameters can make the code harder to understand and maintain. If it's not needed, it should be removed. If it's for a future feature, consider adding a TODO comment.

Contributor (medium):

The parameter is_matrix is defined but not used within the distribute_tests function. It should be removed to simplify the function signature and avoid confusion.

Suggested change:
-def distribute_tests(test_cases, vms, is_matrix=False):
+def distribute_tests(test_cases, vms):

"""Distribute test cases evenly across VMs"""
num_vms = len(vms)
tests_per_vm = len(test_cases) // num_vms
remaining = len(test_cases) % num_vms

distribution = {}
start_idx = 0

for i, vm in enumerate(vms):
count = tests_per_vm + (1 if i < remaining else 0)
end_idx = start_idx + count
distribution[vm] = test_cases[start_idx:end_idx]
start_idx = end_idx

return distribution
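
For illustration, seven tests across three VMs give 7 // 3 = 2 per VM with remainder 1, so the first VM receives one extra test. A usage sketch with hypothetical VM names:

tests = [{'test_id': i} for i in range(1, 8)]      # 7 test cases
vms = ['bench-vm-0', 'bench-vm-1', 'bench-vm-2']   # 3 workers

dist = distribute_tests(tests, vms)
for vm, assigned in dist.items():
    print(vm, [t['test_id'] for t in assigned])
# bench-vm-0 [1, 2, 3]
# bench-vm-1 [4, 5]
# bench-vm-2 [6, 7]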


def create_job_spec(vm_name, benchmark_id, test_entries, bucket, artifacts_bucket, iterations, mode="single-config"):
    """Create job specification for a VM"""
    total_tests = len(test_entries)

    job_spec = {
        "vm_name": vm_name,
        "benchmark_id": benchmark_id,
        "bucket": bucket,
        "artifacts_bucket": artifacts_bucket,
        "iterations": iterations,
        "total_tests": total_tests,
        "total_runs": total_tests * iterations,
        "test_ids" if mode == "single-config" else "test_entries":
            [entry['test_id'] for entry in test_entries] if mode == "single-config" else test_entries
    }
Comment on lines +94 to +96
Contributor (medium):

The job_spec structure dynamically changes its key ("test_ids" or "test_entries") based on the mode parameter. While functional, this can make downstream processing of job_spec more complex, as consumers need to check the mode to know which key to access. A more consistent structure, perhaps always using "test_entries" and having test_id as a field within each entry, might simplify handling.


    return job_spec
Comment on lines +86 to +98
Contributor (medium):

The creation of the job_spec dictionary uses a complex conditional expression for both a key and its value. While functional, this is difficult to read and maintain. Refactoring this into a standard if/else block to populate the dictionary would make the code more explicit and easier to understand.

Suggested change:
-    job_spec = {
-        "vm_name": vm_name,
-        "benchmark_id": benchmark_id,
-        "bucket": bucket,
-        "artifacts_bucket": artifacts_bucket,
-        "iterations": iterations,
-        "total_tests": total_tests,
-        "total_runs": total_tests * iterations,
-        "test_ids" if mode == "single-config" else "test_entries":
-            [entry['test_id'] for entry in test_entries] if mode == "single-config" else test_entries
-    }
-    return job_spec
+    job_spec = {
+        "vm_name": vm_name,
+        "benchmark_id": benchmark_id,
+        "bucket": bucket,
+        "artifacts_bucket": artifacts_bucket,
+        "iterations": iterations,
+        "total_tests": total_tests,
+        "total_runs": total_tests * iterations,
+    }
+    if mode == "single-config":
+        job_spec['test_ids'] = [entry['test_id'] for entry in test_entries]
+    else:
+        job_spec['test_entries'] = test_entries
+    return job_spec
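
As a usage sketch of the mode-dependent shape this comment describes (all values hypothetical; entries stands in for a list of test-entry dicts):

spec = create_job_spec('bench-vm-0', 'bench-20260101', entries,
                       bucket='data-bucket', artifacts_bucket='artifacts-bucket',
                       iterations=3, mode='single-config')
# mode='single-config' yields a 'test_ids' key holding plain IDs;
# any other mode yields 'test_entries' holding the full entry dicts,
# so consumers must branch on mode to know which key to read.
assert 'test_ids' in spec and 'test_entries' not in spec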
