Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions perfkitbenchmarker/data/container/karpenter/nodepool.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,25 @@ spec:
- key: karpenter.sh/capacity-type
operator: In
values: ["on-demand"]
{%- if KARPENTER_INSTANCE_TYPES %}
- key: node.kubernetes.io/instance-type
operator: In
values: [{% for instance_type in KARPENTER_INSTANCE_TYPES %}"{{ instance_type }}"{% if not loop.last %}, {% endif %}{% endfor %}]
{%- else %}
- key: karpenter.k8s.aws/instance-category
operator: In
values: ["c", "m", "r"]
values: ["c", "m", "r", "t"]
- key: karpenter.k8s.aws/instance-generation
operator: Gt
values: ["2"]
{%- endif %}
nodeClassRef:
group: karpenter.k8s.aws
kind: EC2NodeClass
name: default
expireAfter: 720h # 30 * 24h = 720h
limits:
cpu: 1000
cpu: {{ KARPENTER_NODEPOOL_CPU_LIMIT }}
disruption:
consolidationPolicy: WhenEmptyOrUnderutilized
consolidateAfter: 1m
Expand Down
41 changes: 20 additions & 21 deletions perfkitbenchmarker/linux_benchmarks/kubernetes_scale_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ def Prepare(bm_spec: benchmark_spec.BenchmarkSpec):
"""Sets additional spec attributes."""
bm_spec.always_call_cleanup = True
assert bm_spec.container_cluster
_EnsureEksKarpenterGpuNodepool(bm_spec.container_cluster)
cluster = bm_spec.container_cluster
assert isinstance(cluster, kubernetes_cluster.KubernetesCluster)
_EnsureEksKarpenterGpuNodepool(cluster)


