diff --git a/plugin/pkg/scheduler/algorithm/predicates/BUILD b/plugin/pkg/scheduler/algorithm/predicates/BUILD
index 969b312ba33..8f586f64b50 100644
--- a/plugin/pkg/scheduler/algorithm/predicates/BUILD
+++ b/plugin/pkg/scheduler/algorithm/predicates/BUILD
@@ -22,6 +22,7 @@ go_library(
         "//pkg/api/v1/helper:go_default_library",
         "//pkg/api/v1/helper/qos:go_default_library",
         "//pkg/client/listers/core/v1:go_default_library",
+        "//pkg/features:go_default_library",
         "//pkg/volume/util:go_default_library",
         "//plugin/pkg/scheduler/algorithm:go_default_library",
         "//plugin/pkg/scheduler/algorithm/priorities/util:go_default_library",
@@ -31,6 +32,7 @@ go_library(
         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
+        "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
        "//vendor/k8s.io/client-go/util/workqueue:go_default_library",
         "//vendor/k8s.io/metrics/pkg/client/clientset_generated/clientset:go_default_library",
     ],
diff --git a/plugin/pkg/scheduler/algorithm/predicates/error.go b/plugin/pkg/scheduler/algorithm/predicates/error.go
index 36ac5f1deac..ecedab29cba 100644
--- a/plugin/pkg/scheduler/algorithm/predicates/error.go
+++ b/plugin/pkg/scheduler/algorithm/predicates/error.go
@@ -37,6 +37,7 @@ var (
     ErrMaxVolumeCountExceeded  = newPredicateFailureError("MaxVolumeCount")
     ErrNodeUnderMemoryPressure = newPredicateFailureError("NodeUnderMemoryPressure")
     ErrNodeUnderDiskPressure   = newPredicateFailureError("NodeUnderDiskPressure")
+    ErrVolumeNodeConflict      = newPredicateFailureError("NoVolumeNodeConflict")
     // ErrFakePredicate is used for test only. The fake predicates returning false also returns error
     // as ErrFakePredicate.
     ErrFakePredicate = newPredicateFailureError("FakePredicateError")
diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go
index 6a86bebaf6a..10ea3eafea3 100644
--- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go
+++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go
@@ -29,14 +29,18 @@ import (
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/labels"
     utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+    utilfeature "k8s.io/apiserver/pkg/util/feature"
     "k8s.io/client-go/util/workqueue"
     "k8s.io/kubernetes/pkg/api/v1"
     v1helper "k8s.io/kubernetes/pkg/api/v1/helper"
     v1qos "k8s.io/kubernetes/pkg/api/v1/helper/qos"
     corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
+    "k8s.io/kubernetes/pkg/features"
+    volumeutil "k8s.io/kubernetes/pkg/volume/util"
     "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
     priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
     "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
+    "k8s.io/metrics/pkg/client/clientset_generated/clientset"
 )

 // predicatePrecomputations: Helper types/variables...
@@ -1264,3 +1268,81 @@ func CheckNodeDiskPressurePredicate(pod *v1.Pod, meta interface{}, nodeInfo *sch
     }
     return true, nil, nil
 }
+
+type VolumeNodeChecker struct {
+    pvInfo  PersistentVolumeInfo
+    pvcInfo PersistentVolumeClaimInfo
+    client  clientset.Interface
+}
+
+// NewVolumeNodePredicate evaluates if a pod can fit due to the volumes it requests, given
+// that some volumes have node topology constraints, particularly when using Local PVs.
+// The requirement is that any pod that uses a PVC that is bound to a PV with topology constraints
+// must be scheduled to a node that satisfies the PV's topology labels.
+func NewVolumeNodePredicate(pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo, client clientset.Interface) algorithm.FitPredicate {
+    c := &VolumeNodeChecker{
+        pvInfo:  pvInfo,
+        pvcInfo: pvcInfo,
+        client:  client,
+    }
+    return c.predicate
+}
+
+func (c *VolumeNodeChecker) predicate(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
+    if !utilfeature.DefaultFeatureGate.Enabled(features.PersistentLocalVolumes) {
+        return true, nil, nil
+    }
+
+    // If a pod doesn't have any volume attached to it, the predicate will always be true.
+    // Thus we make a fast path for it, to avoid unnecessary computations in this case.
+    if len(pod.Spec.Volumes) == 0 {
+        return true, nil, nil
+    }
+
+    node := nodeInfo.Node()
+    if node == nil {
+        return false, nil, fmt.Errorf("node not found")
+    }
+
+    glog.V(2).Infof("Checking for prebound volumes with node affinity")
+    namespace := pod.Namespace
+    manifest := &(pod.Spec)
+    for i := range manifest.Volumes {
+        volume := &manifest.Volumes[i]
+        if volume.PersistentVolumeClaim == nil {
+            continue
+        }
+        pvcName := volume.PersistentVolumeClaim.ClaimName
+        if pvcName == "" {
+            return false, nil, fmt.Errorf("PersistentVolumeClaim had no name")
+        }
+        pvc, err := c.pvcInfo.GetPersistentVolumeClaimInfo(namespace, pvcName)
+        if err != nil {
+            return false, nil, err
+        }
+
+        if pvc == nil {
+            return false, nil, fmt.Errorf("PersistentVolumeClaim was not found: %q", pvcName)
+        }
+        pvName := pvc.Spec.VolumeName
+        if pvName == "" {
+            return false, nil, fmt.Errorf("PersistentVolumeClaim is not bound: %q", pvcName)
+        }
+
+        pv, err := c.pvInfo.GetPersistentVolumeInfo(pvName)
+        if err != nil {
+            return false, nil, err
+        }
+        if pv == nil {
+            return false, nil, fmt.Errorf("PersistentVolume not found: %q", pvName)
+        }
+
+        err = volumeutil.CheckNodeAffinity(pv, node.Labels)
+        if err != nil {
+            glog.V(2).Infof("Won't schedule pod %q onto node %q due to volume %q node mismatch: %v", pod.Name, node.Name, pvName, err.Error())
+            return false, []algorithm.PredicateFailureReason{ErrVolumeNodeConflict}, nil
+        }
+        glog.V(4).Infof("VolumeNode predicate allows node %q for pod %q due to volume %q", node.Name, pod.Name, pvName)
+    }
+    return true, nil, nil
+}
diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go b/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go
index 267ba65cd4c..7b173b10da3 100644
--- a/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go
+++ b/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go
@@ -313,6 +313,77 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
             },
         },
     },
+    // Do not change this JSON after the corresponding release has been tagged.
+    // A failure indicates backwards compatibility with the specified release was broken.
+ "1.7": { + JSON: `{ + "kind": "Policy", + "apiVersion": "v1", + "predicates": [ + {"name": "MatchNodeSelector"}, + {"name": "PodFitsResources"}, + {"name": "PodFitsHostPorts"}, + {"name": "HostName"}, + {"name": "NoDiskConflict"}, + {"name": "NoVolumeZoneConflict"}, + {"name": "PodToleratesNodeTaints"}, + {"name": "CheckNodeMemoryPressure"}, + {"name": "CheckNodeDiskPressure"}, + {"name": "MaxEBSVolumeCount"}, + {"name": "MaxGCEPDVolumeCount"}, + {"name": "MaxAzureDiskVolumeCount"}, + {"name": "MatchInterPodAffinity"}, + {"name": "GeneralPredicates"}, + {"name": "TestServiceAffinity", "argument": {"serviceAffinity" : {"labels" : ["region"]}}}, + {"name": "TestLabelsPresence", "argument": {"labelsPresence" : {"labels" : ["foo"], "presence":true}}}, + {"name": "NoVolumeNodeConflict"} + ],"priorities": [ + {"name": "EqualPriority", "weight": 2}, + {"name": "ImageLocalityPriority", "weight": 2}, + {"name": "LeastRequestedPriority", "weight": 2}, + {"name": "BalancedResourceAllocation", "weight": 2}, + {"name": "SelectorSpreadPriority", "weight": 2}, + {"name": "NodePreferAvoidPodsPriority", "weight": 2}, + {"name": "NodeAffinityPriority", "weight": 2}, + {"name": "TaintTolerationPriority", "weight": 2}, + {"name": "InterPodAffinityPriority", "weight": 2}, + {"name": "MostRequestedPriority", "weight": 2} + ] + }`, + ExpectedPolicy: schedulerapi.Policy{ + Predicates: []schedulerapi.PredicatePolicy{ + {Name: "MatchNodeSelector"}, + {Name: "PodFitsResources"}, + {Name: "PodFitsHostPorts"}, + {Name: "HostName"}, + {Name: "NoDiskConflict"}, + {Name: "NoVolumeZoneConflict"}, + {Name: "PodToleratesNodeTaints"}, + {Name: "CheckNodeMemoryPressure"}, + {Name: "CheckNodeDiskPressure"}, + {Name: "MaxEBSVolumeCount"}, + {Name: "MaxGCEPDVolumeCount"}, + {Name: "MaxAzureDiskVolumeCount"}, + {Name: "MatchInterPodAffinity"}, + {Name: "GeneralPredicates"}, + {Name: "TestServiceAffinity", Argument: &schedulerapi.PredicateArgument{ServiceAffinity: &schedulerapi.ServiceAffinity{Labels: []string{"region"}}}}, + {Name: "TestLabelsPresence", Argument: &schedulerapi.PredicateArgument{LabelsPresence: &schedulerapi.LabelsPresence{Labels: []string{"foo"}, Presence: true}}}, + {Name: "NoVolumeNodeConflict"}, + }, + Priorities: []schedulerapi.PriorityPolicy{ + {Name: "EqualPriority", Weight: 2}, + {Name: "ImageLocalityPriority", Weight: 2}, + {Name: "LeastRequestedPriority", Weight: 2}, + {Name: "BalancedResourceAllocation", Weight: 2}, + {Name: "SelectorSpreadPriority", Weight: 2}, + {Name: "NodePreferAvoidPodsPriority", Weight: 2}, + {Name: "NodeAffinityPriority", Weight: 2}, + {Name: "TaintTolerationPriority", Weight: 2}, + {Name: "InterPodAffinityPriority", Weight: 2}, + {Name: "MostRequestedPriority", Weight: 2}, + }, + }, + }, } registeredPredicates := sets.NewString(factory.ListRegisteredFitPredicates()...) diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go index d03ca433ff9..94447611a13 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -176,6 +176,14 @@ func defaultPredicates() sets.String { // Fit is determined by node disk pressure condition. factory.RegisterFitPredicate("CheckNodeDiskPressure", predicates.CheckNodeDiskPressurePredicate), + + // Fit is determined by volume zone requirements. 
+    factory.RegisterFitPredicateFactory(
+        "NoVolumeNodeConflict",
+        func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
+            return predicates.NewVolumeNodePredicate(args.PVInfo, args.PVCInfo, nil)
+        },
+    ),
 )
}
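
For reviewers who want to see what the new predicate actually evaluates: the fit decision reduces to the volumeutil.CheckNodeAffinity call above. The sketch below exercises that helper directly. It assumes the 1.7 alpha local-volume convention of carrying a PV's node constraints as a JSON-serialized NodeAffinity under the volume.alpha.kubernetes.io/node-affinity annotation; the PV name, node names, and labels are illustrative only, not part of this patch.

    package main

    import (
        "fmt"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/kubernetes/pkg/api/v1"
        volumeutil "k8s.io/kubernetes/pkg/volume/util"
    )

    func main() {
        // Hypothetical PV pinned to node-1 via the alpha node-affinity
        // annotation (assumed format: JSON-serialized v1.NodeAffinity).
        affinity := `{"requiredDuringSchedulingIgnoredDuringExecution": {"nodeSelectorTerms": [
            {"matchExpressions": [{"key": "kubernetes.io/hostname", "operator": "In", "values": ["node-1"]}]}]}}`

        pv := &v1.PersistentVolume{
            ObjectMeta: metav1.ObjectMeta{
                Name: "local-pv",
                Annotations: map[string]string{
                    "volume.alpha.kubernetes.io/node-affinity": affinity,
                },
            },
        }

        // CheckNodeAffinity returns nil when the node's labels satisfy the
        // PV's constraints; the predicate maps a non-nil error to
        // ErrVolumeNodeConflict.
        for _, labels := range []map[string]string{
            {"kubernetes.io/hostname": "node-1"}, // should fit
            {"kubernetes.io/hostname": "node-2"}, // should conflict
        } {
            fmt.Println(labels["kubernetes.io/hostname"], "->", volumeutil.CheckNodeAffinity(pv, labels))
        }
    }

Note that the predicate itself never looks at the annotation directly: it only resolves pod -> PVC -> PV and delegates the label match, which is why it stays correct if the constraint representation later moves out of the alpha annotation.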