Skip to content
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions perfkitbenchmarker/data/container/karpenter/nodepool.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,28 @@ spec:
- key: karpenter.sh/capacity-type
operator: In
values: ["on-demand"]
{% if KARPENTER_INSTANCE_TYPES %}
- key: karpenter.k8s.aws/instance-type
operator: In
values:
{% for instance_type in KARPENTER_INSTANCE_TYPES %}
- "{{ instance_type }}"
{% endfor %}
{% else %}
- key: karpenter.k8s.aws/instance-category
operator: In
values: ["c", "m", "r"]
values: ["c", "m", "r", "t"]
- key: karpenter.k8s.aws/instance-generation
operator: Gt
values: ["2"]
{% endif %}
nodeClassRef:
group: karpenter.k8s.aws
kind: EC2NodeClass
name: default
expireAfter: 720h # 30 * 24h = 720h
limits:
cpu: 1000
cpu: {{ KARPENTER_NODEPOOL_CPU_LIMIT }}
disruption:
consolidationPolicy: WhenEmptyOrUnderutilized
consolidateAfter: 1m
Expand Down
34 changes: 14 additions & 20 deletions perfkitbenchmarker/linux_benchmarks/kubernetes_scale_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ def Prepare(bm_spec: benchmark_spec.BenchmarkSpec):
"""Sets additional spec attributes."""
bm_spec.always_call_cleanup = True
assert bm_spec.container_cluster
_EnsureEksKarpenterGpuNodepool(bm_spec.container_cluster)
cluster = bm_spec.container_cluster
assert isinstance(cluster, kubernetes_cluster.KubernetesCluster)
_EnsureEksKarpenterGpuNodepool(cluster)


