From 73dc089b17928c85da7919e17fdc093c87b08747 Mon Sep 17 00:00:00 2001
From: iltyty
Date: Tue, 10 Dec 2024 16:12:02 +0800
Subject: [PATCH] feat: automatic CNFS switching when mounting NAS volumes

---
 pkg/cnfs/v1beta1/types.go  |  24 +++++
 pkg/nas/nodeserver.go      |  78 ++++++++++++---
 pkg/nas/nodeserver_test.go | 206 +++++++++++++++++++++++++++++++++++++
 pkg/utils/util.go          |   6 +-
 4 files changed, 303 insertions(+), 11 deletions(-)
 create mode 100644 pkg/nas/nodeserver_test.go

diff --git a/pkg/cnfs/v1beta1/types.go b/pkg/cnfs/v1beta1/types.go
index ac6fd788d..06226c7a5 100644
--- a/pkg/cnfs/v1beta1/types.go
+++ b/pkg/cnfs/v1beta1/types.go
@@ -28,6 +28,16 @@ type ContainerNetworkFileSystemStatus struct {
 	Conditions []ContainerNetworkFileSystemCondition `json:"conditions,omitempty"`
 }
 
+const (
+	StatusPending     = "Pending"
+	StatusCreating    = "Creating"
+	StatusInit        = "Initialization"
+	StatusAvailable   = "Available"
+	StatusUnavailable = "Unavailable"
+	StatusTerminating = "Terminating"
+	StatusFatal       = "Fatal"
+)
+
 // ContainerNetworkFileSystemCondition define cnfs condition field
 type ContainerNetworkFileSystemCondition struct {
 	LastProbeTime string `json:"lastProbeTime,omitempty"`
@@ -37,12 +47,26 @@ type ContainerNetworkFileSystemCondition struct {
 
 // ContainerNetworkFileSystemSpec define cnfs spec field
 type ContainerNetworkFileSystemSpec struct {
+	Fallback      Fallback   `json:"fallback,omitempty"`
 	StorageType   string     `json:"type,omitempty"`
 	ReclaimPolicy string     `json:"reclaimPolicy,omitempty"`
 	Description   string     `json:"description,omitempty"`
 	Parameters    Parameters `json:"parameters,omitempty"`
 }
 
+type Fallback struct {
+	Name     string           `json:"name,omitempty"`
+	Strategy FallbackStrategy `json:"strategy,omitempty"`
+}
+
+type FallbackStrategy string
+
+const (
+	FallbackStrategyAlways                 FallbackStrategy = "Always"
+	FallbackStrategyIfConnectFailed        FallbackStrategy = "IfConnectFailed"
+	FallbackStrategyIfMountTargetUnhealthy FallbackStrategy = "IfMountTargetUnhealthy"
+)
+
 // FsAttributes define cnfs status FsAttributes field
 type FsAttributes struct {
 	RegionID string `json:"regionId,omitempty"`
diff --git a/pkg/nas/nodeserver.go b/pkg/nas/nodeserver.go
index 1f83b5608..8fc09eb6f 100644
--- a/pkg/nas/nodeserver.go
+++ b/pkg/nas/nodeserver.go
@@ -39,14 +39,17 @@ import (
 	"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils/rund/directvolume"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/tools/record"
 	"k8s.io/klog/v2"
 	mountutils "k8s.io/mount-utils"
 )
 
 type nodeServer struct {
-	config  *internal.NodeConfig
-	mounter mountutils.Interface
-	locks   *utils.VolumeLocks
+	config   *internal.NodeConfig
+	mounter  mountutils.Interface
+	locks    *utils.VolumeLocks
+	recorder record.EventRecorder
 	common.GenericNodeServer
 }
 
@@ -59,9 +62,10 @@ func newNodeServer(config *internal.NodeConfig) *nodeServer {
 		klog.Errorf("failed to config /proc/sys/sunrpc/tcp_slot_table_entries: %v", err)
 	}
 	return &nodeServer{
-		config:  config,
-		mounter: NewNasMounter(),
-		locks:   utils.NewVolumeLocks(),
+		config:   config,
+		mounter:  NewNasMounter(),
+		locks:    utils.NewVolumeLocks(),
+		recorder: utils.NewEventRecorder(),
 		GenericNodeServer: common.GenericNodeServer{
 			NodeID: config.NodeName,
 		},
@@ -124,7 +128,10 @@ const (
 	//NFSClient
 	NFSClient = "nfsclient"
 	//NativeClient
-	NativeClient = "nativeclient"
+	NativeClient                                = "nativeclient"
+	cnfsAlwaysFallbackEventTmpl                 = "CNFS automatically switched from %s to %s."
+	cnfsIfConnectFailedFallbackEventTmpl        = "Due to network issues, CNFS automatically switched from %s to %s."
+	cnfsIfMountTargetUnhealthyFallbackEventTmpl = "Due to an inactive mount target, CNFS automatically switched from %s to %s."
 )
 
 func validateNodePublishVolumeRequest(req *csi.NodePublishVolumeRequest) error {
@@ -225,7 +232,7 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
 	}
 
 	if cnfsName != "" {
-		cnfs, err := ns.config.CNFSGetter.GetCNFS(ctx, cnfsName)
+		cnfs, err := ns.getCNFS(ctx, req, cnfsName)
 		if err != nil {
 			return nil, err
 		}
@@ -455,6 +462,61 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
 	return &csi.NodePublishVolumeResponse{}, nil
 }
 
+func (ns *nodeServer) getCNFS(ctx context.Context, req *csi.NodePublishVolumeRequest, name string) (*v1beta1.ContainerNetworkFileSystem, error) {
+	cnfs, err := ns.config.CNFSGetter.GetCNFS(ctx, name)
+	if err != nil {
+		return nil, err
+	}
+	if cnfs.Spec.Fallback.Name == "" || !cnfsNeedsFallback(cnfs) {
+		return cnfs, nil
+	}
+	return ns.fallbackCNFSAndRecord(ctx, req, cnfs)
+}
+
+func cnfsNeedsFallback(cnfs *v1beta1.ContainerNetworkFileSystem) bool {
+	if cnfs == nil {
+		return false
+	}
+	switch cnfs.Spec.Fallback.Strategy {
+	case v1beta1.FallbackStrategyAlways:
+		return true
+	case v1beta1.FallbackStrategyIfConnectFailed:
+		server := cnfs.Spec.Parameters.Server
+		dialer := net.Dialer{Timeout: 5 * time.Second}
+		conn, err := dialer.Dial("tcp", server+":2049")
+		if err == nil {
+			// Close the probe connection so health checks do not leak sockets.
+			conn.Close()
+		}
+		return err != nil
+	case v1beta1.FallbackStrategyIfMountTargetUnhealthy:
+		return cnfs.Status.Status == v1beta1.StatusUnavailable
+	}
+	return false
+}
+
+func (ns *nodeServer) fallbackCNFSAndRecord(ctx context.Context, req *csi.NodePublishVolumeRequest, cnfs *v1beta1.ContainerNetworkFileSystem) (*v1beta1.ContainerNetworkFileSystem, error) {
+	oldName, newName := cnfs.Name, cnfs.Spec.Fallback.Name
+	pod, err := utils.GetPodFromContextOrK8s(ctx, ns.config.KubeClient, req)
+	if err != nil {
+		return nil, err
+	}
+	fallbackCNFS, err := ns.config.CNFSGetter.GetCNFS(ctx, newName)
+	if err != nil {
+		return nil, err
+	}
+
+	switch cnfs.Spec.Fallback.Strategy {
+	case v1beta1.FallbackStrategyAlways:
+		ns.recorder.Eventf(pod, v1.EventTypeWarning, "CNFSFallback", cnfsAlwaysFallbackEventTmpl, oldName, newName)
+	case v1beta1.FallbackStrategyIfConnectFailed:
+		ns.recorder.Eventf(pod, v1.EventTypeWarning, "CNFSFallback", cnfsIfConnectFailedFallbackEventTmpl, oldName, newName)
+	case v1beta1.FallbackStrategyIfMountTargetUnhealthy:
+		ns.recorder.Eventf(pod, v1.EventTypeWarning, "CNFSFallback", cnfsIfMountTargetUnhealthyFallbackEventTmpl, oldName, newName)
+	}
+	return fallbackCNFS, nil
+}
+
 // /var/lib/kubelet/pods/5e03c7f7-2946-4ee1-ad77-2efbc4fdb16c/volumes/kubernetes.io~csi/nas-f5308354-725a-4fd3-b613-0f5b384bd00e/mount
 func (ns *nodeServer) mountLosetupPv(mountPoint string, opt *Options, volumeID string) error {
 	pathList := strings.Split(mountPoint, "/")
diff --git a/pkg/nas/nodeserver_test.go b/pkg/nas/nodeserver_test.go
new file mode 100644
index 000000000..931dbfdff
--- /dev/null
+++ b/pkg/nas/nodeserver_test.go
@@ -0,0 +1,206 @@
+package nas
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"testing"
+
+	"github.com/container-storage-interface/spec/lib/go/csi"
+	"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cnfs/v1beta1"
+	"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/nas/internal"
+	"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils"
+	"github.com/stretchr/testify/assert"
+	v1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/record" +) + +type fakeCNFSGetter struct { + cnfsMap map[string]*v1beta1.ContainerNetworkFileSystem +} + +func newFakeCNFSGetter(cnfsList ...*v1beta1.ContainerNetworkFileSystem) *fakeCNFSGetter { + cnfsMap := make(map[string]*v1beta1.ContainerNetworkFileSystem) + for _, cnfs := range cnfsList { + if cnfs != nil { + cnfsMap[cnfs.Name] = cnfs + } + } + return &fakeCNFSGetter{cnfsMap} +} + +func (f *fakeCNFSGetter) GetCNFS(_ context.Context, name string) (*v1beta1.ContainerNetworkFileSystem, error) { + if cnfs, ok := f.cnfsMap[name]; ok { + return cnfs, nil + } + return nil, fmt.Errorf("CNFS %s not found", name) +} + +func startListeningFor(addr string) { + listener, err := net.Listen("tcp", addr) + if err != nil { + panic(err) + } + defer listener.Close() + + for { + conn, _ := listener.Accept() + if conn != nil { + conn.Close() + } + } +} + +func fakeCNFS(name, status, server, fallbackName string, fallbackStrategy v1beta1.FallbackStrategy) *v1beta1.ContainerNetworkFileSystem { + return &v1beta1.ContainerNetworkFileSystem{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Status: v1beta1.ContainerNetworkFileSystemStatus{ + Status: status, + }, + Spec: v1beta1.ContainerNetworkFileSystemSpec{ + Fallback: v1beta1.Fallback{ + Name: fallbackName, + Strategy: fallbackStrategy, + }, + Parameters: v1beta1.Parameters{ + Server: server, + }, + }, + } +} + +func TestCNFSNeedsFallback(t *testing.T) { + tests := []struct { + name string + cnfs *v1beta1.ContainerNetworkFileSystem + expected bool + }{ + { + name: "Nil CNFS", + cnfs: nil, + expected: false, + }, + { + name: "Always fallback strategy", + cnfs: fakeCNFS("", "", "", "", v1beta1.FallbackStrategyAlways), + expected: true, + }, + { + name: "IfConnectFailed fallback strategy-server reachable", + cnfs: fakeCNFS("", "", "localhost", "", v1beta1.FallbackStrategyIfConnectFailed), + expected: false, + }, + { + name: "IfMountTargetUnhealthy fallback strategy-status Available", + cnfs: fakeCNFS("", v1beta1.StatusAvailable, "", "", v1beta1.FallbackStrategyIfMountTargetUnhealthy), + expected: false, + }, + { + name: "IfMountTargetUnhealthy fallback strategy-status Unavailable", + cnfs: fakeCNFS("", v1beta1.StatusUnavailable, "", "", v1beta1.FallbackStrategyIfMountTargetUnhealthy), + expected: true, + }, + } + + go startListeningFor("localhost:2049") + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actual := cnfsNeedsFallback(tt.cnfs) + assert.Equal(t, tt.expected, actual) + }) + } +} + +func TestFallbackCNFSAndRecord(t *testing.T) { + client := fake.NewSimpleClientset(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "default", + }, + }) + req := &csi.NodePublishVolumeRequest{ + VolumeContext: map[string]string{ + utils.PodNameKey: "pod1", + utils.PodNamespaceKey: "default", + }, + } + eventRecorder := record.NewFakeRecorder(5) + ctx, _ := utils.WithPodInfo(context.Background(), client, req) + + primaryCNFSName, fallbackCNFSName := "primary", "fallback" + tests := []struct { + name string + primaryCNFS *v1beta1.ContainerNetworkFileSystem + fallbackCNFS *v1beta1.ContainerNetworkFileSystem + expectFallback bool + expectErr bool + }{ + { + name: "Always fallback strategy", + primaryCNFS: fakeCNFS(primaryCNFSName, "", "", fallbackCNFSName, v1beta1.FallbackStrategyAlways), + fallbackCNFS: fakeCNFS(fallbackCNFSName, "", "", "", ""), + expectFallback: true, + }, + { + name: "IfConnectFailed fallback strategy", 
+			primaryCNFS:    fakeCNFS(primaryCNFSName, "", "", fallbackCNFSName, v1beta1.FallbackStrategyIfConnectFailed),
+			fallbackCNFS:   fakeCNFS(fallbackCNFSName, "", "", "", ""),
+			expectFallback: true,
+		},
+		{
+			name:           "IfMountTargetUnhealthy fallback strategy",
+			primaryCNFS:    fakeCNFS(primaryCNFSName, v1beta1.StatusUnavailable, "", fallbackCNFSName, v1beta1.FallbackStrategyIfMountTargetUnhealthy),
+			fallbackCNFS:   fakeCNFS(fallbackCNFSName, "", "", "", ""),
+			expectFallback: true,
+		},
+		{
+			name:         "Non-existent fallback CNFS",
+			primaryCNFS:  fakeCNFS(primaryCNFSName, "", "", "non-existent-cnfs", ""),
+			fallbackCNFS: fakeCNFS(fallbackCNFSName, "", "", "", ""),
+			expectErr:    true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			cnfsGetter := newFakeCNFSGetter(tt.fallbackCNFS)
+			server := nodeServer{
+				config: &internal.NodeConfig{
+					KubeClient: client,
+					CNFSGetter: cnfsGetter,
+				},
+				recorder: eventRecorder,
+			}
+			actual, err := server.fallbackCNFSAndRecord(ctx, req, tt.primaryCNFS)
+			if tt.expectErr {
+				assert.Error(t, err)
+			} else {
+				if tt.expectFallback {
+					assert.Equal(t, *tt.fallbackCNFS, *actual)
+					assert.Len(t, eventRecorder.Events, 1)
+
+					oldCNFSName, newCNFSName := tt.primaryCNFS.Name, tt.fallbackCNFS.Name
+					msg := <-eventRecorder.Events
+					switch tt.primaryCNFS.Spec.Fallback.Strategy {
+					case v1beta1.FallbackStrategyAlways:
+						assert.Contains(t, msg, fmt.Sprintf(cnfsAlwaysFallbackEventTmpl, oldCNFSName, newCNFSName))
+					case v1beta1.FallbackStrategyIfConnectFailed:
+						assert.Contains(t, msg, fmt.Sprintf(cnfsIfConnectFailedFallbackEventTmpl, oldCNFSName, newCNFSName))
+					case v1beta1.FallbackStrategyIfMountTargetUnhealthy:
+						assert.Contains(t, msg, fmt.Sprintf(cnfsIfMountTargetUnhealthyFallbackEventTmpl, oldCNFSName, newCNFSName))
+					}
+				} else {
+					assert.Equal(t, *tt.primaryCNFS, *actual)
+				}
+			}
+		})
+	}
+}
diff --git a/pkg/utils/util.go b/pkg/utils/util.go
index 7d41e3fc4..eedd7510a 100644
--- a/pkg/utils/util.go
+++ b/pkg/utils/util.go
@@ -465,7 +465,7 @@ func GetPodRunTime(ctx context.Context, req *csi.NodePublishVolumeRequest, clien
 }
 
 func WithPodInfo(ctx context.Context, client kubernetes.Interface, req *csi.NodePublishVolumeRequest) (context.Context, *v1.Pod) {
-	pod, err := getPodFromK8s(ctx, client, req)
+	pod, err := getPodFromK8s(client, req)
 	if err != nil {
 		klog.Errorf("WithPodInfo: failed to get pod: %v", err)
 		return ctx, nil
@@ -478,10 +478,10 @@ func GetPodFromContextOrK8s(ctx context.Context, client kubernetes.Interface, re
 	if ok {
 		return pod, nil
 	}
-	return getPodFromK8s(ctx, client, req)
+	return getPodFromK8s(client, req)
 }
 
-func getPodFromK8s(ctx context.Context, client kubernetes.Interface, req *csi.NodePublishVolumeRequest) (*v1.Pod, error) {
+func getPodFromK8s(client kubernetes.Interface, req *csi.NodePublishVolumeRequest) (*v1.Pod, error) {
 	name, namespace := req.VolumeContext[PodNameKey], req.VolumeContext[PodNamespaceKey]
 	if name == "" || namespace == "" {
 		return nil, fmt.Errorf("empty pod name or namespace: '%s', '%s'", name, namespace)
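
Below is a minimal, standalone sketch of how the new fallback spec is meant to be consumed. The CNFS object and all of its field values here are hypothetical, and because cnfsNeedsFallback is unexported from pkg/nas, the IfMountTargetUnhealthy decision logic is mirrored inline rather than called directly:

package main

import (
	"fmt"

	"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cnfs/v1beta1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// Hypothetical primary CNFS whose mount target is Unavailable and which
	// names "nas-backup" as its fallback for exactly that situation.
	primary := &v1beta1.ContainerNetworkFileSystem{
		ObjectMeta: metav1.ObjectMeta{Name: "nas-primary"},
		Spec: v1beta1.ContainerNetworkFileSystemSpec{
			Fallback: v1beta1.Fallback{
				Name:     "nas-backup",
				Strategy: v1beta1.FallbackStrategyIfMountTargetUnhealthy,
			},
		},
		Status: v1beta1.ContainerNetworkFileSystemStatus{
			Status: v1beta1.StatusUnavailable,
		},
	}

	// Mirrors the IfMountTargetUnhealthy branch of cnfsNeedsFallback: switch
	// only when a fallback is named and the mount target is Unavailable.
	needsFallback := primary.Spec.Fallback.Name != "" &&
		primary.Spec.Fallback.Strategy == v1beta1.FallbackStrategyIfMountTargetUnhealthy &&
		primary.Status.Status == v1beta1.StatusUnavailable
	fmt.Printf("switch from %s to %s: %v\n", primary.Name, primary.Spec.Fallback.Name, needsFallback)
}

When this condition holds on the node server, getCNFS resolves the fallback CNFS by name and fallbackCNFSAndRecord emits a Warning event on the mounting pod, so the switch is visible via kubectl describe pod.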