def _GetScaleTimeout() -> int:
Expand Down Expand Up @@ -367,28 +369,25 @@ def GetStatusConditionsForResourceType(
lastTransitionTime.
"""

jsonpath = (
r'{range .items[*]}'
# e.g. '"pod-name-1234": [<condition1>, ...],\n'
r'{"\""}{.metadata.name}{"\": "}{.status.conditions}{",\n"}'
r'{end}'
)
# Use full JSON output to avoid invalid JSON when manually building from
# jsonpath with many resources or on connection reset (truncated output).
# Avoid logging huge JSON: kubernetes_scale uses num_replicas; kubernetes_node_scale
# uses kubernetes_scale_num_nodes for the same code path (get pod/node -o json).
stdout, _, _ = kubectl.RunKubectlCommand(
[
'get',
resource_type,
'-o',
'jsonpath=' + jsonpath,
],
timeout=60 * 2, # 2 minutes; should be a pretty fast call.
# Output can be quite large, so we'll conditionally suppress it.
suppress_logging=NUM_PODS.value > 20,
['get', resource_type, '-o', 'json'],
timeout=60 * 5, # 5 minutes for large clusters (e.g. 1000 pods)
suppress_logging=(
NUM_PODS.value > 20
or getattr(FLAGS, 'kubernetes_scale_num_nodes', 5) > 20
),
)

# Convert output to valid json and parse it
stdout = stdout.rstrip('\t\n\r ,')
stdout = '{' + stdout + '}'
name_to_conditions = json.loads(stdout)
data = json.loads(stdout)
name_to_conditions = {}
for item in data.get('items', []):
name = item.get('metadata', {}).get('name')
conditions = item.get('status', {}).get('conditions')
if name is not None and conditions is not None:
name_to_conditions[name] = conditions

for key in resources_to_ignore:
name_to_conditions.pop(key, None)
Expand Down
133 changes: 97 additions & 36 deletions perfkitbenchmarker/providers/aws/elastic_kubernetes_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from collections import abc
import json
import logging
import math
import re
from typing import Any
from urllib import parse
Expand Down Expand Up @@ -704,9 +705,7 @@ def _Create(self):
}],
},
'iamIdentityMappings': [{
'arn': (
f'arn:aws:iam::{self.account}:role/KarpenterNodeRole-{self.name}'
),
'arn': f'arn:aws:iam::{self.account}:role/KarpenterNodeRole-{self.name}',
'username': 'system:node:{{EC2PrivateDNSName}}',
'groups': ['system:bootstrappers', 'system:nodes'],
}],
Expand Down Expand Up @@ -992,9 +991,29 @@ def _PostIngressNetworkingFixups(
'[PKB][EKS] Allowed ALB SG %s -> node SGs on port %s', alb_sg, port
)

@staticmethod
def _DefaultNodepoolInstanceTypes() -> list[str]:
  """EC2 types for the default NodePool manifest.

  Values come from --eks_karpenter_nodepool_instance_types; whitespace is
  trimmed and blank entries are dropped. An empty result means the Jinja
  template keeps its instance-category/generation rules instead.
  """
  # Normalize each flag entry, then keep only the non-empty ones.
  trimmed = (raw.strip() for raw in FLAGS.eks_karpenter_nodepool_instance_types)
  return [instance_type for instance_type in trimmed if instance_type]

def _PostCreate(self):
"""Performs post-creation steps for the cluster."""
super()._PostCreate()
# Karpenter controller resources: default 1/1Gi; scale up when node_scale target is set.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just not specify anything & let Karpenter decide? Or is this indeed necessary? It seems clever but a little annoying / bad user experience by Karpenter.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the resources for the Karpenter controller pod (running on the node where Karpenter itself lives). Karpenter doesn’t manage that node, so it can’t “decide” these values; we have to set them ourselves. For runs with ~10 nodes, 1/1Gi is sufficient; we only increase the resources when the node_scale target is 500+ or 1000+.

num_nodes = getattr(FLAGS, 'kubernetes_scale_num_nodes', None)
if num_nodes is not None and num_nodes > 1000:
controller_cpu, controller_memory = 4, '16Gi'
elif num_nodes is not None and num_nodes >= 500:
controller_cpu, controller_memory = 2, '8Gi'
else:
controller_cpu, controller_memory = 1, '1Gi'
vm_util.IssueCommand([
'helm',
'upgrade',
Expand All @@ -1013,13 +1032,13 @@ def _PostCreate(self):
'--set',
f'settings.interruptionQueue={self.name}',
'--set',
'controller.resources.requests.cpu=1',
f'controller.resources.requests.cpu={controller_cpu}',
'--set',
'controller.resources.requests.memory=1Gi',
f'controller.resources.requests.memory={controller_memory}',
'--set',
'controller.resources.limits.cpu=1',
f'controller.resources.limits.cpu={controller_cpu}',
'--set',
'controller.resources.limits.memory=1Gi',
f'controller.resources.limits.memory={controller_memory}',
'--set',
'logLevel=debug',
'--wait',
Expand Down Expand Up @@ -1057,10 +1076,16 @@ def _PostCreate(self):
'v'
+ full_version.strip().strip('"').split(f'{self.cluster_version}-v')[1]
)
# NodePool CPU limit: scale with benchmark target (nodes * vCPU + 5%), min 1000.
num_nodes = getattr(FLAGS, 'kubernetes_scale_num_nodes', 5)
vcpu_per_node = FLAGS.eks_karpenter_limits_vcpu_per_node
cpu_limit = max(1000, math.ceil(num_nodes * vcpu_per_node * 1.05))
kubernetes_commands.ApplyManifest(
'container/karpenter/nodepool.yaml.j2',
CLUSTER_NAME=self.name,
ALIAS_VERSION=alias_version,
KARPENTER_NODEPOOL_CPU_LIMIT=cpu_limit,
KARPENTER_INSTANCE_TYPES=self._DefaultNodepoolInstanceTypes(),
)

def _Delete(self):
Expand Down Expand Up @@ -1093,16 +1118,33 @@ def _DeleteDependencies(self):
# Start deleting the stack but likely to fail to delete this role.
vm_util.IssueCommand(delete_stack_cmd)
node_role = f'KarpenterNodeRole-{self.name}'
out, _, _ = vm_util.IssueCommand([
'aws',
'iam',
'list-instance-profiles-for-role',
'--role-name',
node_role,
'--region',
f'{self.region}',
])
profiles_json = json.loads(out)
out, _, retcode = vm_util.IssueCommand(
[
'aws',
'iam',
'list-instance-profiles-for-role',
'--role-name',
node_role,
'--region',
f'{self.region}',
],
suppress_failure=lambda stdout, stderr, rc: (
rc != 0
and (
'nosuchentity' in (stderr or '').lower()
or 'cannot be found' in (stderr or '').lower()
)
),
)
if retcode == 0 and out.strip():
profiles_json = json.loads(out)
else:
logging.info(
'Karpenter node role %s not found or empty response; skipping'
' instance profile cleanup',
node_role,
)
profiles_json = {'InstanceProfiles': []}
for profile in profiles_json.get('InstanceProfiles', []):
profile_name = profile['InstanceProfileName']
vm_util.IssueCommand([
Expand Down Expand Up @@ -1149,21 +1191,21 @@ def _CleanupKarpenter(self):
"""Cleanup Karpenter managed nodes before cluster deletion."""
logging.info('Cleaning up Karpenter nodes...')
# Delete NodePool resources - this will trigger node termination
kubectl.RunKubectlCommand(
kubectl.RunRetryableKubectlCommand(
[
'delete',
'nodepool,ec2nodeclass',
'--all',
'--timeout=120s',
],
timeout=300,
suppress_failure=lambda stdout, stderr, retcode: (
'no resources found' in stderr.lower()
or 'not found' in stderr.lower()
or 'timed out waiting for the condition' in stderr.lower()
),
)
# Wait for all Karpenter nodes to be deleted
kubectl.RunKubectlCommand(
kubectl.RunRetryableKubectlCommand(
[
'wait',
'--for=delete',
Expand All @@ -1172,9 +1214,10 @@ def _CleanupKarpenter(self):
'karpenter.sh/nodepool',
'--timeout=120s',
],
timeout=300,
suppress_failure=lambda stdout, stderr, retcode: (
'no matching resources found' in stderr.lower()
or 'timed out' in stderr.lower()
or 'no resources found' in stderr.lower()
),
)
# Force terminate remaining EC2 instances
Expand Down Expand Up @@ -1246,21 +1289,39 @@ def _CleanupKarpenter(self):
if eni_ids:
logging.info('Deleting %d orphaned network interfaces', len(eni_ids))
for eni_id in eni_ids:
vm_util.IssueCommand(
[
'aws',
'ec2',
'delete-network-interface',
'--region',
self.region,
'--network-interface-id',
eni_id,
],
suppress_failure=lambda stdout, stderr, retcode: (
'not found' in stderr.lower()
or 'does not exist' in stderr.lower()
),
)
# Bind eni_id by default to avoid loop closure issues if this is refactored.
def _delete_one_eni(eni_id=eni_id) -> None:
_, stderr, retcode = vm_util.IssueCommand(
[
'aws',
'ec2',
'delete-network-interface',
'--region',
self.region,
'--network-interface-id',
eni_id,
],
raise_on_failure=False,
)
if retcode == 0:
return
stderr_lower = (stderr or '').lower()
# ENI already deleted (e.g. by another process or previous attempt).
if 'invalidnetworkinterfaceid.notfound' in stderr_lower:
return
# RequestLimitExceeded (throttle): retry via vm_util.Retry.
if 'requestlimitexceeded' in stderr_lower:
raise errors.Resource.RetryableDeletionError(stderr or '')
raise errors.VmUtil.IssueCommandError(
f'DeleteNetworkInterface failed: {stderr}'
)

# max_retries=5 yields 6 CLI attempts (tries > 5 on 6th failure).
vm_util.Retry(
poll_interval=10,
max_retries=5,
retryable_exceptions=(errors.Resource.RetryableDeletionError,),
)(_delete_one_eni)()

def _IsReady(self):
"""Returns True if cluster is running. Autopilot defaults to 0 nodes."""
Expand Down
13 changes: 13 additions & 0 deletions perfkitbenchmarker/providers/aws/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,19 @@
'Whether to install AWS Load Balancer Controller in EKS Karpenter clusters'
'Default value - do not install unless explicitly requested',
)
flags.DEFINE_integer(
'eks_karpenter_limits_vcpu_per_node',
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use flagholder: https://absl.readthedocs.io/en/latest/absl.flags.html#absl.flags.FlagHolder

Also not sure if this is generally the right spot for these. Ideally both should probably go in config_overrides, with this one maybe being derived from the vm_spec CPU size, and the other coming in a follow-up CL.

2,
'Assumed vCPUs per provisioned node when computing Karpenter NodePool '
'limits.cpu on EKS (uses kubernetes_scale_num_nodes, this value, and 5% '
'headroom; minimum limit 1000). Raise for larger EC2 instance shapes.',
)
flags.DEFINE_list(
'eks_karpenter_nodepool_instance_types',
[],
'Comma-separated EC2 types for the Karpenter default NodePool (worker '
'nodes only). Empty keeps instance-category/generation in the template.',
)
AWS_CAPACITY_BLOCK_RESERVATION_ID = flags.DEFINE_string(
'aws_capacity_block_reservation_id',
None,
Expand Down
6 changes: 5 additions & 1 deletion perfkitbenchmarker/resources/container_service/kubectl.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
'error sending request:',
'(abnormal closure): unexpected EOF',
'deadline exceeded',
# kubectl wait/delete timeouts and connection errors (retried in EKS cleanup)
'timed out',
'unable to connect to the server',
]


Expand All @@ -38,8 +41,9 @@ def _DetectTimeoutViaSuppressFailure(stdout, stderr, retcode):
# Check for kubectl timeout. If found, treat it the same as a regular
# timeout.
if retcode != 0:
stderr_lower = stderr.lower()
for error_substring in RETRYABLE_KUBECTL_ERRORS:
if error_substring in stderr:
if error_substring.lower() in stderr_lower:
# Raise timeout error regardless of raise_on_failure - as the intended
# semantics is to ignore expected errors caused by invoking the
# command not errors from PKB infrastructure.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ def _DeleteAllFromDefaultNamespace():
run_cmd = ['delete', 'job', '--all', '-n', 'default']
kubectl.RunRetryableKubectlCommand(run_cmd)

timeout = 60 * 20
timeout = 60 * 60 # 1 hour for kubectl delete all -n default (teardown)
run_cmd = [
'delete',
'all',
Expand Down
Loading