Scheduler predicate for already bound PVs with node affinity

This commit is contained in:
Michelle Au 2017-05-14 20:17:59 -07:00
parent dd46c7f88e
commit 61de4870de
5 changed files with 164 additions and 0 deletions

View File

@ -22,6 +22,7 @@ go_library(
"//pkg/api/v1/helper:go_default_library",
"//pkg/api/v1/helper/qos:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/features:go_default_library",
"//pkg/volume/util:go_default_library",
"//plugin/pkg/scheduler/algorithm:go_default_library",
"//plugin/pkg/scheduler/algorithm/priorities/util:go_default_library",
@ -31,6 +32,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/k8s.io/metrics/pkg/client/clientset_generated/clientset:go_default_library",
],

View File

@ -37,6 +37,7 @@ var (
ErrMaxVolumeCountExceeded = newPredicateFailureError("MaxVolumeCount")
ErrNodeUnderMemoryPressure = newPredicateFailureError("NodeUnderMemoryPressure")
ErrNodeUnderDiskPressure = newPredicateFailureError("NodeUnderDiskPressure")
ErrVolumeNodeConflict = newPredicateFailureError("NoVolumeNodeConflict")
// ErrFakePredicate is used for test only. The fake predicates returning false also returns error
// as ErrFakePredicate.
ErrFakePredicate = newPredicateFailureError("FakePredicateError")

View File

@ -29,14 +29,18 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/api/v1"
v1helper "k8s.io/kubernetes/pkg/api/v1/helper"
v1qos "k8s.io/kubernetes/pkg/api/v1/helper/qos"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/features"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
"k8s.io/metrics/pkg/client/clientset_generated/clientset"
)
// predicatePrecomputations: Helper types/variables...
@ -1264,3 +1268,81 @@ func CheckNodeDiskPressurePredicate(pod *v1.Pod, meta interface{}, nodeInfo *sch
}
return true, nil, nil
}
type VolumeNodeChecker struct {
pvInfo PersistentVolumeInfo
pvcInfo PersistentVolumeClaimInfo
client clientset.Interface
}
// VolumeNodeChecker evaluates if a pod can fit due to the volumes it requests, given
// that some volumes have node topology constraints, particularly when using Local PVs.
// The requirement is that any pod that uses a PVC that is bound to a PV with topology constraints
// must be scheduled to a node that satisfies the PV's topology labels.
func NewVolumeNodePredicate(pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo, client clientset.Interface) algorithm.FitPredicate {
c := &VolumeNodeChecker{
pvInfo: pvInfo,
pvcInfo: pvcInfo,
client: client,
}
return c.predicate
}
func (c *VolumeNodeChecker) predicate(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.PersistentLocalVolumes) {
return true, nil, nil
}
// If a pod doesn't have any volume attached to it, the predicate will always be true.
// Thus we make a fast path for it, to avoid unnecessary computations in this case.
if len(pod.Spec.Volumes) == 0 {
return true, nil, nil
}
node := nodeInfo.Node()
if node == nil {
return false, nil, fmt.Errorf("node not found")
}
glog.V(2).Infof("Checking for prebound volumes with node affinity")
namespace := pod.Namespace
manifest := &(pod.Spec)
for i := range manifest.Volumes {
volume := &manifest.Volumes[i]
if volume.PersistentVolumeClaim == nil {
continue
}
pvcName := volume.PersistentVolumeClaim.ClaimName
if pvcName == "" {
return false, nil, fmt.Errorf("PersistentVolumeClaim had no name")
}
pvc, err := c.pvcInfo.GetPersistentVolumeClaimInfo(namespace, pvcName)
if err != nil {
return false, nil, err
}
if pvc == nil {
return false, nil, fmt.Errorf("PersistentVolumeClaim was not found: %q", pvcName)
}
pvName := pvc.Spec.VolumeName
if pvName == "" {
return false, nil, fmt.Errorf("PersistentVolumeClaim is not bound: %q", pvcName)
}
pv, err := c.pvInfo.GetPersistentVolumeInfo(pvName)
if err != nil {
return false, nil, err
}
if pv == nil {
return false, nil, fmt.Errorf("PersistentVolume not found: %q", pvName)
}
err = volumeutil.CheckNodeAffinity(pv, node.Labels)
if err != nil {
glog.V(2).Infof("Won't schedule pod %q onto node %q due to volume %q node mismatch: %v", pod.Name, node.Name, pvName, err.Error())
return false, []algorithm.PredicateFailureReason{ErrVolumeNodeConflict}, nil
}
glog.V(4).Infof("VolumeNode predicate allows node %q for pod %q due to volume %q", node.Name, pod.Name, pvName)
}
return true, nil, nil
}

