Skip to content

Commit

Permalink
Support any number of node names in affinity.
Browse files Browse the repository at this point in the history
When we process our PVs, We shouldn't care about the amount of nodes.
If none of them exists we can just clear it out and if any one of them
does exist, we should skip it.
  • Loading branch information
Omar007 committed Sep 16, 2024
1 parent 12ed490 commit 105673e
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 120 deletions.
46 changes: 13 additions & 33 deletions pkg/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"
volumeUtil "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/utils/mount"
)

Expand Down Expand Up @@ -495,44 +494,25 @@ func GetVolumeMode(volUtil util.VolumeUtil, fullPath string) (v1.PersistentVolum
return "", fmt.Errorf("Block device check for %q failed: %s", fullPath, errblk)
}

// NodeExists checks to see if a Node exists in the Indexer of a NodeLister.
// It tries to get the node and if it fails, it uses the well known label
// `kubernetes.io/hostname` to find the Node.
func NodeExists(nodeLister corelisters.NodeLister, nodeName string) (bool, error) {
_, err := nodeLister.Get(nodeName)
if errors.IsNotFound(err) {
// AnyNodeExists checks to see if a Node exists in the Indexer of a NodeLister.
// If this fails, it uses the well known label `kubernetes.io/hostname` to find the Node.
// It aborts early if an unexpected error occurs and it's uncertain if a node would exist or not.
func AnyNodeExists(nodeLister corelisters.NodeLister, nodeNames []string) bool {
for _, nodeName := range nodeNames {
_, err := nodeLister.Get(nodeName)
if err == nil || !errors.IsNotFound(err) {
return true
}
req, err := labels.NewRequirement(NodeLabelKey, selection.Equals, []string{nodeName})
if err != nil {
return false, err
return true
}
nodes, err := nodeLister.List(labels.NewSelector().Add(*req))
if err != nil {
return false, err
if err != nil || len(nodes) > 0 {
return true
}
return len(nodes) > 0, nil
}
return err == nil, err
}

// NodeAttachedToLocalPV gets the name of the Node that a local PV has a NodeAffinity to.
// It assumes that there should be only one matching Node for a local PV and that
// the local PV follows the form:
//
// nodeAffinity:
// required:
// nodeSelectorTerms:
// - matchExpressions:
// - key: kubernetes.io/hostname
// operator: In
// values:
// - <node1>
func NodeAttachedToLocalPV(pv *v1.PersistentVolume) (string, bool) {
nodeNames := volumeUtil.GetLocalPersistentVolumeNodeNames(pv)
// We assume that there should only be one matching node.
if nodeNames == nil || len(nodeNames) != 1 {
return "", false
}
return nodeNames[0], true
return false
}

// IsLocalPVWithStorageClass checks that a PV is a local PV that belongs to any of the passed in StorageClasses.
Expand Down
79 changes: 24 additions & 55 deletions pkg/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ func TestGetVolumeMode(t *testing.T) {
}
}

func TestNodeExists(t *testing.T) {
func TestAnyNodeExists(t *testing.T) {
nodeName := "test-node"
nodeWithName := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -495,21 +495,39 @@ func TestNodeExists(t *testing.T) {
tests := []struct {
nodeAdded *v1.Node
// Required.
nodeQueried *v1.Node
nodeQueried []string
expectedResult bool
}{
{
nodeAdded: nodeWithName,
nodeQueried: nodeWithName,
nodeQueried: []string{nodeName},
expectedResult: true,
},
{
nodeAdded: nodeWithLabel,
nodeQueried: nodeWithName,
nodeQueried: []string{nodeName},
expectedResult: true,
},
{
nodeQueried: nodeWithName,
nodeQueried: []string{nodeName},
expectedResult: false,
},
{
nodeAdded: nodeWithName,
nodeQueried: []string{"other-node", nodeName},
expectedResult: true,
},
{
nodeAdded: nodeWithLabel,
nodeQueried: []string{"other-node", nodeName},
expectedResult: true,
},
{
nodeQueried: []string{},
expectedResult: false,
},
{
nodeQueried: nil,
expectedResult: false,
},
}
Expand All @@ -523,62 +541,13 @@ func TestNodeExists(t *testing.T) {
nodeInformer.Informer().GetStore().Add(test.nodeAdded)
}

exists, err := NodeExists(nodeInformer.Lister(), test.nodeQueried.Name)
if err != nil {
t.Errorf("Got unexpected error: %s", err.Error())
}
exists := AnyNodeExists(nodeInformer.Lister(), test.nodeQueried)
if exists != test.expectedResult {
t.Errorf("expected result: %t, actual: %t", test.expectedResult, exists)
}
}
}

func TestNodeAttachedToLocalPV(t *testing.T) {
nodeName := "testNodeName"

tests := []struct {
name string
pv *v1.PersistentVolume
expectedNodeName string
expectedStatus bool
}{
{
name: "NodeAffinity will all necessary fields",
pv: withNodeAffinity(pv(), []string{nodeName}, NodeLabelKey),
expectedNodeName: nodeName,
expectedStatus: true,
},
{
name: "empty nodeNames array",
pv: withNodeAffinity(pv(), []string{}, NodeLabelKey),
expectedNodeName: "",
expectedStatus: false,
},
{
name: "multiple nodeNames",
pv: withNodeAffinity(pv(), []string{nodeName, "newNode"}, NodeLabelKey),
expectedNodeName: "",
expectedStatus: false,
},
{
name: "wrong node label key",
pv: withNodeAffinity(pv(), []string{nodeName}, "wrongLabel"),
expectedNodeName: "",
expectedStatus: false,
},
}

for _, test := range tests {
nodeName, ok := NodeAttachedToLocalPV(test.pv)
if ok != test.expectedStatus {
t.Errorf("test: %s, status: %t, expectedStaus: %t", test.name, ok, test.expectedStatus)
}
if nodeName != test.expectedNodeName {
t.Errorf("test: %s, nodeName: %s, expectedNodeName: %s", test.name, nodeName, test.expectedNodeName)
}
}
}

func TestIsLocalPVWithStorageClass(t *testing.T) {
tests := []struct {
name string
Expand Down
30 changes: 11 additions & 19 deletions pkg/node-cleanup/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
volumeUtil "k8s.io/kubernetes/pkg/volume/util"

"sigs.k8s.io/sig-storage-local-static-provisioner/pkg/common"
cleanupmetrics "sigs.k8s.io/sig-storage-local-static-provisioner/pkg/metrics/node-cleanup"
Expand Down Expand Up @@ -196,18 +197,15 @@ func (c *CleanupController) syncHandler(ctx context.Context, pvName string) erro
return err
}

nodeName, ok := common.NodeAttachedToLocalPV(pv)
if !ok {
nodeNames := volumeUtil.GetLocalPersistentVolumeNodeNames(pv)
if nodeNames == nil {
// For whatever reason the PV isn't formatted properly so we will
// never be able to get its corresponding Node, so ignore.
klog.Errorf("error getting node attached to pv: %s", pv)
return nil
}

nodeExists, err := common.NodeExists(c.nodeLister, nodeName)
if err != nil {
return err
}
nodeExists := common.AnyNodeExists(c.nodeLister, nodeNames)
// Check that the node the PV/PVC reference is still deleted
if nodeExists {
return nil
Expand Down Expand Up @@ -242,7 +240,7 @@ func (c *CleanupController) syncHandler(ctx context.Context, pvName string) erro
}

cleanupmetrics.PersistentVolumeClaimDeleteTotal.Inc()
klog.Infof("Deleted PVC %q that pointed to Node %q", pvClaimRef.Name, nodeName)
klog.Infof("Deleted PVC %q that pointed to non-existent Nodes %q", pvClaimRef.Name, nodeNames)
return nil
}

Expand All @@ -264,18 +262,13 @@ func (c *CleanupController) startCleanupTimersIfNeeded() {
continue
}

nodeName, ok := common.NodeAttachedToLocalPV(pv)
if !ok {
nodeNames := volumeUtil.GetLocalPersistentVolumeNodeNames(pv)
if nodeNames == nil {
klog.Errorf("error getting node attached to pv: %s", pv)
continue
}

shouldEnqueue, err := c.shouldEnqueueEntry(pv, nodeName)
if err != nil {
klog.Errorf("error determining whether to enqueue entry with pv %q: %v", pv.Name, err)
continue
}

shouldEnqueue := c.shouldEnqueueEntry(pv, nodeNames)
if shouldEnqueue {
klog.Infof("Starting timer for resource deletion, resource:%s, timer duration: %s", pv.Spec.ClaimRef, c.pvcDeletionDelay.String())
c.eventRecorder.Event(pv.Spec.ClaimRef, v1.EventTypeWarning, "ReferencedNodeDeleted", fmt.Sprintf("PVC is tied to a deleted Node. PVC will be cleaned up in %s if the Node doesn't come back", c.pvcDeletionDelay.String()))
Expand All @@ -288,13 +281,12 @@ func (c *CleanupController) startCleanupTimersIfNeeded() {
// shouldEnqueuePV checks if a PV should be enqueued to the entryQueue.
// The PV must be a local PV, have a StorageClass present in the list of storageClassNames, have a NodeAffinity
// to a deleted Node, and have a PVC bound to it (otherwise there's nothing to clean up).
func (c *CleanupController) shouldEnqueueEntry(pv *v1.PersistentVolume, nodeName string) (bool, error) {
func (c *CleanupController) shouldEnqueueEntry(pv *v1.PersistentVolume, nodeNames []string) bool {
if pv.Spec.ClaimRef == nil {
return false, nil
return false
}

exists, err := common.NodeExists(c.nodeLister, nodeName)
return !exists && err == nil, err
return !common.AnyNodeExists(c.nodeLister, nodeNames)
}

// deletePVC deletes the PVC with the given name and namespace
Expand Down
20 changes: 7 additions & 13 deletions pkg/node-cleanup/deleter/deleter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package deleter

import (
"context"
"fmt"
"time"

v1 "k8s.io/api/core/v1"
Expand All @@ -28,6 +27,7 @@ import (
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
volumeUtil "k8s.io/kubernetes/pkg/volume/util"

"sigs.k8s.io/sig-storage-local-static-provisioner/pkg/common"
cleanupmetrics "sigs.k8s.io/sig-storage-local-static-provisioner/pkg/metrics/node-cleanup"
Expand Down Expand Up @@ -82,12 +82,7 @@ func (d *Deleter) DeletePVs(ctx context.Context) {
continue
}

referencesDeletedNode, err := d.referencesNonExistentNode(pv)
if err != nil {
klog.Errorf("error determining if pv %q references deleted node: %v", pv.Name, err)
continue
}
if !referencesDeletedNode {
if !d.referencesNonExistentNode(pv) {
// PV's node is up so PV is not stale
continue
}
Expand Down Expand Up @@ -124,14 +119,13 @@ func (d *Deleter) DeletePVs(ctx context.Context) {
// operator: In
// values:
// - <node1>
func (d *Deleter) referencesNonExistentNode(localPV *v1.PersistentVolume) (bool, error) {
nodeName, ok := common.NodeAttachedToLocalPV(localPV)
if !ok {
return false, fmt.Errorf("Error retrieving node")
func (d *Deleter) referencesNonExistentNode(localPV *v1.PersistentVolume) bool {
nodeNames := volumeUtil.GetLocalPersistentVolumeNodeNames(localPV)
if nodeNames == nil {
return false
}

exists, err := common.NodeExists(d.nodeLister, nodeName)
return !exists && err == nil, err
return !common.AnyNodeExists(d.nodeLister, nodeNames)
}

func (d *Deleter) deletePV(ctx context.Context, pvName string) error {
Expand Down

0 comments on commit 105673e

Please sign in to comment.