sched: support PreEnqueueChecks prior to moving Pods

Wei Huang
2021-03-11 12:31:33 -08:00
parent 1db614ec8f
commit 6384f397b4
10 changed files with 298 additions and 64 deletions
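Before the per-file changes: the new hook is an optional per-pod filter that callers may pass when asking the scheduling queue to flush its unschedulable pods. The queue-side declarations live in one of the ten changed files that is not shown in this excerpt; a minimal sketch of their assumed shape (names inferred from the call sites below) looks like this:

// Sketch only: assumed queue-side declarations backing the calls in this diff
// (the real file is among the changed files but is not shown here).
package queue

import v1 "k8s.io/api/core/v1"

// PreEnqueueCheck is run against each unschedulable pod before it is moved
// back to the active or backoff queue; returning false leaves the pod where it is.
type PreEnqueueCheck func(pod *v1.Pod) bool

type SchedulingQueue interface {
	// A nil preCheck preserves the old behavior of moving every pod.
	MoveAllToActiveOrBackoffQueue(event string, preCheck PreEnqueueCheck)
	// ... other queue methods elided ...
}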


@@ -27,6 +27,12 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
v1helper "k8s.io/component-helpers/scheduling/corev1"
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
"k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/profile"
)
@@ -38,7 +44,7 @@ func (sched *Scheduler) onPvAdd(obj interface{}) {
// provisioning and binding process, will not trigger events to schedule pod
// again. So we need to move pods to active queue on PV add for this
// scenario.
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvAdd)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvAdd, nil)
}
func (sched *Scheduler) onPvUpdate(old, new interface{}) {
@@ -46,15 +52,15 @@ func (sched *Scheduler) onPvUpdate(old, new interface{}) {
// bindings due to conflicts if PVs are updated by PV controller or other
// parties, then scheduler will add pod back to unschedulable queue. We
// need to move pods to active queue on PV update for this scenario.
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvUpdate)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvUpdate, nil)
}
func (sched *Scheduler) onPvcAdd(obj interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvcAdd)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvcAdd, nil)
}
func (sched *Scheduler) onPvcUpdate(old, new interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvcUpdate)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvcUpdate, nil)
}
func (sched *Scheduler) onStorageClassAdd(obj interface{}) {
@@ -71,20 +77,20 @@ func (sched *Scheduler) onStorageClassAdd(obj interface{}) {
// We don't need to invalidate cached results because results will not be
// cached for pod that has unbound immediate PVCs.
if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.StorageClassAdd)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.StorageClassAdd, nil)
}
}
func (sched *Scheduler) onServiceAdd(obj interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceAdd)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceAdd, nil)
}
func (sched *Scheduler) onServiceUpdate(oldObj interface{}, newObj interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceUpdate)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceUpdate, nil)
}
func (sched *Scheduler) onServiceDelete(obj interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceDelete)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceDelete, nil)
}
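All of the handlers above pass nil because a PV, PVC, StorageClass, Service, or CSINode event is not scoped to a single node, so there is nothing cheap to filter on. On the queue side, applying the check plausibly reduces to a filter like the sketch below (a hypothetical helper; the queue's real internals are not part of this excerpt):

// Hypothetical helper illustrating how a PreEnqueueCheck narrows the set of
// pods to move; the function and parameter names here are invented.
package queue

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// podsToMove returns every unschedulable pod when preCheck is nil, otherwise
// only the pods for which the check returns true.
func podsToMove(unschedulable map[string]*framework.QueuedPodInfo, preCheck func(*v1.Pod) bool) []*framework.QueuedPodInfo {
	out := make([]*framework.QueuedPodInfo, 0, len(unschedulable))
	for _, pInfo := range unschedulable {
		if preCheck == nil || preCheck(pInfo.Pod) {
			out = append(out, pInfo)
		}
	}
	return out
}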
func (sched *Scheduler) addNodeToCache(obj interface{}) {
@@ -94,12 +100,9 @@ func (sched *Scheduler) addNodeToCache(obj interface{}) {
return
}
if err := sched.SchedulerCache.AddNode(node); err != nil {
klog.ErrorS(err, "Scheduler cache AddNode failed")
}
nodeInfo := sched.SchedulerCache.AddNode(node)
klog.V(3).InfoS("Add event for node", "node", klog.KObj(node))
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.NodeAdd)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.NodeAdd, preCheckForNode(nodeInfo))
}
func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
@@ -114,13 +117,10 @@ func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
return
}
if err := sched.SchedulerCache.UpdateNode(oldNode, newNode); err != nil {
klog.ErrorS(err, "Scheduler cache UpdateNode failed")
}
nodeInfo := sched.SchedulerCache.UpdateNode(oldNode, newNode)
// Only requeue unschedulable pods if the node became more schedulable.
if event := nodeSchedulingPropertiesChange(newNode, oldNode); event != "" {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(event)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(event, preCheckForNode(nodeInfo))
}
}
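The two node handlers above no longer discard an error from the cache; instead they consume the NodeInfo the cache returns and feed it to preCheckForNode. This implies the cache's AddNode/UpdateNode signatures changed as part of this commit (those files are among the ten changed but are not shown); roughly, as an assumed sketch:

// Assumed shape of the cache methods implied by the handlers above; the real
// cache interface lives in a file not included in this excerpt.
package cache

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

type Cache interface {
	// AddNode now returns the node's NodeInfo (instead of an error) so event
	// handlers can build a per-node PreEnqueueCheck from it.
	AddNode(node *v1.Node) *framework.NodeInfo
	// UpdateNode does the same for node updates.
	UpdateNode(oldNode, newNode *v1.Node) *framework.NodeInfo
	// ... other cache methods elided ...
}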
@@ -152,11 +152,11 @@ func (sched *Scheduler) deleteNodeFromCache(obj interface{}) {
}
func (sched *Scheduler) onCSINodeAdd(obj interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeAdd)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeAdd, nil)
}
func (sched *Scheduler) onCSINodeUpdate(oldObj, newObj interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeUpdate)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeUpdate, nil)
}
func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
@@ -286,7 +286,7 @@ func (sched *Scheduler) deletePodFromCache(obj interface{}) {
klog.ErrorS(err, "Scheduler cache RemovePod failed", "pod", klog.KObj(pod))
}
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.AssignedPodDelete)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.AssignedPodDelete, nil)
}
// assignedPod selects pods that are assigned (scheduled and running).
@@ -506,3 +506,39 @@ func nodeConditionsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
func nodeSpecUnschedulableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
return newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable && !newNode.Spec.Unschedulable
}
func preCheckForNode(nodeInfo *framework.NodeInfo) queue.PreEnqueueCheck {
// In addition to the checks in kubelet (pkg/kubelet/lifecycle/predicate.go#GeneralPredicates),
// the following logic appends a taint/toleration check.
// TODO: verify whether kubelet should also apply the taint/toleration check, and then unify
// the logic with kubelet and move it to a shared place.
//
// Note: the following checks don't take preemption into consideration; in very rare
// cases (e.g., node resizing), "pod" may still fail a check even though preemption would help.
// We deliberately choose to ignore those cases, as unschedulable pods will be re-queued eventually.
return func(pod *v1.Pod) bool {
if len(noderesources.Fits(pod, nodeInfo)) != 0 {
return false
}
// Ignore parsing errors for backwards compatibility.
matches, _ := nodeaffinity.GetRequiredNodeAffinity(pod).Match(nodeInfo.Node())
if !matches {
return false
}
if !nodename.Fits(pod, nodeInfo) {
return false
}
if !nodeports.Fits(pod, nodeInfo) {
return false
}
_, isUntolerated := v1helper.FindMatchingUntoleratedTaint(nodeInfo.Node().Spec.Taints, pod.Spec.Tolerations, func(t *v1.Taint) bool {
// PodToleratesNodeTaints is only interested in NoSchedule and NoExecute taints.
return t.Effect == v1.TaintEffectNoSchedule || t.Effect == v1.TaintEffectNoExecute
})
return !isUntolerated
}
}
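A hypothetical way to exercise the returned check from a test in the same package; the test name and the node/pod fixtures below are invented for illustration and are not part of this commit:

// Hypothetical test; assumes the usual test imports ("testing", metav1,
// resource) in addition to the imports added at the top of this diff.
func TestPreCheckForNode_SmallPodPasses(t *testing.T) {
	nodeInfo := framework.NewNodeInfo()
	nodeInfo.SetNode(&v1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: "node-a"},
		Status: v1.NodeStatus{
			Allocatable: v1.ResourceList{
				v1.ResourceCPU:  resource.MustParse("2"),
				v1.ResourcePods: resource.MustParse("10"),
			},
		},
	})
	check := preCheckForNode(nodeInfo)

	// A pod requesting 1 CPU, with no required affinity, host ports, nodeName,
	// or untolerated taints in play, should pass every check above.
	pod := &v1.Pod{
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Resources: v1.ResourceRequirements{
					Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1")},
				},
			}},
		},
	}
	if !check(pod) {
		t.Errorf("expected the pre-enqueue check to pass for a small pod")
	}
}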