def _GetRolloutCreationTime(rollout_name: str) -> int:
Expand Down Expand Up @@ -378,28 +380,20 @@ def GetStatusConditionsForResourceType(
lastTransitionTime.
"""

jsonpath = (
r'{range .items[*]}'
# e.g. '"pod-name-1234": [<condition1>, ...],\n'
r'{"\""}{.metadata.name}{"\": "}{.status.conditions}{",\n"}'
r'{end}'
)
# Use full JSON output to avoid invalid JSON when manually building from
# jsonpath with many resources or on connection reset (truncated output).
stdout, _, _ = kubectl.RunKubectlCommand(
[
'get',
resource_type,
'-o',
'jsonpath=' + jsonpath,
],
timeout=60 * 2, # 2 minutes; should be a pretty fast call.
# Output can be quite large, so we'll conditionally suppress it.
['get', resource_type, '-o', 'json'],
timeout=60 * 5, # 5 minutes for large clusters (e.g. 1000 pods)
suppress_logging=NUM_PODS.value > 20,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice this is clever

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks

)

# Convert output to valid json and parse it
stdout = stdout.rstrip('\t\n\r ,')
stdout = '{' + stdout + '}'
name_to_conditions = json.loads(stdout)
data = json.loads(stdout)
name_to_conditions = {}
for item in data.get('items', []):
name = item.get('metadata', {}).get('name')
conditions = item.get('status', {}).get('conditions')
if name is not None and conditions is not None:
name_to_conditions[name] = conditions

for key in resources_to_ignore:
name_to_conditions.pop(key, None)
Expand Down
96 changes: 70 additions & 26 deletions perfkitbenchmarker/providers/aws/elastic_kubernetes_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from collections import abc
import json
import logging
import math
import re
from typing import Any
from urllib import parse
Expand Down Expand Up @@ -704,9 +705,7 @@ def _Create(self):
}],
},
'iamIdentityMappings': [{
'arn': (
f'arn:aws:iam::{self.account}:role/KarpenterNodeRole-{self.name}'
),
'arn': f'arn:aws:iam::{self.account}:role/KarpenterNodeRole-{self.name}',
'username': 'system:node:{{EC2PrivateDNSName}}',
'groups': ['system:bootstrappers', 'system:nodes'],
}],
Expand Down Expand Up @@ -992,9 +991,29 @@ def _PostIngressNetworkingFixups(
'[PKB][EKS] Allowed ALB SG %s -> node SGs on port %s', alb_sg, port
)

@staticmethod
def _DefaultNodepoolInstanceTypes() -> list[str]:
  """EC2 types for default NodePool manifest (--eks_karpenter_nodepool_instance_types).

  Empty list means the Jinja template keeps instance-category/generation rules.
  """
  # Normalize the flag value: trim whitespace and drop empty entries so a
  # trailing comma (e.g. "m5.large,") does not produce a blank instance type.
  cleaned_types = []
  for raw_type in FLAGS.eks_karpenter_nodepool_instance_types:
    candidate = raw_type.strip()
    if candidate:
      cleaned_types.append(candidate)
  return cleaned_types

def _PostCreate(self):
"""Performs post-creation steps for the cluster."""
super()._PostCreate()
# Karpenter controller resources: default 1/1Gi; scale up when node_scale target is set.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just not specify anything & let Karpenter decide? Or is this indeed necessary? It seems clever but a little annoying / bad user experience by Karpenter.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the resources for the Karpenter controller pod (the node where Karpenter itself runs). Karpenter doesn't manage that node, so it can't "decide" these values; we have to set them. For runs with ~10 nodes, 1/1Gi is sufficient; we only increase when node_scale is 500+ or 1000+.

num_nodes = getattr(FLAGS, 'kubernetes_scale_num_nodes', None)
if num_nodes is not None and num_nodes > 1000:
controller_cpu, controller_memory = 4, '16Gi'
elif num_nodes is not None and num_nodes >= 500:
controller_cpu, controller_memory = 2, '8Gi'
else:
controller_cpu, controller_memory = 1, '1Gi'
vm_util.IssueCommand([
'helm',
'upgrade',
Expand All @@ -1013,13 +1032,13 @@ def _PostCreate(self):
'--set',
f'settings.interruptionQueue={self.name}',
'--set',
'controller.resources.requests.cpu=1',
f'controller.resources.requests.cpu={controller_cpu}',
'--set',
'controller.resources.requests.memory=1Gi',
f'controller.resources.requests.memory={controller_memory}',
'--set',
'controller.resources.limits.cpu=1',
f'controller.resources.limits.cpu={controller_cpu}',
'--set',
'controller.resources.limits.memory=1Gi',
f'controller.resources.limits.memory={controller_memory}',
'--set',
'logLevel=debug',
'--wait',
Expand Down Expand Up @@ -1057,10 +1076,16 @@ def _PostCreate(self):
'v'
+ full_version.strip().strip('"').split(f'{self.cluster_version}-v')[1]
)
# NodePool CPU limit: scale with benchmark target (nodes * vCPU + 5%), min 1000.
num_nodes = getattr(FLAGS, 'kubernetes_scale_num_nodes', 5)
vcpu_per_node = FLAGS.eks_karpenter_limits_vcpu_per_node
cpu_limit = max(1000, math.ceil(num_nodes * vcpu_per_node * 1.05))
kubernetes_commands.ApplyManifest(
'container/karpenter/nodepool.yaml.j2',
CLUSTER_NAME=self.name,
ALIAS_VERSION=alias_version,
KARPENTER_NODEPOOL_CPU_LIMIT=cpu_limit,
KARPENTER_INSTANCE_TYPES=self._DefaultNodepoolInstanceTypes(),
)

def _Delete(self):
Expand Down Expand Up @@ -1149,21 +1174,21 @@ def _CleanupKarpenter(self):
"""Cleanup Karpenter managed nodes before cluster deletion."""
logging.info('Cleaning up Karpenter nodes...')
# Delete NodePool resources - this will trigger node termination
kubectl.RunKubectlCommand(
kubectl.RunRetryableKubectlCommand(
[
'delete',
'nodepool,ec2nodeclass',
'--all',
'--timeout=120s',
],
timeout=300,
suppress_failure=lambda stdout, stderr, retcode: (
'no resources found' in stderr.lower()
or 'not found' in stderr.lower()
or 'timed out waiting for the condition' in stderr.lower()
),
)
# Wait for all Karpenter nodes to be deleted
kubectl.RunKubectlCommand(
kubectl.RunRetryableKubectlCommand(
[
'wait',
'--for=delete',
Expand All @@ -1172,9 +1197,10 @@ def _CleanupKarpenter(self):
'karpenter.sh/nodepool',
'--timeout=120s',
],
timeout=300,
suppress_failure=lambda stdout, stderr, retcode: (
'no matching resources found' in stderr.lower()
or 'timed out' in stderr.lower()
or 'no resources found' in stderr.lower()
),
)
# Force terminate remaining EC2 instances
Expand Down Expand Up @@ -1246,21 +1272,39 @@ def _CleanupKarpenter(self):
if eni_ids:
logging.info('Deleting %d orphaned network interfaces', len(eni_ids))
for eni_id in eni_ids:
vm_util.IssueCommand(
[
'aws',
'ec2',
'delete-network-interface',
'--region',
self.region,
'--network-interface-id',
eni_id,
],
suppress_failure=lambda stdout, stderr, retcode: (
'not found' in stderr.lower()
or 'does not exist' in stderr.lower()
),
)
# Bind eni_id by default to avoid loop closure issues if this is refactored.
def _delete_one_eni(eni_id=eni_id) -> None:
  """Deletes one orphaned ENI, treating an already-deleted ENI as success.

  Raises:
    errors.Resource.RetryableDeletionError: on AWS throttling, so the
      surrounding vm_util.Retry wrapper re-attempts the deletion.
    errors.VmUtil.IssueCommandError: on any other CLI failure.
  """
  _, stderr, retcode = vm_util.IssueCommand(
      [
          'aws',
          'ec2',
          'delete-network-interface',
          '--region',
          self.region,
          '--network-interface-id',
          eni_id,
      ],
      # Inspect stderr ourselves below rather than raising on any failure.
      raise_on_failure=False,
  )
  if retcode == 0:
    return
  stderr_lower = (stderr or '').lower()
  # ENI already deleted (e.g. by another process or previous attempt).
  if 'invalidnetworkinterfaceid.notfound' in stderr_lower:
    return
  # RequestLimitExceeded (throttle): retry via vm_util.Retry.
  if 'requestlimitexceeded' in stderr_lower:
    raise errors.Resource.RetryableDeletionError(stderr or '')
  raise errors.VmUtil.IssueCommandError(
      f'DeleteNetworkInterface failed: {stderr}'
  )

# max_retries=5 yields 6 CLI attempts (tries > 5 on 6th failure).
vm_util.Retry(
poll_interval=10,
max_retries=5,
retryable_exceptions=(errors.Resource.RetryableDeletionError,),
)(_delete_one_eni)()

def _IsReady(self):
"""Returns True if cluster is running. Autopilot defaults to 0 nodes."""
Expand Down
13 changes: 13 additions & 0 deletions perfkitbenchmarker/providers/aws/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,19 @@
'Whether to install AWS Load Balancer Controller in EKS Karpenter clusters'
'Default value - do not install unless explicitly requested',
)
flags.DEFINE_integer(
'eks_karpenter_limits_vcpu_per_node',
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use flagholder: https://absl.readthedocs.io/en/latest/absl.flags.html#absl.flags.FlagHolder

Also not sure if this is generally the right spot for these. Ideally probably both should go in config_overrides, with this one maybe being set cpu size from vm_spec & the other coming in a follow up cl.

2,
'Assumed vCPUs per provisioned node when computing Karpenter NodePool '
'limits.cpu on EKS (uses kubernetes_scale_num_nodes, this value, and 5% '
'headroom; minimum limit 1000). Raise for larger EC2 instance shapes.',
)
flags.DEFINE_list(
'eks_karpenter_nodepool_instance_types',
[],
'Comma-separated EC2 types for the Karpenter default NodePool (worker '
'nodes only). Empty keeps instance-category/generation in the template.',
)
AWS_CAPACITY_BLOCK_RESERVATION_ID = flags.DEFINE_string(
'aws_capacity_block_reservation_id',
None,
Expand Down
6 changes: 5 additions & 1 deletion perfkitbenchmarker/resources/container_service/kubectl.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
'error sending request:',
'(abnormal closure): unexpected EOF',
'deadline exceeded',
# kubectl wait/delete timeouts and connection errors (retried in EKS cleanup)
'timed out',
'unable to connect to the server',
]


Expand All @@ -38,8 +41,9 @@ def _DetectTimeoutViaSuppressFailure(stdout, stderr, retcode):
# Check for kubectl timeout. If found, treat it the same as a regular
# timeout.
if retcode != 0:
stderr_lower = stderr.lower()
for error_substring in RETRYABLE_KUBECTL_ERRORS:
if error_substring in stderr:
if error_substring.lower() in stderr_lower:
# Raise timeout error regardless of raise_on_failure - as the intended
# semantics is to ignore expected errors caused by invoking the
# command not errors from PKB infrastructure.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ def _DeleteAllFromDefaultNamespace():
run_cmd = ['delete', 'job', '--all', '-n', 'default']
kubectl.RunRetryableKubectlCommand(run_cmd)

timeout = 60 * 20
timeout = 60 * 60 # 1 hour for kubectl delete all -n default (teardown)
run_cmd = [
'delete',
'all',
Expand Down
35 changes: 22 additions & 13 deletions perfkitbenchmarker/traces/kubernetes_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ def _StopWatchingForNodeChanges(self):
"""Stop watching the cluster for node add/remove events."""
polled_events = self._cluster.GetEvents()

# Resolve machine type only for current nodes; use "unknown" for the rest.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

O this makes sense. Was this causing the cluster to take a long time querying everything?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, it was the main reason.

_node_names = kubernetes_commands.GetNodeNames(suppress_logging=True)
_current_node_names = set(_node_names)
if _node_names:
_GetMachineTypeFromNodeName(self._cluster, _node_names[0])

for e in polled_events:
if e.resource.kind != "Node":
continue
Expand All @@ -156,10 +162,11 @@ def _StopWatchingForNodeChanges(self):
if name in self._nodes:
continue

machine_type = _GetMachineTypeFromNodeName(self._cluster, name)
logging.info(
"DEBUG: RegisteredNode: %s, %s", name, machine_type
)
if name in _current_node_names:
machine_type = _GetMachineTypeFromNodeName(self._cluster, name)
else:
machine_type = "unknown"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something around here is probably what is causing the TypeError.

Copy link
Collaborator Author

@kiryl-filatau kiryl-filatau Mar 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checks have passed

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So here you use unknown.. I wonder if instead a random different machine's type would be better. likely in a big scaling scenario they'll all use the same one.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will anyway unknown, as on the moment of gathering the info the nodes from scaleUP1 were already removed

logging.info("DEBUG: RegisteredNode: %s, %s", name, machine_type)
self._nodes[name] = _NodeTracker(
name=name,
machine_type=machine_type,
Expand All @@ -173,7 +180,9 @@ def _StopWatchingForNodeChanges(self):
"Detected a kubernetes event indicating that a node (%s) is"
" to be removed, but we have no record of this node. We'll"
" ignore this node - it won't be counted in the"
" %s metric.", name, VM_TIME_METRIC
" %s metric.",
name,
VM_TIME_METRIC,
)
continue

Expand Down Expand Up @@ -242,11 +251,11 @@ def _StartTrackingVMUsage(stage: str, benchmark_spec: bm_spec.BenchmarkSpec):
if stage != stages.RUN:
return

k8s_cluster: kubernetes_cluster.KubernetesCluster = (
benchmark_spec.container_cluster
)
if k8s_cluster is None:
if not isinstance(
benchmark_spec.container_cluster, kubernetes_cluster.KubernetesCluster
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was gonna say swap to raise instead of return, but the return if None seems quite reasonable.

):
return
k8s_cluster = benchmark_spec.container_cluster

global tracker
tracker = KubernetesResourceTracker(k8s_cluster)
Expand All @@ -264,11 +273,11 @@ def _StopTrackingVMUsage(stage: str, benchmark_spec: bm_spec.BenchmarkSpec):
if stage != stages.RUN:
return

k8s_cluster: kubernetes_cluster.KubernetesCluster = (
benchmark_spec.container_cluster
)
if k8s_cluster is None:
if not isinstance(
benchmark_spec.container_cluster, kubernetes_cluster.KubernetesCluster
):
return
k8s_cluster = benchmark_spec.container_cluster

if tracker is not None:
tracker.StopTracking()
Expand Down
Loading