Skip to content

Commit b7498a1

Browse files
committed
feature: add extender plugins
Signed-off-by: googs1025 <[email protected]>
1 parent 419fe74 commit b7498a1

File tree

8 files changed

+609
-0
lines changed

8 files changed

+609
-0
lines changed
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
package extender
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"crypto/tls"
7+
"crypto/x509"
8+
"encoding/json"
9+
"fmt"
10+
"io"
11+
v1 "k8s.io/api/core/v1"
12+
"k8s.io/klog/v2"
13+
"net/http"
14+
"os"
15+
)
16+
17+
// EvictionRequest represents the request body sent to the external extender.
18+
type EvictionRequest struct {
19+
Pod *v1.Pod `json:"pod"`
20+
Node *v1.Node `json:"node"`
21+
}
22+
23+
// EvictionResponse is the JSON document the external extender sends back in
// reply to an EvictionRequest.
type EvictionResponse struct {
	// Allow reports whether the extender permits the eviction.
	Allow bool `json:"allow"`
	// Reason is an optional human-readable explanation of the decision.
	Reason string `json:"reason,omitempty"`
	// Error, when non-empty, carries an error message reported by the
	// extender itself (as opposed to a transport or HTTP-level failure).
	Error string `json:"error,omitempty"`
}
29+
30+
// ExtenderClient
31+
type ExtenderClient struct {
32+
baseURL string // urlPrefix + decisionVerb
33+
httpClient *http.Client
34+
ignorable bool
35+
failPolicy string
36+
logger klog.Logger
37+
}
38+
39+
// NewExtenderClient creates a new ExtenderClient from ExternalExtender config.
40+
func NewExtenderClient(extenderConfig *ExternalExtender, logger klog.Logger) (*ExtenderClient, error) {
41+
if extenderConfig == nil {
42+
return nil, fmt.Errorf("extender config is nil")
43+
}
44+
if extenderConfig.URLPrefix == "" {
45+
return nil, fmt.Errorf("urlPrefix is required")
46+
}
47+
if extenderConfig.DecisionVerb == "" {
48+
return nil, fmt.Errorf("decisionVerb is required")
49+
}
50+
if extenderConfig.HTTPTimeout.Duration <= 0 {
51+
return nil, fmt.Errorf("httpTimeout must be > 0")
52+
}
53+
54+
baseURL := fmt.Sprintf("%s/%s", trimSuffix(extenderConfig.URLPrefix, "/"), extenderConfig.DecisionVerb)
55+
56+
var tlsConfig *tls.Config
57+
if extenderConfig.EnableHTTPS {
58+
var err error
59+
tlsConfig, err = buildTLSConfig(extenderConfig.TLSConfig)
60+
if err != nil {
61+
return nil, fmt.Errorf("failed to build TLS config: %v", err)
62+
}
63+
}
64+
65+
httpClient := &http.Client{
66+
Timeout: extenderConfig.HTTPTimeout.Duration,
67+
Transport: &http.Transport{
68+
TLSClientConfig: tlsConfig,
69+
},
70+
}
71+
72+
return &ExtenderClient{
73+
baseURL: baseURL,
74+
httpClient: httpClient,
75+
ignorable: extenderConfig.Ignorable,
76+
failPolicy: defaultIfEmpty(extenderConfig.FailPolicy, "Deny"),
77+
logger: logger.WithValues("extender", baseURL),
78+
}, nil
79+
}
80+
81+
// DecideEviction calls the external service to decide whether to evict the pod.
82+
func (c *ExtenderClient) DecideEviction(ctx context.Context, pod *v1.Pod, node *v1.Node) (bool, string, error) {
83+
reqData := EvictionRequest{Pod: pod, Node: node}
84+
85+
body, err := json.Marshal(reqData)
86+
if err != nil {
87+
return false, "", fmt.Errorf("failed to marshal request: %v", err)
88+
}
89+
90+
httpRequest, err := http.NewRequestWithContext(ctx, "POST", c.baseURL, bytes.NewReader(body))
91+
if err != nil {
92+
return false, "", fmt.Errorf("failed to create request: %v", err)
93+
}
94+
httpRequest.Header.Set("Content-Type", "application/json")
95+
96+
c.logger.V(4).Info("Calling external decision service", "url", c.baseURL, "pod", klog.KObj(pod))
97+
98+
resp, err := c.httpClient.Do(httpRequest)
99+
if err != nil {
100+
errMsg := fmt.Sprintf("HTTP request failed: %v", err)
101+
c.logger.Error(err, "External decision service unreachable")
102+
if c.ignorable {
103+
c.logger.V(2).Info("ignorable=true, allowing eviction by default")
104+
return true, "extender unreachable (ignorable)", nil
105+
}
106+
return false, "", fmt.Errorf("%s", errMsg)
107+
}
108+
defer resp.Body.Close()
109+
110+
respBody, err := io.ReadAll(resp.Body)
111+
if err != nil {
112+
return false, "", fmt.Errorf("failed to read response body: %v", err)
113+
}
114+
115+
if resp.StatusCode != http.StatusOK {
116+
errMsg := fmt.Sprintf("non-200 status code: %d, body: %s", resp.StatusCode, string(respBody))
117+
c.logger.Error(nil, "External decision service returned error", "statusCode", resp.StatusCode, "body", string(respBody))
118+
if c.ignorable {
119+
c.logger.V(2).Info("ignorable=true, allowing eviction by default")
120+
return true, "extender error (ignorable)", nil
121+
}
122+
return false, "", fmt.Errorf("%s", errMsg)
123+
}
124+
125+
var extResp EvictionResponse
126+
if err := json.Unmarshal(respBody, &extResp); err != nil {
127+
return false, "", fmt.Errorf("failed to unmarshal response: %v, body: %s", err, string(respBody))
128+
}
129+
130+
if extResp.Error != "" {
131+
c.logger.V(2).Info("External decision service returned logical error", "error", extResp.Error)
132+
if c.ignorable {
133+
return true, "extender logical error (ignorable)", nil
134+
}
135+
return false, extResp.Error, nil
136+
}
137+
138+
c.logger.V(3).Info("External decision received", "allow", extResp.Allow, "reason", extResp.Reason)
139+
140+
return extResp.Allow, extResp.Reason, nil
141+
}
142+
143+
// trimSuffix returns s with one trailing occurrence of suffix removed.
// s is returned unchanged when suffix is empty, when s does not end with
// suffix, or when s consists of nothing but suffix.
func trimSuffix(s, suffix string) string {
	cut := len(s) - len(suffix)
	// cut <= 0 covers both "s is shorter than suffix" and "s has the same
	// length as suffix"; in either case nothing is trimmed.
	if suffix == "" || cut <= 0 {
		return s
	}
	if s[cut:] != suffix {
		return s
	}
	return s[:cut]
}
155+
156+
// defaultIfEmpty returns s unless it is the empty string, in which case it
// returns def.
func defaultIfEmpty(s, def string) string {
	if len(s) > 0 {
		return s
	}
	return def
}
162+
163+
func buildTLSConfig(tlsConfig *ExtenderTLSConfig) (*tls.Config, error) {
164+
if tlsConfig == nil {
165+
return &tls.Config{InsecureSkipVerify: false}, nil
166+
}
167+
168+
config := &tls.Config{InsecureSkipVerify: false}
169+
170+
if tlsConfig.CACertFile != "" {
171+
caCert, err := os.ReadFile(tlsConfig.CACertFile)
172+
if err != nil {
173+
return nil, fmt.Errorf("failed to read CA cert file: %v", err)
174+
}
175+
caCertPool := x509.NewCertPool()
176+
if !caCertPool.AppendCertsFromPEM(caCert) {
177+
return nil, fmt.Errorf("failed to parse CA certificate")
178+
}
179+
config.RootCAs = caCertPool
180+
}
181+
182+
return config, nil
183+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
Copyright 2022 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package extender
15+
16+
import (
17+
"k8s.io/apimachinery/pkg/runtime"
18+
)
19+
20+
func addDefaultingFuncs(scheme *runtime.Scheme) error {
21+
return RegisterDefaults(scheme)
22+
}
23+
24+
// SetDefaults_ExternalDecisionArgs
25+
// TODO: the final default values would be discussed in community
26+
func SetDefaults_ExternalDecisionArgs(obj runtime.Object) {
27+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/*
2+
Copyright 2022 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
// +k8s:defaulter-gen=TypeMeta
15+
16+
package extender
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
package extender
2+
3+
import (
4+
"context"
5+
"fmt"
6+
v1 "k8s.io/api/core/v1"
7+
"k8s.io/apimachinery/pkg/runtime"
8+
"k8s.io/klog/v2"
9+
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
10+
podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
11+
frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
12+
)
13+
14+
// PluginName is the name this plugin reports from Name and uses as the
// eviction strategy name.
const PluginName = "ExternalDecision"
15+
16+
// ExternalDecision
17+
type ExternalDecision struct {
18+
logger klog.Logger
19+
handle frameworktypes.Handle
20+
args *ExternalDecisionArgs
21+
podFilter podutil.FilterFunc
22+
client *ExtenderClient
23+
}
24+
25+
func (d *ExternalDecision) Name() string {
26+
return PluginName
27+
}
28+
29+
// Compile-time check that *ExternalDecision implements DeschedulePlugin.
var _ frameworktypes.DeschedulePlugin = (*ExternalDecision)(nil)
30+
31+
// New builds plugin from its arguments while passing a handle
32+
func New(ctx context.Context, args runtime.Object, handle frameworktypes.Handle) (frameworktypes.Plugin, error) {
33+
extArgs, ok := args.(*ExternalDecisionArgs)
34+
if !ok {
35+
return nil, fmt.Errorf("want args to be of type RemoveFailedPodsArgs, got %T", args)
36+
}
37+
38+
if extArgs.Extender == nil {
39+
return nil, nil
40+
}
41+
if extArgs.Extender.URLPrefix == "" {
42+
return nil, nil
43+
}
44+
if extArgs.Extender.HTTPTimeout.Duration <= 0 {
45+
return nil, nil
46+
}
47+
48+
if extArgs.Extender.FailPolicy == "" {
49+
extArgs.Extender.FailPolicy = "Deny"
50+
}
51+
52+
if extArgs.Extender == nil {
53+
return nil, nil
54+
}
55+
56+
client, err := NewExtenderClient(extArgs.Extender, klog.FromContext(ctx))
57+
if err != nil {
58+
return nil, nil
59+
}
60+
61+
logger := klog.FromContext(ctx).WithValues("plugin", PluginName)
62+
63+
podFilter, err := podutil.NewOptions().
64+
WithFilter(podutil.WrapFilterFuncs(handle.Evictor().Filter, handle.Evictor().PreEvictionFilter)).
65+
BuildFilterFunc()
66+
if err != nil {
67+
return nil, fmt.Errorf("error initializing pod filter function: %v", err)
68+
}
69+
70+
return &ExternalDecision{
71+
logger: logger,
72+
handle: handle,
73+
podFilter: podFilter,
74+
client: client,
75+
}, nil
76+
}
77+
78+
// Deschedule extension point implementation for the ExternalDecision plugin
79+
func (d *ExternalDecision) Deschedule(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status {
80+
podsToEvict := make([]*v1.Pod, 0)
81+
nodeMap := make(map[string]*v1.Node, len(nodes))
82+
logger := klog.FromContext(klog.NewContext(ctx, d.logger)).WithValues("ExtensionPoint", frameworktypes.DescheduleExtensionPoint)
83+
84+
// Build node map and collect candidate pods
85+
for _, node := range nodes {
86+
logger.V(2).Info("Processing node", "node", klog.KObj(node))
87+
pods, err := podutil.ListAllPodsOnANode(node.Name, d.handle.GetPodsAssignedToNodeFunc(), d.podFilter)
88+
if err != nil {
89+
return &frameworktypes.Status{
90+
Err: fmt.Errorf("error listing pods on node %s: %v", node.Name, err),
91+
}
92+
}
93+
94+
nodeMap[node.Name] = node
95+
podsToEvict = append(podsToEvict, pods...)
96+
}
97+
98+
// Evaluate each pod
99+
for i := range podsToEvict {
100+
pod := podsToEvict[i]
101+
node, exists := nodeMap[pod.Spec.NodeName]
102+
if !exists {
103+
logger.V(4).Info("Node not found in map, skipping pod", "pod", klog.KObj(pod), "node", pod.Spec.NodeName)
104+
continue
105+
}
106+
107+
shouldEvict, reason, err := d.evaluatePod(ctx, pod, node)
108+
if err != nil {
109+
d.handleError(logger, pod, err)
110+
continue
111+
}
112+
113+
if !shouldEvict {
114+
logger.V(3).Info("Eviction denied by external decision", "pod", klog.KObj(pod), "reason", reason)
115+
continue
116+
}
117+
118+
loop:
119+
for _, pod := range podsToEvict {
120+
err := d.handle.Evictor().Evict(ctx, pod, evictions.EvictOptions{StrategyName: PluginName})
121+
if err == nil {
122+
continue
123+
}
124+
switch err.(type) {
125+
case *evictions.EvictionNodeLimitError:
126+
continue loop
127+
case *evictions.EvictionTotalLimitError:
128+
return nil
129+
default:
130+
logger.Error(err, "eviction failed")
131+
}
132+
}
133+
134+
}
135+
136+
return nil
137+
}
138+
139+
// evaluatePod calls the external service to decide if a pod can be evicted.
140+
func (d *ExternalDecision) evaluatePod(ctx context.Context, pod *v1.Pod, node *v1.Node) (bool, string, error) {
141+
allow, reason, err := d.client.DecideEviction(ctx, pod, node)
142+
if err != nil {
143+
return false, "", fmt.Errorf("extender call failed: %v", err)
144+
}
145+
return allow, reason, nil
146+
}
147+
148+
// handleError logs and handles errors from external decision service based on FailPolicy.
149+
func (d *ExternalDecision) handleError(logger klog.Logger, pod *v1.Pod, err error) {
150+
logger.Error(err, "External decision service error", "pod", klog.KObj(pod))
151+
152+
switch d.args.Extender.FailPolicy {
153+
case "Allow":
154+
logger.V(2).Info("failPolicy=Allow, proceeding with eviction", "pod", klog.KObj(pod))
155+
case "Ignore":
156+
logger.V(2).Info("failPolicy=Ignore, skipping pod", "pod", klog.KObj(pod))
157+
case "Deny", "":
158+
logger.V(2).Info("failPolicy=Deny, blocking eviction", "pod", klog.KObj(pod))
159+
default:
160+
logger.Error(nil, "Unknown failPolicy, defaulting to Deny", "failPolicy", d.args.Extender.FailPolicy, "pod", klog.KObj(pod))
161+
}
162+
}

0 commit comments

Comments
 (0)