View File

@ -313,6 +313,77 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
},
},
},
// Do not change this JSON after the corresponding release has been tagged.
// A failure indicates backwards compatibility with the specified release was broken.
"1.7": {
JSON: `{
"kind": "Policy",
"apiVersion": "v1",
"predicates": [
{"name": "MatchNodeSelector"},
{"name": "PodFitsResources"},
{"name": "PodFitsHostPorts"},
{"name": "HostName"},
{"name": "NoDiskConflict"},
{"name": "NoVolumeZoneConflict"},
{"name": "PodToleratesNodeTaints"},
{"name": "CheckNodeMemoryPressure"},
{"name": "CheckNodeDiskPressure"},
{"name": "MaxEBSVolumeCount"},
{"name": "MaxGCEPDVolumeCount"},
{"name": "MaxAzureDiskVolumeCount"},
{"name": "MatchInterPodAffinity"},
{"name": "GeneralPredicates"},
{"name": "TestServiceAffinity", "argument": {"serviceAffinity" : {"labels" : ["region"]}}},
{"name": "TestLabelsPresence", "argument": {"labelsPresence" : {"labels" : ["foo"], "presence":true}}},
{"name": "NoVolumeNodeConflict"}
],"priorities": [
{"name": "EqualPriority", "weight": 2},
{"name": "ImageLocalityPriority", "weight": 2},
{"name": "LeastRequestedPriority", "weight": 2},
{"name": "BalancedResourceAllocation", "weight": 2},
{"name": "SelectorSpreadPriority", "weight": 2},
{"name": "NodePreferAvoidPodsPriority", "weight": 2},
{"name": "NodeAffinityPriority", "weight": 2},
{"name": "TaintTolerationPriority", "weight": 2},
{"name": "InterPodAffinityPriority", "weight": 2},
{"name": "MostRequestedPriority", "weight": 2}
]
}`,
ExpectedPolicy: schedulerapi.Policy{
Predicates: []schedulerapi.PredicatePolicy{
{Name: "MatchNodeSelector"},
{Name: "PodFitsResources"},
{Name: "PodFitsHostPorts"},
{Name: "HostName"},
{Name: "NoDiskConflict"},
{Name: "NoVolumeZoneConflict"},
{Name: "PodToleratesNodeTaints"},
{Name: "CheckNodeMemoryPressure"},
{Name: "CheckNodeDiskPressure"},
{Name: "MaxEBSVolumeCount"},
{Name: "MaxGCEPDVolumeCount"},
{Name: "MaxAzureDiskVolumeCount"},
{Name: "MatchInterPodAffinity"},
{Name: "GeneralPredicates"},
{Name: "TestServiceAffinity", Argument: &schedulerapi.PredicateArgument{ServiceAffinity: &schedulerapi.ServiceAffinity{Labels: []string{"region"}}}},
{Name: "TestLabelsPresence", Argument: &schedulerapi.PredicateArgument{LabelsPresence: &schedulerapi.LabelsPresence{Labels: []string{"foo"}, Presence: true}}},
{Name: "NoVolumeNodeConflict"},
},
Priorities: []schedulerapi.PriorityPolicy{
{Name: "EqualPriority", Weight: 2},
{Name: "ImageLocalityPriority", Weight: 2},
{Name: "LeastRequestedPriority", Weight: 2},
{Name: "BalancedResourceAllocation", Weight: 2},
{Name: "SelectorSpreadPriority", Weight: 2},
{Name: "NodePreferAvoidPodsPriority", Weight: 2},
{Name: "NodeAffinityPriority", Weight: 2},
{Name: "TaintTolerationPriority", Weight: 2},
{Name: "InterPodAffinityPriority", Weight: 2},
{Name: "MostRequestedPriority", Weight: 2},
},
},
},
}
registeredPredicates := sets.NewString(factory.ListRegisteredFitPredicates()...)

View File

@ -176,6 +176,14 @@ func defaultPredicates() sets.String {
// Fit is determined by node disk pressure condition.
factory.RegisterFitPredicate("CheckNodeDiskPressure", predicates.CheckNodeDiskPressurePredicate),
// Fit is determined by volume zone requirements.
factory.RegisterFitPredicateFactory(
"NoVolumeNodeConflict",
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewVolumeNodePredicate(args.PVInfo, args.PVCInfo, nil)
},
),
)
}