Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 135 additions & 17 deletions perfkitbenchmarker/benchmark_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -1100,7 +1100,9 @@ def Provision(self):
self.data_discovery_service.Create()

def Delete(self):
if self.deleted:
# Don't trust the deleted flag during teardown-only runs
# This handles cases where pickle data is corrupted or provision failed
if self.deleted and stages.TEARDOWN not in FLAGS.run_stage:
return

should_freeze = (
Expand Down Expand Up @@ -1179,14 +1181,29 @@ def Delete(self):
for placement_group_object in self.placement_groups.values():
placement_group_object.Delete()

for firewall in self.firewalls.values():
try:
firewall.DisallowAllPorts()
except Exception:
logging.exception(
'Got an exception disabling firewalls. '
'Attempting to continue tearing down.'
)
# Validate network/firewall objects and fall back to direct cleanup if invalid
valid_networks = self._ValidateNetworkObjects()
valid_firewalls = self._ValidateFirewallObjects()

if not valid_networks or not valid_firewalls:
logging.warning(
'Invalid network/firewall objects detected (corrupted pickle data). '
'Falling back to direct GCP resource cleanup.'
)
self._CleanupOrphanedGCPResources()
else:
# Standard cleanup with validated objects
for firewall in self.firewalls.values():
try:
if hasattr(firewall, 'Delete') and callable(firewall.Delete):
firewall.Delete()
else:
logging.warning('Firewall object missing Delete method, skipping')
except Exception:
logging.exception(
'Got an exception deleting firewalls. '
'Attempting to continue tearing down.'
)

if self.container_cluster:
self.container_cluster.DeleteServices()
Expand All @@ -1196,14 +1213,18 @@ def Delete(self):
if self.cluster:
self.cluster.Delete()

for net in self.networks.values():
try:
net.Delete()
except Exception:
logging.exception(
'Got an exception deleting networks. '
'Attempting to continue tearing down.'
)
if valid_networks:
for net in self.networks.values():
try:
if hasattr(net, 'Delete') and callable(net.Delete):
net.Delete()
else:
logging.warning('Network object missing Delete method, skipping')
except Exception:
logging.exception(
'Got an exception deleting networks. '
'Attempting to continue tearing down.'
)

if hasattr(self, 'vpn_service') and self.vpn_service:
self.vpn_service.Delete()
Expand Down Expand Up @@ -1390,6 +1411,103 @@ def GetBenchmarkSpec(cls, benchmark_module, config, uid):
context.SetThreadBenchmarkSpec(bm_spec)
return bm_spec

def _ValidateNetworkObjects(self) -> bool:
  """Report whether self.networks looks safe to tear down object-by-object.

  Returns:
    False when self.networks is empty/falsy, when any key is a tuple, when the
    stripped string form of any key is a lone JSON punctuation character or
    starts with a double quote, or when any value lacks a callable Delete
    attribute. True otherwise. A warning is logged for every rejection except
    the empty case.
  """
  registry = self.networks
  if not registry:
    return False

  json_punctuation = ('{', '}', '[', ']', ',')

  for key, net in registry.items():
    # Tuple keys are treated as a sign of corrupted pickle data.
    # NOTE(review): confirm that no provider legitimately registers networks
    # under tuple keys; if one does, this check would send every teardown of
    # a healthy spec down the fallback path.
    if isinstance(key, tuple):
      logging.warning(f'Network dict has tuple keys (corrupted): {key}')
      return False

    # Inspect the textual form of the key for leftover JSON fragments.
    key_str = str(key)
    trimmed = key_str.strip()
    if trimmed.startswith('"') or trimmed in json_punctuation:
      logging.warning(f'Network dict has corrupted keys (JSON fragments): {key_str[:50]}')
      return False

    # The stored value must expose something callable named Delete.
    if not callable(getattr(net, 'Delete', None)):
      logging.warning(f'Network object {key} is invalid (no Delete method)')
      return False

  return True

def _ValidateFirewallObjects(self) -> bool:
  """Report whether every entry in self.firewalls exposes a callable Delete.

  Returns:
    True when self.firewalls is empty/falsy (nothing to validate) or when every
    value has a callable Delete attribute; False as soon as one does not.
  """
  registry = self.firewalls
  if not registry:
    # No firewalls recorded is considered a valid state, not corruption.
    return True
  return all(
      callable(getattr(fw, 'Delete', None)) for fw in registry.values()
  )

def _CleanupOrphanedGCPResources(self) -> None:
  """Delete GCP firewall rules and networks whose names match the run URI.

  Fallback used when the stored network/firewall objects are judged invalid
  (corrupted pickle). Instead of calling Delete() on those objects, this
  shells out to `gcloud` to list resources by a name filter built from
  FLAGS.run_uri and deletes every match.

  The cleanup is best-effort: a failing list or delete command is logged and
  the remaining resources are still attempted, and exceptions raised while
  running gcloud are logged rather than propagated.

  Returns early, after logging, when FLAGS.cloud is not GCP or when
  FLAGS.project is unset.
  """
  if FLAGS.cloud != provider_info.GCP:
    logging.warning('Direct cleanup only supported for GCP')
    return

  import subprocess

  project = FLAGS.project
  if not project:
    logging.error('No project specified for cleanup')
    return

  logging.info(f'Cleaning up orphaned GCP resources for run_uri: {FLAGS.run_uri}')

  # (gcloud collection, label used in log messages, --filter expression).
  # Order matters: firewall rules are deleted first because they are a
  # dependency for deleting the networks.
  targets = (
      ('firewall-rules', 'firewall', f'name~-{FLAGS.run_uri}'),
      ('networks', 'network', f'name~pkb-network.*{FLAGS.run_uri}'),
  )

  for collection, label, name_filter in targets:
    try:
      listing = subprocess.run(
          ['gcloud', 'compute', collection, 'list',
           '--project', project,
           '--filter', name_filter,
           '--format', 'value(name)'],
          capture_output=True, text=True, check=False,
      )
      if listing.returncode != 0:
        # Previously a failed listing was indistinguishable from "nothing to
        # delete"; surface it so leaked resources can be chased manually.
        logging.warning(
            'Listing %ss for cleanup failed (exit code %d): %s',
            label, listing.returncode, (listing.stderr or '').strip(),
        )
        continue

      # splitlines() plus strip() tolerates CRLF output and blank lines.
      resource_names = [
          line.strip() for line in listing.stdout.splitlines() if line.strip()
      ]
      for resource_name in resource_names:
        logging.info('Deleting orphaned %s: %s', label, resource_name)
        # Output is intentionally not captured so gcloud's own messages still
        # reach the console, as before.
        deletion = subprocess.run(
            ['gcloud', 'compute', collection, 'delete', resource_name,
             '--project', project, '--quiet'],
            check=False,
        )
        if deletion.returncode != 0:
          logging.warning(
              'Deleting orphaned %s %s failed (exit code %d)',
              label, resource_name, deletion.returncode,
          )
    except Exception as e:
      # Best-effort teardown: record the failure and move on to the next
      # resource type instead of aborting the rest of Delete().
      logging.exception('Failed to clean up %ss: %s', label, e)

def CheckPrerequisites(self) -> None:
"""Checks preconditions for the benchmark_spec."""
for app_group in self.app_groups:
Expand Down