Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions perfkitbenchmarker/data/container/karpenter/nodepool.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ spec:
values: ["on-demand"]
- key: karpenter.k8s.aws/instance-category
operator: In
values: ["c", "m", "r"]
values: ["c", "m", "r", "t"]
- key: karpenter.k8s.aws/instance-generation
operator: Gt
values: ["2"]
Expand All @@ -27,7 +27,7 @@ spec:
name: default
expireAfter: 720h # 30 * 24h = 720h
limits:
cpu: 1000
cpu: {{ KARPENTER_NODEPOOL_CPU_LIMIT }}
disruption:
consolidationPolicy: WhenEmptyOrUnderutilized
consolidateAfter: 1m
Expand Down
30 changes: 11 additions & 19 deletions perfkitbenchmarker/linux_benchmarks/kubernetes_scale_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,28 +378,20 @@ def GetStatusConditionsForResourceType(
lastTransitionTime.
"""

jsonpath = (
r'{range .items[*]}'
# e.g. '"pod-name-1234": [<condition1>, ...],\n'
r'{"\""}{.metadata.name}{"\": "}{.status.conditions}{",\n"}'
r'{end}'
)
# Use full JSON output to avoid invalid JSON when manually building from
# jsonpath with many resources or on connection reset (truncated output).
stdout, _, _ = kubectl.RunKubectlCommand(
[
'get',
resource_type,
'-o',
'jsonpath=' + jsonpath,
],
timeout=60 * 2, # 2 minutes; should be a pretty fast call.
# Output can be quite large, so we'll conditionally suppress it.
['get', resource_type, '-o', 'json'],
timeout=60 * 5, # 5 minutes for large clusters (e.g. 1000 pods)
suppress_logging=NUM_PODS.value > 20,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice this is clever

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks

)

# Convert output to valid json and parse it
stdout = stdout.rstrip('\t\n\r ,')
stdout = '{' + stdout + '}'
name_to_conditions = json.loads(stdout)
data = json.loads(stdout)
name_to_conditions = {}
for item in data.get('items', []):
name = item.get('metadata', {}).get('name')
conditions = item.get('status', {}).get('conditions')
if name is not None and conditions is not None:
name_to_conditions[name] = conditions

for key in resources_to_ignore:
name_to_conditions.pop(key, None)
Expand Down
89 changes: 67 additions & 22 deletions perfkitbenchmarker/providers/aws/elastic_kubernetes_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
from collections import abc
import json
import logging
import math
import re
import time
from typing import Any
from urllib import parse

Expand Down Expand Up @@ -704,9 +706,7 @@ def _Create(self):
}],
},
'iamIdentityMappings': [{
'arn': (
f'arn:aws:iam::{self.account}:role/KarpenterNodeRole-{self.name}'
),
'arn': f'arn:aws:iam::{self.account}:role/KarpenterNodeRole-{self.name}',
'username': 'system:node:{{EC2PrivateDNSName}}',
'groups': ['system:bootstrappers', 'system:nodes'],
}],
Expand Down Expand Up @@ -995,6 +995,14 @@ def _PostIngressNetworkingFixups(
def _PostCreate(self):
"""Performs post-creation steps for the cluster."""
super()._PostCreate()
# Karpenter controller resources: default 1/1Gi; scale up when node_scale target is set.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just not specify anything & let Karpenter decide? Or is this indeed necessary? It seems clever but a little annoying / bad user experience by Karpenter.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the resources for the Karpenter controller pod (the node where Karpenter itself runs). Karpenter doesn't manage that node, so it can't "decide" these values; we have to set them ourselves. For runs with ~10 nodes, 1/1Gi is sufficient; we only increase when node_scale is 500+ or 1000+.

num_nodes = getattr(FLAGS, 'kubernetes_scale_num_nodes', None)
if num_nodes is not None and num_nodes > 1000:
controller_cpu, controller_memory = 4, '16Gi'
elif num_nodes is not None and num_nodes >= 500:
controller_cpu, controller_memory = 2, '8Gi'
else:
controller_cpu, controller_memory = 1, '1Gi'
vm_util.IssueCommand([
'helm',
'upgrade',
Expand All @@ -1013,13 +1021,13 @@ def _PostCreate(self):
'--set',
f'settings.interruptionQueue={self.name}',
'--set',
'controller.resources.requests.cpu=1',
f'controller.resources.requests.cpu={controller_cpu}',
'--set',
'controller.resources.requests.memory=1Gi',
f'controller.resources.requests.memory={controller_memory}',
'--set',
'controller.resources.limits.cpu=1',
f'controller.resources.limits.cpu={controller_cpu}',
'--set',
'controller.resources.limits.memory=1Gi',
f'controller.resources.limits.memory={controller_memory}',
'--set',
'logLevel=debug',
'--wait',
Expand Down Expand Up @@ -1057,10 +1065,14 @@ def _PostCreate(self):
'v'
+ full_version.strip().strip('"').split(f'{self.cluster_version}-v')[1]
)
# NodePool CPU limit: scale with benchmark target (nodes * 2 + 5%), min 1000.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the machine type matter here as well? If I am using a larger machine type, do I need to also set a larger cpu limit? This again seems a little annoying to have to set manually (but maybe makes senses given Karpenter can be machine type agnostic).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to include machine type adjustment, I’ll think about how to cover it.
Thanks.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the eks_karpenter_limits_vcpu_per_node flag so the Karpenter NodePool CPU limit can be tuned when nodes use more than 2 vCPUs. The default remains 2 (same behavior as before).

num_nodes = getattr(FLAGS, 'kubernetes_scale_num_nodes', 5)
cpu_limit = max(1000, math.ceil(num_nodes * 2 * 1.05))
kubernetes_commands.ApplyManifest(
'container/karpenter/nodepool.yaml.j2',
CLUSTER_NAME=self.name,
ALIAS_VERSION=alias_version,
KARPENTER_NODEPOOL_CPU_LIMIT=cpu_limit,
)

def _Delete(self):
Expand Down Expand Up @@ -1175,6 +1187,8 @@ def _CleanupKarpenter(self):
suppress_failure=lambda stdout, stderr, retcode: (
'no matching resources found' in stderr.lower()
or 'timed out' in stderr.lower()
or 'context deadline exceeded' in stderr.lower()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These look very similar to the RETRYABLE_KUBECTL_ERRORS list:

Just use kubectl.RunRetryableKubectlCommand instead & get these for free. If that code is missing some of these (like 'timed out'), then consider adding them. It looks like suppress_failure is supported too, so you can mix both — which would probably be good for 'no matching resources found', as that sounds like a wait-specific / command-specific error message to ignore.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hubatish
Updated: EKS cleanup now uses RunRetryableKubectlCommand with suppress_failure only for "no resources found" style messages, retryable list extended and matching is case insensitive, please check.

or 'unable to connect to the server' in stderr.lower()
),
)
# Force terminate remaining EC2 instances
Expand Down Expand Up @@ -1246,21 +1260,52 @@ def _CleanupKarpenter(self):
if eni_ids:
logging.info('Deleting %d orphaned network interfaces', len(eni_ids))
for eni_id in eni_ids:
vm_util.IssueCommand(
[
'aws',
'ec2',
'delete-network-interface',
'--region',
self.region,
'--network-interface-id',
eni_id,
],
suppress_failure=lambda stdout, stderr, retcode: (
'not found' in stderr.lower()
or 'does not exist' in stderr.lower()
),
)
max_retries = 5
backoff_seconds = 10
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While this backoff logic looks pretty reasonable, prefer reusing backoff logic in vm_util.Retry. Which means moving this code to a subfunction & adding said decorator.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hubatish

I have updated the code, please take look

for attempt in range(max_retries):
stdout, stderr, retcode = vm_util.IssueCommand(
[
'aws',
'ec2',
'delete-network-interface',
'--region',
self.region,
'--network-interface-id',
eni_id,
],
# Observed in logs: InvalidNetworkInterfaceID.NotFound (ENI gone),
# RequestLimitExceeded (throttle). Suppress so we can retry or treat as success.
suppress_failure=lambda _stdout, stderr, _retcode: (
'invalidnetworkinterfaceid.notfound' in (stderr or '').lower()
or 'requestlimitexceeded' in (stderr or '').lower()
),
)
if retcode == 0:
break
stderr_lower = (stderr or '').lower()
# ENI already deleted (e.g. by another process or previous attempt).
if 'invalidnetworkinterfaceid.notfound' in stderr_lower:
break
# Throttle: retry with backoff.
if 'requestlimitexceeded' in stderr_lower:
if attempt < max_retries - 1:
logging.warning(
'AWS rate limit on DeleteNetworkInterface for %s, retry'
' in %ds',
eni_id,
backoff_seconds,
)
time.sleep(backoff_seconds)
backoff_seconds = min(backoff_seconds * 2, 60)
else:
raise errors.VmUtil.IssueCommandError(
f'DeleteNetworkInterface failed after {max_retries} retries: '
f'{stderr}'
)
else:
raise errors.VmUtil.IssueCommandError(
f'DeleteNetworkInterface failed: {stderr}'
)

def _IsReady(self):
"""Returns True if cluster is running. Autopilot defaults to 0 nodes."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ def _DeleteAllFromDefaultNamespace():
run_cmd = ['delete', 'job', '--all', '-n', 'default']
kubectl.RunRetryableKubectlCommand(run_cmd)

timeout = 60 * 20
timeout = 60 * 60 # 1 hour for kubectl delete all -n default (teardown)
run_cmd = [
'delete',
'all',
Expand Down
19 changes: 14 additions & 5 deletions perfkitbenchmarker/traces/kubernetes_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ def _StopWatchingForNodeChanges(self):
"""Stop watching the cluster for node add/remove events."""
polled_events = self._cluster.GetEvents()

# Resolve machine type only for current nodes; use "unknown" for the rest.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, this makes sense. Was this causing the cluster to take a long time querying everything?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, it was the main reason.

_node_names = kubernetes_commands.GetNodeNames(suppress_logging=True)
_current_node_names = set(_node_names)
if _node_names:
_GetMachineTypeFromNodeName(self._cluster, _node_names[0])

for e in polled_events:
if e.resource.kind != "Node":
continue
Expand All @@ -156,10 +162,11 @@ def _StopWatchingForNodeChanges(self):
if name in self._nodes:
continue

machine_type = _GetMachineTypeFromNodeName(self._cluster, name)
logging.info(
"DEBUG: RegisteredNode: %s, %s", name, machine_type
)
if name in _current_node_names:
machine_type = _GetMachineTypeFromNodeName(self._cluster, name)
else:
machine_type = "unknown"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something around here is probably what is causing the TypeError.

Copy link
Collaborator Author

@kiryl-filatau kiryl-filatau Mar 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checks have passed

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So here you use "unknown". I wonder if a random different machine's type would be better instead — likely in a big scaling scenario they'll all use the same one.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be "unknown" anyway, as at the moment of gathering the info the nodes from scaleUp1 were already removed.

logging.info("DEBUG: RegisteredNode: %s, %s", name, machine_type)
self._nodes[name] = _NodeTracker(
name=name,
machine_type=machine_type,
Expand All @@ -173,7 +180,9 @@ def _StopWatchingForNodeChanges(self):
"Detected a kubernetes event indicating that a node (%s) is"
" to be removed, but we have no record of this node. We'll"
" ignore this node - it won't be counted in the"
" %s metric.", name, VM_TIME_METRIC
" %s metric.",
name,
VM_TIME_METRIC,
)
continue

Expand Down
Loading