@@ -22,6 +22,7 @@ import (
2222 "strings"
2323 "time"
2424
25+ corev1 "k8s.io/api/core/v1"
2526 k8serrors "k8s.io/apimachinery/pkg/api/errors"
2627 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2728 "k8s.io/apimachinery/pkg/util/uuid"
@@ -37,6 +38,23 @@ import (
3738 "github.com/juicedata/juicefs-csi-driver/pkg/util/resource"
3839)
3940
// DaemonSetSchedulingError indicates that a DaemonSet cannot schedule a pod
// on a specific node.
type DaemonSetSchedulingError struct {
	// DaemonSetName is the name of the DaemonSet that could not be scheduled.
	DaemonSetName string
	// NodeName is the name of the node the DaemonSet could not schedule on.
	NodeName string
	// Message is the human-readable description returned by Error.
	Message string
}

// Error implements the error interface and returns e.Message verbatim.
func (e *DaemonSetSchedulingError) Error() string {
	return e.Message
}

// IsDaemonSetSchedulingError reports whether err is a
// *DaemonSetSchedulingError or wraps one.
//
// The chain is followed through the single-error `Unwrap() error` method, so
// an error produced with fmt.Errorf("...: %w", schedErr) is still recognised.
// A nil err yields false.
func IsDaemonSetSchedulingError(err error) bool {
	for err != nil {
		if _, ok := err.(*DaemonSetSchedulingError); ok {
			return true
		}
		wrapper, ok := err.(interface{ Unwrap() error })
		if !ok {
			return false
		}
		err = wrapper.Unwrap()
	}
	return false
}
57+
4058type DaemonSetMount struct {
4159 log klog.Logger
4260 k8sMount.SafeFormatAndMount
@@ -294,6 +312,25 @@ func (d *DaemonSetMount) createOrUpdateDaemonSet(ctx context.Context, dsName str
294312func (d * DaemonSetMount ) waitUntilDaemonSetReady (ctx context.Context , dsName string , jfsSetting * jfsConfig.JfsSetting ) error {
295313 log := util .GenLog (ctx , d .log , "waitUntilDaemonSetReady" )
296314
315+ // First, check if the DaemonSet can schedule a pod on this node
316+ canSchedule , err := d .canScheduleOnNode (ctx , dsName )
317+ if err != nil {
318+ log .Error (err , "Failed to check if DaemonSet can schedule on node" )
319+ // Continue anyway, might be a transient error
320+ }
321+
322+ if ! canSchedule {
323+ // DaemonSet cannot schedule on this node due to nodeAffinity
324+ // Return a specific error that can be handled by the caller
325+ log .Info ("DaemonSet cannot schedule on this node due to nodeAffinity, need fallback" ,
326+ "dsName" , dsName , "nodeName" , jfsConfig .NodeName )
327+ return & DaemonSetSchedulingError {
328+ DaemonSetName : dsName ,
329+ NodeName : jfsConfig .NodeName ,
330+ Message : "DaemonSet cannot schedule on this node due to nodeAffinity restrictions" ,
331+ }
332+ }
333+
297334 // Wait for DaemonSet to have pods ready on current node
298335 timeout := 5 * time .Minute
299336 waitCtx , cancel := context .WithTimeout (ctx , timeout )
@@ -302,7 +339,12 @@ func (d *DaemonSetMount) waitUntilDaemonSetReady(ctx context.Context, dsName str
302339 for {
303340 select {
304341 case <- waitCtx .Done ():
305- return fmt .Errorf ("timeout waiting for DaemonSet %s to be ready on node %s" , dsName , jfsConfig .NodeName )
342+ // Timeout - could be because pod cannot be scheduled on this node
343+ return & DaemonSetSchedulingError {
344+ DaemonSetName : dsName ,
345+ NodeName : jfsConfig .NodeName ,
346+ Message : fmt .Sprintf ("timeout waiting for DaemonSet pod to be ready on node %s" , jfsConfig .NodeName ),
347+ }
306348 default :
307349 ds , err := d .K8sClient .GetDaemonSet (waitCtx , dsName , jfsConfig .Namespace )
308350 if err != nil {
@@ -416,4 +458,145 @@ func (d *DaemonSetMount) getDaemonSetNameFromTarget(ctx context.Context, target
416458 }
417459
418460 return ""
461+ }
462+
463+ // canScheduleOnNode checks if a DaemonSet can schedule a pod on the current node
464+ func (d * DaemonSetMount ) canScheduleOnNode (ctx context.Context , dsName string ) (bool , error ) {
465+ log := util .GenLog (ctx , d .log , "canScheduleOnNode" )
466+
467+ // Get the DaemonSet
468+ ds , err := d .K8sClient .GetDaemonSet (ctx , dsName , jfsConfig .Namespace )
469+ if err != nil {
470+ return false , err
471+ }
472+
473+ // Get the current node
474+ node , err := d .K8sClient .GetNode (ctx , jfsConfig .NodeName )
475+ if err != nil {
476+ log .Error (err , "Failed to get node" , "nodeName" , jfsConfig .NodeName )
477+ return false , err
478+ }
479+
480+ // Check if the node matches the DaemonSet's nodeAffinity
481+ if ds .Spec .Template .Spec .Affinity != nil && ds .Spec .Template .Spec .Affinity .NodeAffinity != nil {
482+ nodeAffinity := ds .Spec .Template .Spec .Affinity .NodeAffinity
483+
484+ // Check required node affinity
485+ if nodeAffinity .RequiredDuringSchedulingIgnoredDuringExecution != nil {
486+ matches := false
487+ for _ , term := range nodeAffinity .RequiredDuringSchedulingIgnoredDuringExecution .NodeSelectorTerms {
488+ if nodeMatchesSelectorTerm (node , & term ) {
489+ matches = true
490+ break
491+ }
492+ }
493+ if ! matches {
494+ log .Info ("Node does not match DaemonSet's required node affinity" ,
495+ "nodeName" , jfsConfig .NodeName , "dsName" , dsName )
496+ return false , nil
497+ }
498+ }
499+ }
500+
501+ // Check if the node has any taints that would prevent scheduling
502+ // (This is a simplified check - a full implementation would need to check tolerations)
503+ if len (node .Spec .Taints ) > 0 && len (ds .Spec .Template .Spec .Tolerations ) == 0 {
504+ for _ , taint := range node .Spec .Taints {
505+ if taint .Effect == corev1 .TaintEffectNoSchedule || taint .Effect == corev1 .TaintEffectNoExecute {
506+ log .Info ("Node has taints that prevent scheduling" ,
507+ "nodeName" , jfsConfig .NodeName , "taint" , taint )
508+ return false , nil
509+ }
510+ }
511+ }
512+
513+ return true , nil
514+ }
515+
516+ // nodeMatchesSelectorTerm checks if a node matches a node selector term
517+ func nodeMatchesSelectorTerm (node * corev1.Node , term * corev1.NodeSelectorTerm ) bool {
518+ // Check match expressions
519+ for _ , expr := range term .MatchExpressions {
520+ if ! nodeMatchesExpression (node , & expr ) {
521+ return false
522+ }
523+ }
524+
525+ // Check match fields
526+ for _ , field := range term .MatchFields {
527+ if ! nodeMatchesFieldSelector (node , & field ) {
528+ return false
529+ }
530+ }
531+
532+ return true
533+ }
534+
535+ // nodeMatchesExpression checks if a node matches a label selector requirement
536+ func nodeMatchesExpression (node * corev1.Node , expr * corev1.NodeSelectorRequirement ) bool {
537+ value , exists := node .Labels [expr .Key ]
538+
539+ switch expr .Operator {
540+ case corev1 .NodeSelectorOpIn :
541+ if ! exists {
542+ return false
543+ }
544+ for _ , v := range expr .Values {
545+ if value == v {
546+ return true
547+ }
548+ }
549+ return false
550+ case corev1 .NodeSelectorOpNotIn :
551+ if ! exists {
552+ return true
553+ }
554+ for _ , v := range expr .Values {
555+ if value == v {
556+ return false
557+ }
558+ }
559+ return true
560+ case corev1 .NodeSelectorOpExists :
561+ return exists
562+ case corev1 .NodeSelectorOpDoesNotExist :
563+ return ! exists
564+ case corev1 .NodeSelectorOpGt , corev1 .NodeSelectorOpLt :
565+ // These operators are typically used for numeric comparisons
566+ // For simplicity, we're not implementing them here
567+ return true
568+ default :
569+ return false
570+ }
571+ }
572+
573+ // nodeMatchesFieldSelector checks if a node matches a field selector
574+ func nodeMatchesFieldSelector (node * corev1.Node , field * corev1.NodeSelectorRequirement ) bool {
575+ var value string
576+ switch field .Key {
577+ case "metadata.name" :
578+ value = node .Name
579+ // Add more field selectors as needed
580+ default :
581+ return false
582+ }
583+
584+ switch field .Operator {
585+ case corev1 .NodeSelectorOpIn :
586+ for _ , v := range field .Values {
587+ if value == v {
588+ return true
589+ }
590+ }
591+ return false
592+ case corev1 .NodeSelectorOpNotIn :
593+ for _ , v := range field .Values {
594+ if value == v {
595+ return false
596+ }
597+ }
598+ return true
599+ default :
600+ return false
601+ }
419602}
0 commit comments