Merge pull request #92108 from Huang-Wei/postfilter-impl-4

[postfilter-impl-4] Move Preempt() to defaultpreemption package.

Commit: ad29e168dc
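This PR moves the scheduler's preemption logic out of pkg/scheduler/core and behind the DefaultPreemption plugin's PostFilter extension point. As a minimal sketch of the resulting call path, using only names that appear in the diff below (the wrapper function itself is illustrative and not part of this commit):

// Illustrative sketch only: after this change, preemption is reached through PostFilter.
func nominateViaPostFilter(ctx context.Context, pl *DefaultPreemption, state *framework.CycleState,
	pod *v1.Pod, m framework.NodeToStatusMap) (string, error) {
	// PostFilter (shown in the diff) delegates to the package-local preempt().
	result, status := pl.PostFilter(ctx, state, pod, m)
	if !status.IsSuccess() {
		return "", status.AsError()
	}
	return result.NominatedNodeName, nil
}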
@@ -10,7 +10,6 @@ go_library(
 visibility = ["//visibility:public"],
 deps = [
 "//pkg/api/v1/pod:go_default_library",
-"//pkg/features:go_default_library",
 "//pkg/scheduler/apis/config:go_default_library",
 "//pkg/scheduler/framework/runtime:go_default_library",
 "//pkg/scheduler/framework/v1alpha1:go_default_library",
@@ -20,12 +19,8 @@ go_library(
 "//pkg/scheduler/profile:go_default_library",
 "//pkg/scheduler/util:go_default_library",
 "//staging/src/k8s.io/api/core/v1:go_default_library",
-"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
-"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
-"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
 "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
 "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
-"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
 "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
 "//staging/src/k8s.io/client-go/rest:go_default_library",
 "//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library",
@@ -42,21 +37,12 @@ go_test(
 ],
 embed = [":go_default_library"],
 deps = [
-"//pkg/controller/volume/scheduling:go_default_library",
 "//pkg/scheduler/apis/config:go_default_library",
 "//pkg/scheduler/framework/plugins/defaultbinder:go_default_library",
 "//pkg/scheduler/framework/plugins/defaultpodtopologyspread:go_default_library",
-"//pkg/scheduler/framework/plugins/interpodaffinity:go_default_library",
-"//pkg/scheduler/framework/plugins/nodeaffinity:go_default_library",
-"//pkg/scheduler/framework/plugins/nodelabel:go_default_library",
-"//pkg/scheduler/framework/plugins/nodename:go_default_library",
 "//pkg/scheduler/framework/plugins/noderesources:go_default_library",
-"//pkg/scheduler/framework/plugins/nodeunschedulable:go_default_library",
 "//pkg/scheduler/framework/plugins/podtopologyspread:go_default_library",
 "//pkg/scheduler/framework/plugins/queuesort:go_default_library",
-"//pkg/scheduler/framework/plugins/tainttoleration:go_default_library",
-"//pkg/scheduler/framework/plugins/volumerestrictions:go_default_library",
-"//pkg/scheduler/framework/plugins/volumezone:go_default_library",
 "//pkg/scheduler/framework/runtime:go_default_library",
 "//pkg/scheduler/framework/v1alpha1:go_default_library",
 "//pkg/scheduler/framework/v1alpha1/fake:go_default_library",
@@ -66,7 +52,6 @@ go_test(
 "//pkg/scheduler/testing:go_default_library",
 "//pkg/scheduler/util:go_default_library",
 "//staging/src/k8s.io/api/core/v1:go_default_library",
-"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
 "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
 "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
 "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
@@ -75,9 +60,6 @@ go_test(
 "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
 "//staging/src/k8s.io/client-go/informers:go_default_library",
 "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
-"//staging/src/k8s.io/client-go/testing:go_default_library",
-"//staging/src/k8s.io/client-go/tools/events:go_default_library",
-"//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library",
 ],
 )
 
@@ -260,7 +260,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
 client := clientsetfake.NewSimpleClientset()
 informerFactory := informers.NewSharedInformerFactory(client, 0)
 
-extenders := []framework.Extender{}
+var extenders []framework.Extender
 for ii := range test.extenders {
 extenders = append(extenders, &test.extenders[ii])
 }
@@ -268,9 +268,12 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
 for _, name := range test.nodes {
 cache.AddNode(createNode(name))
 }
-queue := internalqueue.NewSchedulingQueue(nil)
 
-fwk, err := st.NewFramework(test.registerPlugins, runtime.WithClientSet(client))
+fwk, err := st.NewFramework(
+test.registerPlugins,
+runtime.WithClientSet(client),
+runtime.WithPodNominator(internalqueue.NewPodNominator()),
+)
 if err != nil {
 t.Fatal(err)
 }
@@ -280,7 +283,6 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
 
 scheduler := NewGenericScheduler(
 cache,
-queue,
 emptySnapshot,
 extenders,
 informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
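The two edits above go together: tests no longer build a scheduling queue for the generic scheduler; instead they hand a PodNominator to the framework, and NewGenericScheduler drops the queue argument. A condensed sketch of the updated wiring, assembled from the hunks above (test variables and error handling are assumed from the surrounding test):

// Nominator now travels with the framework, not with the generic scheduler.
fwk, err := st.NewFramework(
	test.registerPlugins,
	runtime.WithClientSet(client),
	runtime.WithPodNominator(internalqueue.NewPodNominator()),
)
if err != nil {
	t.Fatal(err)
}
scheduler := NewGenericScheduler(
	cache,
	emptySnapshot,
	extenders,
	informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
	// ...remaining arguments unchanged
)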
@@ -19,7 +19,6 @@ package core
 import (
 "context"
 "fmt"
-"math"
 "math/rand"
 "sort"
 "strings"
@@ -30,14 +29,9 @@ import (
 "k8s.io/klog/v2"
 
 v1 "k8s.io/api/core/v1"
-policy "k8s.io/api/policy/v1beta1"
-metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-"k8s.io/apimachinery/pkg/labels"
-utilfeature "k8s.io/apiserver/pkg/util/feature"
 corelisters "k8s.io/client-go/listers/core/v1"
 extenderv1 "k8s.io/kube-scheduler/extender/v1"
 podutil "k8s.io/kubernetes/pkg/api/v1/pod"
-kubefeatures "k8s.io/kubernetes/pkg/features"
 "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
@@ -120,7 +114,6 @@ type ScheduleResult struct {
 
 type genericScheduler struct {
 cache internalcache.Cache
-podNominator framework.PodNominator
 extenders []framework.Extender
 nodeInfoSnapshot *internalcache.Snapshot
 pvcLister corelisters.PersistentVolumeClaimLister
@@ -233,169 +226,6 @@ func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (st
 return selected, nil
 }
 
-// Preempt finds nodes with pods that can be preempted to make room for "pod" to
-// schedule. It chooses one of the nodes and preempts the pods on the node and
-// returns 1) the node, 2) the list of preempted pods if such a node is found,
-// 3) A list of pods whose nominated node name should be cleared, and 4) any
-// possible error.
-// Preempt does not update its snapshot. It uses the same snapshot used in the
-// scheduling cycle. This is to avoid a scenario where preempt finds feasible
-// nodes without preempting any pod. When there are many pending pods in the
-// scheduling queue a nominated pod will go back to the queue and behind
-// other pods with the same priority. The nominated pod prevents other pods from
-// using the nominated resources and the nominated pod could take a long time
-// before it is retried after many other pending pods.
-func Preempt(ctx context.Context, fh framework.FrameworkHandle, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, error) {
-cs := fh.ClientSet()
-// TODO(Huang-Wei): get pod from informer cache instead of API server.
-pod, err := util.GetUpdatedPod(cs, pod)
-if err != nil {
-klog.Errorf("Error getting the updated preemptor pod object: %v", err)
-return "", err
-}
-
-if !podEligibleToPreemptOthers(pod, fh.SnapshotSharedLister().NodeInfos()) {
-klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
-return "", nil
-}
-allNodes, err := fh.SnapshotSharedLister().NodeInfos().List()
-if err != nil {
-return "", err
-}
-if len(allNodes) == 0 {
-return "", ErrNoNodesAvailable
-}
-potentialNodes := nodesWherePreemptionMightHelp(allNodes, m)
-if len(potentialNodes) == 0 {
-klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
-// In this case, we should clean-up any existing nominated node name of the pod.
-if err := util.ClearNominatedNodeName(cs, pod); err != nil {
-klog.Errorf("Cannot clear 'NominatedNodeName' field of pod %v/%v: %v", pod.Namespace, pod.Name, err)
-// We do not return as this error is not critical.
-}
-return "", nil
-}
-if klog.V(5).Enabled() {
-var sample []string
-for i := 0; i < 10 && i < len(potentialNodes); i++ {
-sample = append(sample, potentialNodes[i].Node().Name)
-}
-klog.Infof("%v potential nodes for preemption, first %v are: %v", len(potentialNodes), len(sample), sample)
-}
-pdbs, err := getPodDisruptionBudgets(fh)
-if err != nil {
-return "", err
-}
-nodeNameToVictims, err := selectNodesForPreemption(ctx, fh.PreemptHandle(), fh.PreemptHandle(), state, pod, potentialNodes, pdbs)
-if err != nil {
-return "", err
-}
-
-// We will only check nodeNameToVictims with extenders that support preemption.
-// Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
-// node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
-nodeNameToVictims, err = processPreemptionWithExtenders(fh, pod, nodeNameToVictims)
-if err != nil {
-return "", err
-}
-
-candidateNode := pickOneNodeForPreemption(nodeNameToVictims)
-if len(candidateNode) == 0 {
-return "", nil
-}
-
-victims := nodeNameToVictims[candidateNode].Pods
-for _, victim := range victims {
-if err := util.DeletePod(cs, victim); err != nil {
-klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
-return "", err
-}
-// If the victim is a WaitingPod, send a reject message to the PermitPlugin
-if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
-waitingPod.Reject("preempted")
-}
-fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", pod.Namespace, pod.Name, candidateNode)
-}
-metrics.PreemptionVictims.Observe(float64(len(victims)))
-
-// Lower priority pods nominated to run on this node, may no longer fit on
-// this node. So, we should remove their nomination. Removing their
-// nomination updates these pods and moves them to the active queue. It
-// lets scheduler find another place for them.
-nominatedPods := getLowerPriorityNominatedPods(fh.PreemptHandle(), pod, candidateNode)
-if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil {
-klog.Errorf("Cannot clear 'NominatedNodeName' field: %v", err)
-// We do not return as this error is not critical.
-}
-
-return candidateNode, nil
-}
-
-func getPodDisruptionBudgets(fh framework.FrameworkHandle) ([]*policy.PodDisruptionBudget, error) {
-if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodDisruptionBudget) {
-return fh.SharedInformerFactory().Policy().V1beta1().PodDisruptionBudgets().Lister().List(labels.Everything())
-}
-return nil, nil
-}
-
-// processPreemptionWithExtenders processes preemption with extenders
-func processPreemptionWithExtenders(fh framework.FrameworkHandle, pod *v1.Pod, nodeNameToVictims map[string]*extenderv1.Victims) (map[string]*extenderv1.Victims, error) {
-if len(nodeNameToVictims) > 0 {
-for _, extender := range fh.PreemptHandle().Extenders() {
-if extender.SupportsPreemption() && extender.IsInterested(pod) {
-newNodeNameToVictims, err := extender.ProcessPreemption(
-pod,
-nodeNameToVictims,
-fh.SnapshotSharedLister().NodeInfos(),
-)
-if err != nil {
-if extender.IsIgnorable() {
-klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
-extender, err)
-continue
-}
-return nil, err
-}
-
-// Replace nodeNameToVictims with new result after preemption. So the
-// rest of extenders can continue use it as parameter.
-nodeNameToVictims = newNodeNameToVictims
-
-// If node list becomes empty, no preemption can happen regardless of other extenders.
-if len(nodeNameToVictims) == 0 {
-break
-}
-}
-}
-}
-
-return nodeNameToVictims, nil
-}
-
-// getLowerPriorityNominatedPods returns pods whose priority is smaller than the
-// priority of the given "pod" and are nominated to run on the given node.
-// Note: We could possibly check if the nominated lower priority pods still fit
-// and return those that no longer fit, but that would require lots of
-// manipulation of NodeInfo and PreFilter state per nominated pod. It may not be
-// worth the complexity, especially because we generally expect to have a very
-// small number of nominated pods per node.
-func getLowerPriorityNominatedPods(pn framework.PodNominator, pod *v1.Pod, nodeName string) []*v1.Pod {
-pods := pn.NominatedPodsForNode(nodeName)
-
-if len(pods) == 0 {
-return nil
-}
-
-var lowerPriorityPods []*v1.Pod
-podPriority := podutil.GetPodPriority(pod)
-for _, p := range pods {
-if podutil.GetPodPriority(p) < podPriority {
-lowerPriorityPods = append(lowerPriorityPods, p)
-}
-}
-return lowerPriorityPods
-}
-
 // numFeasibleNodesToFind returns the number of feasible nodes that once found, the scheduler stops
 // its search for more feasible nodes.
 func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {
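The block removed above reappears, lightly adapted, in the defaultpreemption plugin later in this diff. Callers that previously invoked core.Preempt directly would now go through the framework's PostFilter plugins; a hedged sketch, assuming the framework interface of this release exposes RunPostFilterPlugins and that fwk, state, pod, and filteredNodeStatusMap come from the surrounding scheduling cycle:

// Sketch: scheduler-side invocation that replaces a direct core.Preempt call.
result, status := fwk.RunPostFilterPlugins(ctx, state, pod, filteredNodeStatusMap)
if status.IsSuccess() && result != nil {
	klog.V(3).Infof("pod %s/%s nominated to run on node %s",
		pod.Namespace, pod.Name, result.NominatedNodeName)
}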
@@ -472,7 +302,7 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p
 // We check the nodes starting from where we left off in the previous scheduling cycle,
 // this is to make sure all nodes have the same chance of being examined across pods.
 nodeInfo := allNodes[(g.nextStartNodeIndex+i)%len(allNodes)]
-fits, status, err := podPassesFiltersOnNode(ctx, prof, g.podNominator, state, pod, nodeInfo)
+fits, status, err := PodPassesFiltersOnNode(ctx, prof.PreemptHandle(), state, pod, nodeInfo)
 if err != nil {
 errCh.SendErrorWithCancel(err, cancel)
 return
@@ -550,12 +380,12 @@ func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, filtered []*v
 // addNominatedPods adds pods with equal or greater priority which are nominated
 // to run on the node. It returns 1) whether any pod was added, 2) augmented cycleState,
 // 3) augmented nodeInfo.
-func addNominatedPods(ctx context.Context, pr framework.PluginsRunner, nominator framework.PodNominator, pod *v1.Pod, state *framework.CycleState, nodeInfo *framework.NodeInfo) (bool, *framework.CycleState, *framework.NodeInfo, error) {
+func addNominatedPods(ctx context.Context, ph framework.PreemptHandle, pod *v1.Pod, state *framework.CycleState, nodeInfo *framework.NodeInfo) (bool, *framework.CycleState, *framework.NodeInfo, error) {
-if nominator == nil || nodeInfo == nil || nodeInfo.Node() == nil {
+if ph == nil || nodeInfo == nil || nodeInfo.Node() == nil {
 // This may happen only in tests.
 return false, state, nodeInfo, nil
 }
-nominatedPods := nominator.NominatedPodsForNode(nodeInfo.Node().Name)
+nominatedPods := ph.NominatedPodsForNode(nodeInfo.Node().Name)
 if len(nominatedPods) == 0 {
 return false, state, nodeInfo, nil
 }
@@ -565,7 +395,7 @@ func addNominatedPods(ctx context.Context, pr framework.PluginsRunner, nominator
 for _, p := range nominatedPods {
 if podutil.GetPodPriority(p) >= podutil.GetPodPriority(pod) && p.UID != pod.UID {
 nodeInfoOut.AddPod(p)
-status := pr.RunPreFilterExtensionAddPod(ctx, stateOut, pod, p, nodeInfoOut)
+status := ph.RunPreFilterExtensionAddPod(ctx, stateOut, pod, p, nodeInfoOut)
 if !status.IsSuccess() {
 return false, state, nodeInfo, status.AsError()
 }
@@ -575,7 +405,7 @@ func addNominatedPods(ctx context.Context, pr framework.PluginsRunner, nominator
 return podsAdded, stateOut, nodeInfoOut, nil
 }
 
-// podPassesFiltersOnNode checks whether a node given by NodeInfo satisfies the
+// PodPassesFiltersOnNode checks whether a node given by NodeInfo satisfies the
 // filter plugins.
 // This function is called from two different places: Schedule and Preempt.
 // When it is called from Schedule, we want to test whether the pod is
@@ -585,10 +415,10 @@ func addNominatedPods(ctx context.Context, pr framework.PluginsRunner, nominator
 // and add the nominated pods. Removal of the victims is done by
 // SelectVictimsOnNode(). Preempt removes victims from PreFilter state and
 // NodeInfo before calling this function.
-func podPassesFiltersOnNode(
+// TODO: move this out so that plugins don't need to depend on <core> pkg.
+func PodPassesFiltersOnNode(
 ctx context.Context,
-pr framework.PluginsRunner,
-nominator framework.PodNominator,
+ph framework.PreemptHandle,
 state *framework.CycleState,
 pod *v1.Pod,
 info *framework.NodeInfo,
@@ -619,7 +449,7 @@ func podPassesFiltersOnNode(
 nodeInfoToUse := info
 if i == 0 {
 var err error
-podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, pr, nominator, pod, state, info)
+podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, ph, pod, state, info)
 if err != nil {
 return false, nil, err
 }
@@ -627,7 +457,7 @@ func podPassesFiltersOnNode(
 break
 }
 
-statusMap := pr.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
+statusMap := ph.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
 status = statusMap.Merge()
 if !status.IsSuccess() && !status.IsUnschedulable() {
 return false, status, status.AsError()
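podPassesFiltersOnNode is exported above precisely so that code outside this package can reuse it (see the TODO in the hunk). A minimal sketch of an external call, assuming a framework.FrameworkHandle named fh as in the plugin code later in this diff:

// Sketch: reusing the exported helper from outside the core package.
fits, status, err := core.PodPassesFiltersOnNode(ctx, fh.PreemptHandle(), state, pod, nodeInfo)
if err != nil {
	return err
}
if !fits {
	klog.V(5).Infof("pod %s/%s does not fit on node %s: %v",
		pod.Namespace, pod.Name, nodeInfo.Node().Name, status.Message())
}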
@@ -738,377 +568,6 @@ func (g *genericScheduler) prioritizeNodes(
 return result, nil
 }
 
-// pickOneNodeForPreemption chooses one node among the given nodes. It assumes
-// pods in each map entry are ordered by decreasing priority.
-// It picks a node based on the following criteria:
-// 1. A node with minimum number of PDB violations.
-// 2. A node with minimum highest priority victim is picked.
-// 3. Ties are broken by sum of priorities of all victims.
-// 4. If there are still ties, node with the minimum number of victims is picked.
-// 5. If there are still ties, node with the latest start time of all highest priority victims is picked.
-// 6. If there are still ties, the first such node is picked (sort of randomly).
-// The 'minNodes1' and 'minNodes2' are being reused here to save the memory
-// allocation and garbage collection time.
-func pickOneNodeForPreemption(nodesToVictims map[string]*extenderv1.Victims) string {
-if len(nodesToVictims) == 0 {
-return ""
-}
-minNumPDBViolatingPods := int64(math.MaxInt32)
-var minNodes1 []string
-lenNodes1 := 0
-for node, victims := range nodesToVictims {
-if len(victims.Pods) == 0 {
-// We found a node that doesn't need any preemption. Return it!
-// This should happen rarely when one or more pods are terminated between
-// the time that scheduler tries to schedule the pod and the time that
-// preemption logic tries to find nodes for preemption.
-return node
-}
-numPDBViolatingPods := victims.NumPDBViolations
-if numPDBViolatingPods < minNumPDBViolatingPods {
-minNumPDBViolatingPods = numPDBViolatingPods
-minNodes1 = nil
-lenNodes1 = 0
-}
-if numPDBViolatingPods == minNumPDBViolatingPods {
-minNodes1 = append(minNodes1, node)
-lenNodes1++
-}
-}
-if lenNodes1 == 1 {
-return minNodes1[0]
-}
-
-// There are more than one node with minimum number PDB violating pods. Find
-// the one with minimum highest priority victim.
-minHighestPriority := int32(math.MaxInt32)
-var minNodes2 = make([]string, lenNodes1)
-lenNodes2 := 0
-for i := 0; i < lenNodes1; i++ {
-node := minNodes1[i]
-victims := nodesToVictims[node]
-// highestPodPriority is the highest priority among the victims on this node.
-highestPodPriority := podutil.GetPodPriority(victims.Pods[0])
-if highestPodPriority < minHighestPriority {
-minHighestPriority = highestPodPriority
-lenNodes2 = 0
-}
-if highestPodPriority == minHighestPriority {
-minNodes2[lenNodes2] = node
-lenNodes2++
-}
-}
-if lenNodes2 == 1 {
-return minNodes2[0]
-}
-
-// There are a few nodes with minimum highest priority victim. Find the
-// smallest sum of priorities.
-minSumPriorities := int64(math.MaxInt64)
-lenNodes1 = 0
-for i := 0; i < lenNodes2; i++ {
-var sumPriorities int64
-node := minNodes2[i]
-for _, pod := range nodesToVictims[node].Pods {
-// We add MaxInt32+1 to all priorities to make all of them >= 0. This is
-// needed so that a node with a few pods with negative priority is not
-// picked over a node with a smaller number of pods with the same negative
-// priority (and similar scenarios).
-sumPriorities += int64(podutil.GetPodPriority(pod)) + int64(math.MaxInt32+1)
-}
-if sumPriorities < minSumPriorities {
-minSumPriorities = sumPriorities
-lenNodes1 = 0
-}
-if sumPriorities == minSumPriorities {
-minNodes1[lenNodes1] = node
-lenNodes1++
-}
-}
-if lenNodes1 == 1 {
-return minNodes1[0]
-}
-
-// There are a few nodes with minimum highest priority victim and sum of priorities.
-// Find one with the minimum number of pods.
-minNumPods := math.MaxInt32
-lenNodes2 = 0
-for i := 0; i < lenNodes1; i++ {
-node := minNodes1[i]
-numPods := len(nodesToVictims[node].Pods)
-if numPods < minNumPods {
-minNumPods = numPods
-lenNodes2 = 0
-}
-if numPods == minNumPods {
-minNodes2[lenNodes2] = node
-lenNodes2++
-}
-}
-if lenNodes2 == 1 {
-return minNodes2[0]
-}
-
-// There are a few nodes with same number of pods.
-// Find the node that satisfies latest(earliestStartTime(all highest-priority pods on node))
-latestStartTime := util.GetEarliestPodStartTime(nodesToVictims[minNodes2[0]])
-if latestStartTime == nil {
-// If the earliest start time of all pods on the 1st node is nil, just return it,
-// which is not expected to happen.
-klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", minNodes2[0])
-return minNodes2[0]
-}
-nodeToReturn := minNodes2[0]
-for i := 1; i < lenNodes2; i++ {
-node := minNodes2[i]
-// Get earliest start time of all pods on the current node.
-earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node])
-if earliestStartTimeOnNode == nil {
-klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", node)
-continue
-}
-if earliestStartTimeOnNode.After(latestStartTime.Time) {
-latestStartTime = earliestStartTimeOnNode
-nodeToReturn = node
-}
-}
-
-return nodeToReturn
-}
-
-// selectNodesForPreemption finds all the nodes with possible victims for
-// preemption in parallel.
-func selectNodesForPreemption(
-ctx context.Context,
-pr framework.PluginsRunner,
-nominator framework.PodNominator,
-state *framework.CycleState,
-pod *v1.Pod,
-potentialNodes []*framework.NodeInfo,
-pdbs []*policy.PodDisruptionBudget,
-) (map[string]*extenderv1.Victims, error) {
-nodeNameToVictims := map[string]*extenderv1.Victims{}
-var resultLock sync.Mutex
-
-checkNode := func(i int) {
-nodeInfoCopy := potentialNodes[i].Clone()
-stateCopy := state.Clone()
-pods, numPDBViolations, fits := selectVictimsOnNode(ctx, pr, nominator, stateCopy, pod, nodeInfoCopy, pdbs)
-if fits {
-resultLock.Lock()
-victims := extenderv1.Victims{
-Pods: pods,
-NumPDBViolations: int64(numPDBViolations),
-}
-nodeNameToVictims[potentialNodes[i].Node().Name] = &victims
-resultLock.Unlock()
-}
-}
-parallelize.Until(ctx, len(potentialNodes), checkNode)
-return nodeNameToVictims, nil
-}
-
-// filterPodsWithPDBViolation groups the given "pods" into two groups of "violatingPods"
-// and "nonViolatingPods" based on whether their PDBs will be violated if they are
-// preempted.
-// This function is stable and does not change the order of received pods. So, if it
-// receives a sorted list, grouping will preserve the order of the input list.
-func filterPodsWithPDBViolation(pods []*v1.Pod, pdbs []*policy.PodDisruptionBudget) (violatingPods, nonViolatingPods []*v1.Pod) {
-pdbsAllowed := make([]int32, len(pdbs))
-for i, pdb := range pdbs {
-pdbsAllowed[i] = pdb.Status.DisruptionsAllowed
-}
-
-for _, obj := range pods {
-pod := obj
-pdbForPodIsViolated := false
-// A pod with no labels will not match any PDB. So, no need to check.
-if len(pod.Labels) != 0 {
-for i, pdb := range pdbs {
-if pdb.Namespace != pod.Namespace {
-continue
-}
-selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
-if err != nil {
-continue
-}
-// A PDB with a nil or empty selector matches nothing.
-if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
-continue
-}
-
-// Existing in DisruptedPods means it has been processed in API server,
-// we don't treat it as a violating case.
-if _, exist := pdb.Status.DisruptedPods[pod.Name]; exist {
-continue
-}
-// Only decrement the matched pdb when it's not in its <DisruptedPods>;
-// otherwise we may over-decrement the budget number.
-pdbsAllowed[i]--
-// We have found a matching PDB.
-if pdbsAllowed[i] < 0 {
-pdbForPodIsViolated = true
-}
-}
-}
-if pdbForPodIsViolated {
-violatingPods = append(violatingPods, pod)
-} else {
-nonViolatingPods = append(nonViolatingPods, pod)
-}
-}
-return violatingPods, nonViolatingPods
-}
-
-// selectVictimsOnNode finds minimum set of pods on the given node that should
-// be preempted in order to make enough room for "pod" to be scheduled. The
-// minimum set selected is subject to the constraint that a higher-priority pod
-// is never preempted when a lower-priority pod could be (higher/lower relative
-// to one another, not relative to the preemptor "pod").
-// The algorithm first checks if the pod can be scheduled on the node when all the
-// lower priority pods are gone. If so, it sorts all the lower priority pods by
-// their priority and then puts them into two groups of those whose PodDisruptionBudget
-// will be violated if preempted and other non-violating pods. Both groups are
-// sorted by priority. It first tries to reprieve as many PDB violating pods as
-// possible and then does them same for non-PDB-violating pods while checking
-// that the "pod" can still fit on the node.
-// NOTE: This function assumes that it is never called if "pod" cannot be scheduled
-// due to pod affinity, node affinity, or node anti-affinity reasons. None of
-// these predicates can be satisfied by removing more pods from the node.
-func selectVictimsOnNode(
-ctx context.Context,
-pr framework.PluginsRunner,
-nominator framework.PodNominator,
-state *framework.CycleState,
-pod *v1.Pod,
-nodeInfo *framework.NodeInfo,
-pdbs []*policy.PodDisruptionBudget,
-) ([]*v1.Pod, int, bool) {
-var potentialVictims []*v1.Pod
-
-removePod := func(rp *v1.Pod) error {
-if err := nodeInfo.RemovePod(rp); err != nil {
-return err
-}
-status := pr.RunPreFilterExtensionRemovePod(ctx, state, pod, rp, nodeInfo)
-if !status.IsSuccess() {
-return status.AsError()
-}
-return nil
-}
-addPod := func(ap *v1.Pod) error {
-nodeInfo.AddPod(ap)
-status := pr.RunPreFilterExtensionAddPod(ctx, state, pod, ap, nodeInfo)
-if !status.IsSuccess() {
-return status.AsError()
-}
-return nil
-}
-// As the first step, remove all the lower priority pods from the node and
-// check if the given pod can be scheduled.
-podPriority := podutil.GetPodPriority(pod)
-for _, p := range nodeInfo.Pods {
-if podutil.GetPodPriority(p.Pod) < podPriority {
-potentialVictims = append(potentialVictims, p.Pod)
-if err := removePod(p.Pod); err != nil {
-return nil, 0, false
-}
-}
-}
-// If the new pod does not fit after removing all the lower priority pods,
-// we are almost done and this node is not suitable for preemption. The only
-// condition that we could check is if the "pod" is failing to schedule due to
-// inter-pod affinity to one or more victims, but we have decided not to
-// support this case for performance reasons. Having affinity to lower
-// priority pods is not a recommended configuration anyway.
-if fits, _, err := podPassesFiltersOnNode(ctx, pr, nominator, state, pod, nodeInfo); !fits {
-if err != nil {
-klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
-}
-
-return nil, 0, false
-}
-var victims []*v1.Pod
-numViolatingVictim := 0
-sort.Slice(potentialVictims, func(i, j int) bool { return util.MoreImportantPod(potentialVictims[i], potentialVictims[j]) })
-// Try to reprieve as many pods as possible. We first try to reprieve the PDB
-// violating victims and then other non-violating ones. In both cases, we start
-// from the highest priority victims.
-violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims, pdbs)
-reprievePod := func(p *v1.Pod) (bool, error) {
-if err := addPod(p); err != nil {
-return false, err
-}
-fits, _, _ := podPassesFiltersOnNode(ctx, pr, nominator, state, pod, nodeInfo)
-if !fits {
-if err := removePod(p); err != nil {
-return false, err
-}
-victims = append(victims, p)
-klog.V(5).Infof("Pod %v/%v is a potential preemption victim on node %v.", p.Namespace, p.Name, nodeInfo.Node().Name)
-}
-return fits, nil
-}
-for _, p := range violatingVictims {
-if fits, err := reprievePod(p); err != nil {
-klog.Warningf("Failed to reprieve pod %q: %v", p.Name, err)
-return nil, 0, false
-} else if !fits {
-numViolatingVictim++
-}
-}
-// Now we try to reprieve non-violating victims.
-for _, p := range nonViolatingVictims {
-if _, err := reprievePod(p); err != nil {
-klog.Warningf("Failed to reprieve pod %q: %v", p.Name, err)
-return nil, 0, false
-}
-}
-return victims, numViolatingVictim, true
-}
-
-// nodesWherePreemptionMightHelp returns a list of nodes with failed predicates
-// that may be satisfied by removing pods from the node.
-func nodesWherePreemptionMightHelp(nodes []*framework.NodeInfo, m framework.NodeToStatusMap) []*framework.NodeInfo {
-var potentialNodes []*framework.NodeInfo
-for _, node := range nodes {
-name := node.Node().Name
-// We reply on the status by each plugin - 'Unschedulable' or 'UnschedulableAndUnresolvable'
-// to determine whether preemption may help or not on the node.
-if m[name].Code() == framework.UnschedulableAndUnresolvable {
-continue
-}
-potentialNodes = append(potentialNodes, node)
-}
-return potentialNodes
-}
-
-// podEligibleToPreemptOthers determines whether this pod should be considered
-// for preempting other pods or not. If this pod has already preempted other
-// pods and those are in their graceful termination period, it shouldn't be
-// considered for preemption.
-// We look at the node that is nominated for this pod and as long as there are
-// terminating pods on the node, we don't consider this for preempting more pods.
-func podEligibleToPreemptOthers(pod *v1.Pod, nodeInfos framework.NodeInfoLister) bool {
-if pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever {
-klog.V(5).Infof("Pod %v/%v is not eligible for preemption because it has a preemptionPolicy of %v", pod.Namespace, pod.Name, v1.PreemptNever)
-return false
-}
-nomNodeName := pod.Status.NominatedNodeName
-if len(nomNodeName) > 0 {
-if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil {
-podPriority := podutil.GetPodPriority(pod)
-for _, p := range nodeInfo.Pods {
-if p.Pod.DeletionTimestamp != nil && podutil.GetPodPriority(p.Pod) < podPriority {
-// There is a terminating pod on the nominated node.
-return false
-}
-}
-}
-}
-return true
-}
-
 // podPassesBasicChecks makes sanity checks on the pod if it can be scheduled.
 func podPassesBasicChecks(pod *v1.Pod, pvcLister corelisters.PersistentVolumeClaimLister) error {
 // Check PVCs used by the pod
@@ -1138,7 +597,6 @@ func podPassesBasicChecks(pod *v1.Pod, pvcLister corelisters.PersistentVolumeCla
 // NewGenericScheduler creates a genericScheduler object.
 func NewGenericScheduler(
 cache internalcache.Cache,
-podNominator framework.PodNominator,
 nodeInfoSnapshot *internalcache.Snapshot,
 extenders []framework.Extender,
 pvcLister corelisters.PersistentVolumeClaimLister,
@@ -1146,7 +604,6 @@ func NewGenericScheduler(
 percentageOfNodesToScore int32) ScheduleAlgorithm {
 return &genericScheduler{
 cache: cache,
-podNominator: podNominator,
 extenders: extenders,
 nodeInfoSnapshot: nodeInfoSnapshot,
 pvcLister: pvcLister,

(File diff suppressed because it is too large.)
@@ -186,7 +186,6 @@ func (c *Configurator) create() (*Scheduler, error) {
 
 algo := core.NewGenericScheduler(
 c.schedulerCache,
-nominator,
 c.nodeInfoSnapshot,
 extenders,
 c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
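With the nominator argument gone, the factory builds the algorithm from the cache, snapshot, extenders, and listers alone. Roughly (argument list as shown in this hunk; the trailing parameters are unchanged and omitted here):

// Sketch of the updated constructor call in the factory.
algo := core.NewGenericScheduler(
	c.schedulerCache,
	c.nodeInfoSnapshot,
	extenders,
	c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
	// ...remaining arguments (e.g. percentageOfNodesToScore) unchanged
)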
@@ -1,4 +1,4 @@
-load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
 
 go_library(
 name = "go_default_library",
@@ -6,13 +6,21 @@ go_library(
 importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpreemption",
 visibility = ["//visibility:public"],
 deps = [
+"//pkg/api/v1/pod:go_default_library",
 "//pkg/features:go_default_library",
 "//pkg/scheduler/core:go_default_library",
 "//pkg/scheduler/framework/v1alpha1:go_default_library",
+"//pkg/scheduler/internal/parallelize:go_default_library",
 "//pkg/scheduler/metrics:go_default_library",
+"//pkg/scheduler/util:go_default_library",
 "//staging/src/k8s.io/api/core/v1:go_default_library",
+"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
+"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
 "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
 "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
+"//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library",
+"//vendor/k8s.io/klog/v2:go_default_library",
 ],
 )
 
@@ -29,3 +37,43 @@ filegroup(
 tags = ["automanaged"],
 visibility = ["//visibility:public"],
 )
+
+go_test(
+name = "go_default_test",
+srcs = ["default_preemption_test.go"],
+embed = [":go_default_library"],
+deps = [
+"//pkg/controller/volume/scheduling:go_default_library",
+"//pkg/scheduler/framework/plugins/defaultbinder:go_default_library",
+"//pkg/scheduler/framework/plugins/interpodaffinity:go_default_library",
+"//pkg/scheduler/framework/plugins/nodeaffinity:go_default_library",
+"//pkg/scheduler/framework/plugins/nodelabel:go_default_library",
+"//pkg/scheduler/framework/plugins/nodename:go_default_library",
+"//pkg/scheduler/framework/plugins/noderesources:go_default_library",
+"//pkg/scheduler/framework/plugins/nodeunschedulable:go_default_library",
+"//pkg/scheduler/framework/plugins/podtopologyspread:go_default_library",
+"//pkg/scheduler/framework/plugins/queuesort:go_default_library",
+"//pkg/scheduler/framework/plugins/tainttoleration:go_default_library",
+"//pkg/scheduler/framework/plugins/volumerestrictions:go_default_library",
+"//pkg/scheduler/framework/plugins/volumezone:go_default_library",
+"//pkg/scheduler/framework/runtime:go_default_library",
+"//pkg/scheduler/framework/v1alpha1:go_default_library",
+"//pkg/scheduler/internal/cache:go_default_library",
+"//pkg/scheduler/internal/queue:go_default_library",
+"//pkg/scheduler/testing:go_default_library",
+"//pkg/scheduler/util:go_default_library",
+"//staging/src/k8s.io/api/core/v1:go_default_library",
+"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
+"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
+"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
+"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
+"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
+"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
+"//staging/src/k8s.io/client-go/informers:go_default_library",
+"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
+"//staging/src/k8s.io/client-go/testing:go_default_library",
+"//staging/src/k8s.io/client-go/tools/events:go_default_library",
+"//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library",
+],
+)
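If the repository's Bazel workflow is used, the new test target defined above can be run on its own, for example:

bazel test //pkg/scheduler/framework/plugins/defaultpreemption:go_default_test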
@@ -18,15 +18,27 @@ package defaultpreemption
 
 import (
 "context"
+"math"
+"sort"
+"sync"
 "time"
 
+"k8s.io/klog/v2"
+
 v1 "k8s.io/api/core/v1"
+policy "k8s.io/api/policy/v1beta1"
+metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+"k8s.io/apimachinery/pkg/labels"
 "k8s.io/apimachinery/pkg/runtime"
 utilfeature "k8s.io/apiserver/pkg/util/feature"
+extenderv1 "k8s.io/kube-scheduler/extender/v1"
+podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 kubefeatures "k8s.io/kubernetes/pkg/features"
 "k8s.io/kubernetes/pkg/scheduler/core"
 framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
+"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
 "k8s.io/kubernetes/pkg/scheduler/metrics"
+"k8s.io/kubernetes/pkg/scheduler/util"
 )
 
 const (
@@ -65,7 +77,7 @@ func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.Cy
 metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
 }()
 
-nnn, err := core.Preempt(ctx, pl.fh, state, pod, m)
+nnn, err := preempt(ctx, pl.fh, state, pod, m)
 if err != nil {
 return nil, framework.NewStatus(framework.Error, err.Error())
 }
@ -74,3 +86,535 @@ func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.Cy
|
|||||||
}
|
}
|
||||||
return &framework.PostFilterResult{NominatedNodeName: nnn}, framework.NewStatus(framework.Success)
|
return &framework.PostFilterResult{NominatedNodeName: nnn}, framework.NewStatus(framework.Success)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// preempt finds nodes with pods that can be preempted to make room for "pod" to
|
||||||
|
// schedule. It chooses one of the nodes and preempts the pods on the node and
|
||||||
|
// returns 1) the node, 2) the list of preempted pods if such a node is found,
|
||||||
|
// 3) A list of pods whose nominated node name should be cleared, and 4) any
|
||||||
|
// possible error.
|
||||||
|
// preempt does not update its snapshot. It uses the same snapshot used in the
|
||||||
|
// scheduling cycle. This is to avoid a scenario where preempt finds feasible
|
||||||
|
// nodes without preempting any pod. When there are many pending pods in the
|
||||||
|
// scheduling queue a nominated pod will go back to the queue and behind
|
||||||
|
// other pods with the same priority. The nominated pod prevents other pods from
|
||||||
|
// using the nominated resources and the nominated pod could take a long time
|
||||||
|
// before it is retried after many other pending pods.
|
||||||
|
func preempt(ctx context.Context, fh framework.FrameworkHandle, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, error) {
|
||||||
|
cs := fh.ClientSet()
|
||||||
|
// TODO(Huang-Wei): get pod from informer cache instead of API server.
|
||||||
|
pod, err := util.GetUpdatedPod(cs, pod)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("Error getting the updated preemptor pod object: %v", err)
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !podEligibleToPreemptOthers(pod, fh.SnapshotSharedLister().NodeInfos()) {
|
||||||
|
		klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
		return "", nil
	}
	allNodes, err := fh.SnapshotSharedLister().NodeInfos().List()
	if err != nil {
		return "", err
	}
	if len(allNodes) == 0 {
		return "", core.ErrNoNodesAvailable
	}
	potentialNodes := nodesWherePreemptionMightHelp(allNodes, m)
	if len(potentialNodes) == 0 {
		klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
		// In this case, we should clean up any existing nominated node name of the pod.
		if err := util.ClearNominatedNodeName(cs, pod); err != nil {
			klog.Errorf("Cannot clear 'NominatedNodeName' field of pod %v/%v: %v", pod.Namespace, pod.Name, err)
			// We do not return as this error is not critical.
		}
		return "", nil
	}
	if klog.V(5).Enabled() {
		var sample []string
		for i := 0; i < 10 && i < len(potentialNodes); i++ {
			sample = append(sample, potentialNodes[i].Node().Name)
		}
		klog.Infof("%v potential nodes for preemption, first %v are: %v", len(potentialNodes), len(sample), sample)
	}
	pdbs, err := getPodDisruptionBudgets(fh)
	if err != nil {
		return "", err
	}
	nodeNameToVictims, err := selectNodesForPreemption(ctx, fh.PreemptHandle(), state, pod, potentialNodes, pdbs)
	if err != nil {
		return "", err
	}

	// We will only check nodeNameToVictims with extenders that support preemption.
	// Extenders which do not support preemption may later prevent the preemptor from being scheduled on the
	// nominated node. In that case, the scheduler will find a different host for the preemptor in subsequent
	// scheduling cycles.
	nodeNameToVictims, err = processPreemptionWithExtenders(fh, pod, nodeNameToVictims)
	if err != nil {
		return "", err
	}

	candidateNode := pickOneNodeForPreemption(nodeNameToVictims)
	if len(candidateNode) == 0 {
		return "", nil
	}

	victims := nodeNameToVictims[candidateNode].Pods
	for _, victim := range victims {
		if err := util.DeletePod(cs, victim); err != nil {
			klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
			return "", err
		}
		// If the victim is a WaitingPod, send a reject message to the PermitPlugin.
		if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
			waitingPod.Reject("preempted")
		}
		fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", pod.Namespace, pod.Name, candidateNode)
	}
	metrics.PreemptionVictims.Observe(float64(len(victims)))

	// Lower-priority pods nominated to run on this node may no longer fit on it,
	// so we remove their nomination. Removing the nomination updates these pods
	// and moves them to the active queue, letting the scheduler find another
	// place for them.
	nominatedPods := getLowerPriorityNominatedPods(fh.PreemptHandle(), pod, candidateNode)
	if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil {
		klog.Errorf("Cannot clear 'NominatedNodeName' field: %v", err)
		// We do not return as this error is not critical.
	}

	return candidateNode, nil
}

// podEligibleToPreemptOthers determines whether this pod should be considered
// for preempting other pods or not. If this pod has already preempted other
// pods and those are in their graceful termination period, it shouldn't be
// considered for preemption.
// We look at the node that is nominated for this pod and as long as there are
// terminating pods on the node, we don't consider this for preempting more pods.
func podEligibleToPreemptOthers(pod *v1.Pod, nodeInfos framework.NodeInfoLister) bool {
	if pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever {
		klog.V(5).Infof("Pod %v/%v is not eligible for preemption because it has a preemptionPolicy of %v", pod.Namespace, pod.Name, v1.PreemptNever)
		return false
	}
	nomNodeName := pod.Status.NominatedNodeName
	if len(nomNodeName) > 0 {
		if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil {
			podPriority := podutil.GetPodPriority(pod)
			for _, p := range nodeInfo.Pods {
				if p.Pod.DeletionTimestamp != nil && podutil.GetPodPriority(p.Pod) < podPriority {
					// There is a terminating pod on the nominated node.
					return false
				}
			}
		}
	}
	return true
}

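To make the eligibility rule above concrete, here is a small self-contained sketch in plain Go. The types are toy stand-ins, not the scheduler's v1.Pod or framework.NodeInfoLister: a pod that opts out with a "Never" preemption policy is skipped, and a pod whose nominated node still hosts a terminating lower-priority victim waits instead of preempting again.

package main

import "fmt"

type fakePod struct {
	name             string
	priority         int32
	preemptionPolicy string // "Never" or "PreemptLowerPriority"
	nominatedNode    string
	terminating      bool
}

// eligibleToPreempt mirrors the shape of podEligibleToPreemptOthers on toy data.
func eligibleToPreempt(p fakePod, podsOnNominatedNode []fakePod) bool {
	if p.preemptionPolicy == "Never" {
		return false
	}
	if p.nominatedNode != "" {
		for _, q := range podsOnNominatedNode {
			// A lower-priority victim is still terminating: wait instead of preempting again.
			if q.terminating && q.priority < p.priority {
				return false
			}
		}
	}
	return true
}

func main() {
	preemptor := fakePod{name: "web", priority: 100, nominatedNode: "node-1"}
	victims := []fakePod{{name: "batch", priority: 10, terminating: true}}
	fmt.Println(eligibleToPreempt(preemptor, victims)) // false: a victim is still terminating
}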
// nodesWherePreemptionMightHelp returns a list of nodes with failed predicates
// that may be satisfied by removing pods from the node.
func nodesWherePreemptionMightHelp(nodes []*framework.NodeInfo, m framework.NodeToStatusMap) []*framework.NodeInfo {
	var potentialNodes []*framework.NodeInfo
	for _, node := range nodes {
		name := node.Node().Name
		// We rely on the status reported by each plugin - 'Unschedulable' or 'UnschedulableAndUnresolvable' -
		// to determine whether preemption may help or not on the node.
		if m[name].Code() == framework.UnschedulableAndUnresolvable {
			continue
		}
		potentialNodes = append(potentialNodes, node)
	}
	return potentialNodes
}

// selectNodesForPreemption finds all the nodes with possible victims for
// preemption in parallel.
func selectNodesForPreemption(
	ctx context.Context,
	fh framework.PreemptHandle,
	state *framework.CycleState,
	pod *v1.Pod,
	potentialNodes []*framework.NodeInfo,
	pdbs []*policy.PodDisruptionBudget,
) (map[string]*extenderv1.Victims, error) {
	nodeNameToVictims := map[string]*extenderv1.Victims{}
	var resultLock sync.Mutex

	checkNode := func(i int) {
		nodeInfoCopy := potentialNodes[i].Clone()
		stateCopy := state.Clone()
		pods, numPDBViolations, fits := selectVictimsOnNode(ctx, fh, stateCopy, pod, nodeInfoCopy, pdbs)
		if fits {
			resultLock.Lock()
			victims := extenderv1.Victims{
				Pods:             pods,
				NumPDBViolations: int64(numPDBViolations),
			}
			nodeNameToVictims[potentialNodes[i].Node().Name] = &victims
			resultLock.Unlock()
		}
	}
	parallelize.Until(ctx, len(potentialNodes), checkNode)
	return nodeNameToVictims, nil
}

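The function above fans the per-node victim search out with parallelize.Until and funnels results into a map guarded by a mutex. The following standalone sketch shows the same pattern with plain goroutines and a sync.WaitGroup; the evaluate closure is a made-up stand-in for selectVictimsOnNode, not the scheduler's API.

package main

import (
	"fmt"
	"sync"
)

func main() {
	nodes := []string{"node-1", "node-2", "node-3"}
	results := map[string]int{} // node name -> number of victims, for illustration only
	var mu sync.Mutex
	var wg sync.WaitGroup

	evaluate := func(node string) int { return len(node) % 3 } // toy stand-in for the per-node check

	for _, n := range nodes {
		wg.Add(1)
		go func(node string) {
			defer wg.Done()
			victims := evaluate(node)
			// The shared map is written under the mutex, exactly like nodeNameToVictims above.
			mu.Lock()
			results[node] = victims
			mu.Unlock()
		}(n)
	}
	wg.Wait()
	fmt.Println(results)
}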
// processPreemptionWithExtenders processes preemption with extenders.
func processPreemptionWithExtenders(fh framework.FrameworkHandle, pod *v1.Pod, nodeNameToVictims map[string]*extenderv1.Victims) (map[string]*extenderv1.Victims, error) {
	if len(nodeNameToVictims) > 0 {
		for _, extender := range fh.PreemptHandle().Extenders() {
			if extender.SupportsPreemption() && extender.IsInterested(pod) {
				newNodeNameToVictims, err := extender.ProcessPreemption(
					pod,
					nodeNameToVictims,
					fh.SnapshotSharedLister().NodeInfos(),
				)
				if err != nil {
					if extender.IsIgnorable() {
						klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
							extender, err)
						continue
					}
					return nil, err
				}

				// Replace nodeNameToVictims with the new result after preemption, so that
				// the rest of the extenders can continue to use it as a parameter.
				nodeNameToVictims = newNodeNameToVictims

				// If the node list becomes empty, no preemption can happen regardless of other extenders.
				if len(nodeNameToVictims) == 0 {
					break
				}
			}
		}
	}

	return nodeNameToVictims, nil
}

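The extender loop above tolerates failures from extenders marked ignorable and aborts on any other error. Below is a minimal, self-contained sketch of that policy using a toy extender struct rather than the scheduler's framework.Extender interface; all names are illustrative.

package main

import (
	"errors"
	"fmt"
)

type extender struct {
	name      string
	ignorable bool
	process   func(candidates []string) ([]string, error)
}

// runExtenders mirrors the error-handling policy above: skip ignorable failures,
// abort on anything else, and stop early once no candidate node remains.
func runExtenders(candidates []string, extenders []extender) ([]string, error) {
	for _, ex := range extenders {
		filtered, err := ex.process(candidates)
		if err != nil {
			if ex.ignorable {
				fmt.Printf("skipping extender %s: %v\n", ex.name, err)
				continue
			}
			return nil, err
		}
		candidates = filtered
		if len(candidates) == 0 {
			break // no node left, further extenders cannot help
		}
	}
	return candidates, nil
}

func main() {
	exts := []extender{
		{name: "flaky", ignorable: true, process: func([]string) ([]string, error) { return nil, errors.New("timeout") }},
		{name: "strict", ignorable: false, process: func(c []string) ([]string, error) { return c[:1], nil }},
	}
	out, err := runExtenders([]string{"node-1", "node-2"}, exts)
	fmt.Println(out, err) // [node-1] <nil>
}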
// pickOneNodeForPreemption chooses one node among the given nodes. It assumes
// pods in each map entry are ordered by decreasing priority.
// It picks a node based on the following criteria:
// 1. A node with the minimum number of PDB violations.
// 2. A node with the minimum highest-priority victim is picked.
// 3. Ties are broken by the sum of priorities of all victims.
// 4. If there are still ties, the node with the minimum number of victims is picked.
// 5. If there are still ties, the node with the latest start time of all highest-priority victims is picked.
// 6. If there are still ties, the first such node is picked (sort of randomly).
// The 'minNodes1' and 'minNodes2' slices are reused here to save memory
// allocation and garbage collection time.
func pickOneNodeForPreemption(nodesToVictims map[string]*extenderv1.Victims) string {
	if len(nodesToVictims) == 0 {
		return ""
	}
	minNumPDBViolatingPods := int64(math.MaxInt32)
	var minNodes1 []string
	lenNodes1 := 0
	for node, victims := range nodesToVictims {
		if len(victims.Pods) == 0 {
			// We found a node that doesn't need any preemption. Return it!
			// This should happen rarely when one or more pods are terminated between
			// the time that scheduler tries to schedule the pod and the time that
			// preemption logic tries to find nodes for preemption.
			return node
		}
		numPDBViolatingPods := victims.NumPDBViolations
		if numPDBViolatingPods < minNumPDBViolatingPods {
			minNumPDBViolatingPods = numPDBViolatingPods
			minNodes1 = nil
			lenNodes1 = 0
		}
		if numPDBViolatingPods == minNumPDBViolatingPods {
			minNodes1 = append(minNodes1, node)
			lenNodes1++
		}
	}
	if lenNodes1 == 1 {
		return minNodes1[0]
	}

	// There is more than one node with the minimum number of PDB-violating pods.
	// Find the one with the minimum highest-priority victim.
	minHighestPriority := int32(math.MaxInt32)
	var minNodes2 = make([]string, lenNodes1)
	lenNodes2 := 0
	for i := 0; i < lenNodes1; i++ {
		node := minNodes1[i]
		victims := nodesToVictims[node]
		// highestPodPriority is the highest priority among the victims on this node.
		highestPodPriority := podutil.GetPodPriority(victims.Pods[0])
		if highestPodPriority < minHighestPriority {
			minHighestPriority = highestPodPriority
			lenNodes2 = 0
		}
		if highestPodPriority == minHighestPriority {
			minNodes2[lenNodes2] = node
			lenNodes2++
		}
	}
	if lenNodes2 == 1 {
		return minNodes2[0]
	}

	// There are a few nodes with the minimum highest-priority victim. Find the
	// smallest sum of priorities.
	minSumPriorities := int64(math.MaxInt64)
	lenNodes1 = 0
	for i := 0; i < lenNodes2; i++ {
		var sumPriorities int64
		node := minNodes2[i]
		for _, pod := range nodesToVictims[node].Pods {
			// We add MaxInt32+1 to all priorities to make all of them >= 0. This is
			// needed so that a node with a few pods with negative priority is not
			// picked over a node with a smaller number of pods with the same negative
			// priority (and similar scenarios).
			sumPriorities += int64(podutil.GetPodPriority(pod)) + int64(math.MaxInt32+1)
		}
		if sumPriorities < minSumPriorities {
			minSumPriorities = sumPriorities
			lenNodes1 = 0
		}
		if sumPriorities == minSumPriorities {
			minNodes1[lenNodes1] = node
			lenNodes1++
		}
	}
	if lenNodes1 == 1 {
		return minNodes1[0]
	}

	// There are a few nodes with the minimum highest-priority victim and sum of priorities.
	// Find one with the minimum number of pods.
	minNumPods := math.MaxInt32
	lenNodes2 = 0
	for i := 0; i < lenNodes1; i++ {
		node := minNodes1[i]
		numPods := len(nodesToVictims[node].Pods)
		if numPods < minNumPods {
			minNumPods = numPods
			lenNodes2 = 0
		}
		if numPods == minNumPods {
			minNodes2[lenNodes2] = node
			lenNodes2++
		}
	}
	if lenNodes2 == 1 {
		return minNodes2[0]
	}

	// There are a few nodes with the same number of pods.
	// Find the node that satisfies latest(earliestStartTime(all highest-priority pods on node)).
	latestStartTime := util.GetEarliestPodStartTime(nodesToVictims[minNodes2[0]])
	if latestStartTime == nil {
		// If the earliest start time of all pods on the 1st node is nil, just return it,
		// which is not expected to happen.
		klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", minNodes2[0])
		return minNodes2[0]
	}
	nodeToReturn := minNodes2[0]
	for i := 1; i < lenNodes2; i++ {
		node := minNodes2[i]
		// Get the earliest start time of all pods on the current node.
		earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node])
		if earliestStartTimeOnNode == nil {
			klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", node)
			continue
		}
		if earliestStartTimeOnNode.After(latestStartTime.Time) {
			latestStartTime = earliestStartTimeOnNode
			nodeToReturn = node
		}
	}

	return nodeToReturn
}

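As a worked illustration of the first two tie-breaking rules above, the following toy program (plain structs, not extenderv1.Victims) prefers the node with the fewest PDB violations and, among those, the node whose highest-priority victim has the lowest priority.

package main

import "fmt"

type victims struct {
	pdbViolations   int64
	highestPriority int32 // priority of the highest-priority victim on that node
}

// pickNode applies only criteria 1 and 2 from the comment above, on toy data.
func pickNode(nodes map[string]victims) string {
	best, picked := "", false
	for name, v := range nodes {
		if !picked {
			best, picked = name, true
			continue
		}
		b := nodes[best]
		if v.pdbViolations != b.pdbViolations {
			if v.pdbViolations < b.pdbViolations {
				best = name
			}
			continue
		}
		if v.highestPriority < b.highestPriority {
			best = name
		}
	}
	return best
}

func main() {
	nodes := map[string]victims{
		"node-1": {pdbViolations: 1, highestPriority: 50},
		"node-2": {pdbViolations: 0, highestPriority: 90},
		"node-3": {pdbViolations: 0, highestPriority: 20},
	}
	fmt.Println(pickNode(nodes)) // node-3: no PDB violations and the lowest highest-priority victim
}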
// selectVictimsOnNode finds a minimum set of pods on the given node that should
// be preempted in order to make enough room for "pod" to be scheduled. The
// minimum set selected is subject to the constraint that a higher-priority pod
// is never preempted when a lower-priority pod could be (higher/lower relative
// to one another, not relative to the preemptor "pod").
// The algorithm first checks if the pod can be scheduled on the node when all the
// lower-priority pods are gone. If so, it sorts all the lower-priority pods by
// their priority and then puts them into two groups: those whose PodDisruptionBudget
// will be violated if preempted, and other non-violating pods. Both groups are
// sorted by priority. It first tries to reprieve as many PDB-violating pods as
// possible and then does the same for non-PDB-violating pods, while checking
// that the "pod" can still fit on the node.
// NOTE: This function assumes that it is never called if "pod" cannot be scheduled
// due to pod affinity, node affinity, or node anti-affinity reasons. None of
// these predicates can be satisfied by removing more pods from the node.
func selectVictimsOnNode(
	ctx context.Context,
	ph framework.PreemptHandle,
	state *framework.CycleState,
	pod *v1.Pod,
	nodeInfo *framework.NodeInfo,
	pdbs []*policy.PodDisruptionBudget,
) ([]*v1.Pod, int, bool) {
	var potentialVictims []*v1.Pod

	removePod := func(rp *v1.Pod) error {
		if err := nodeInfo.RemovePod(rp); err != nil {
			return err
		}
		status := ph.RunPreFilterExtensionRemovePod(ctx, state, pod, rp, nodeInfo)
		if !status.IsSuccess() {
			return status.AsError()
		}
		return nil
	}
	addPod := func(ap *v1.Pod) error {
		nodeInfo.AddPod(ap)
		status := ph.RunPreFilterExtensionAddPod(ctx, state, pod, ap, nodeInfo)
		if !status.IsSuccess() {
			return status.AsError()
		}
		return nil
	}
	// As the first step, remove all the lower-priority pods from the node and
	// check if the given pod can be scheduled.
	podPriority := podutil.GetPodPriority(pod)
	for _, p := range nodeInfo.Pods {
		if podutil.GetPodPriority(p.Pod) < podPriority {
			potentialVictims = append(potentialVictims, p.Pod)
			if err := removePod(p.Pod); err != nil {
				return nil, 0, false
			}
		}
	}
	// If the new pod does not fit after removing all the lower-priority pods,
	// we are almost done and this node is not suitable for preemption. The only
	// condition that we could check is if the "pod" is failing to schedule due to
	// inter-pod affinity to one or more victims, but we have decided not to
	// support this case for performance reasons. Having affinity to lower-priority
	// pods is not a recommended configuration anyway.
	if fits, _, err := core.PodPassesFiltersOnNode(ctx, ph, state, pod, nodeInfo); !fits {
		if err != nil {
			klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
		}

		return nil, 0, false
	}
	var victims []*v1.Pod
	numViolatingVictim := 0
	sort.Slice(potentialVictims, func(i, j int) bool { return util.MoreImportantPod(potentialVictims[i], potentialVictims[j]) })
	// Try to reprieve as many pods as possible. We first try to reprieve the PDB-
	// violating victims and then other non-violating ones. In both cases, we start
	// from the highest-priority victims.
	violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims, pdbs)
	reprievePod := func(p *v1.Pod) (bool, error) {
		if err := addPod(p); err != nil {
			return false, err
		}
		fits, _, _ := core.PodPassesFiltersOnNode(ctx, ph, state, pod, nodeInfo)
		if !fits {
			if err := removePod(p); err != nil {
				return false, err
			}
			victims = append(victims, p)
			klog.V(5).Infof("Pod %v/%v is a potential preemption victim on node %v.", p.Namespace, p.Name, nodeInfo.Node().Name)
		}
		return fits, nil
	}
	for _, p := range violatingVictims {
		if fits, err := reprievePod(p); err != nil {
			klog.Warningf("Failed to reprieve pod %q: %v", p.Name, err)
			return nil, 0, false
		} else if !fits {
			numViolatingVictim++
		}
	}
	// Now we try to reprieve non-violating victims.
	for _, p := range nonViolatingVictims {
		if _, err := reprievePod(p); err != nil {
			klog.Warningf("Failed to reprieve pod %q: %v", p.Name, err)
			return nil, 0, false
		}
	}
	return victims, numViolatingVictim, true
}

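The "remove everything, then reprieve" idea above can be pictured with a single resource. This simplified sketch treats CPU millicores as the only constraint and ignores PDBs and filter plugins entirely; the function, pod names, and numbers are all illustrative, not the scheduler's behavior.

package main

import (
	"fmt"
	"sort"
)

type toyPod struct {
	name     string
	priority int32
	cpu      int64 // requested millicores
}

// selectVictims assumes all lower-priority pods are removed, then adds pods back
// (highest priority first) while the preemptor still fits; whatever cannot be
// added back becomes a victim.
func selectVictims(preemptorCPU, nodeCapacity int64, lowerPriority []toyPod) []toyPod {
	if preemptorCPU > nodeCapacity {
		return nil // even an empty node cannot host the preemptor
	}
	sort.Slice(lowerPriority, func(i, j int) bool { return lowerPriority[i].priority > lowerPriority[j].priority })
	used := int64(0)
	var victims []toyPod
	for _, p := range lowerPriority {
		if used+p.cpu+preemptorCPU <= nodeCapacity {
			used += p.cpu // the pod is reprieved and stays
		} else {
			victims = append(victims, p) // must be preempted
		}
	}
	return victims
}

func main() {
	pods := []toyPod{{"a", 10, 500}, {"b", 30, 1000}, {"c", 20, 800}}
	fmt.Println(selectVictims(1200, 2000, pods)) // "c" is reprieved; "b" and "a" become victims
}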
// getLowerPriorityNominatedPods returns pods whose priority is smaller than the
// priority of the given "pod" and are nominated to run on the given node.
// Note: We could possibly check if the nominated lower-priority pods still fit
// and return those that no longer fit, but that would require lots of
// manipulation of NodeInfo and PreFilter state per nominated pod. It may not be
// worth the complexity, especially because we generally expect to have a very
// small number of nominated pods per node.
func getLowerPriorityNominatedPods(pn framework.PodNominator, pod *v1.Pod, nodeName string) []*v1.Pod {
	pods := pn.NominatedPodsForNode(nodeName)

	if len(pods) == 0 {
		return nil
	}

	var lowerPriorityPods []*v1.Pod
	podPriority := podutil.GetPodPriority(pod)
	for _, p := range pods {
		if podutil.GetPodPriority(p) < podPriority {
			lowerPriorityPods = append(lowerPriorityPods, p)
		}
	}
	return lowerPriorityPods
}

// filterPodsWithPDBViolation groups the given "pods" into two groups of "violatingPods"
// and "nonViolatingPods" based on whether their PDBs will be violated if they are
// preempted.
// This function is stable and does not change the order of received pods. So, if it
// receives a sorted list, grouping will preserve the order of the input list.
func filterPodsWithPDBViolation(pods []*v1.Pod, pdbs []*policy.PodDisruptionBudget) (violatingPods, nonViolatingPods []*v1.Pod) {
	pdbsAllowed := make([]int32, len(pdbs))
	for i, pdb := range pdbs {
		pdbsAllowed[i] = pdb.Status.DisruptionsAllowed
	}

	for _, obj := range pods {
		pod := obj
		pdbForPodIsViolated := false
		// A pod with no labels will not match any PDB. So, no need to check.
		if len(pod.Labels) != 0 {
			for i, pdb := range pdbs {
				if pdb.Namespace != pod.Namespace {
					continue
				}
				selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
				if err != nil {
					continue
				}
				// A PDB with a nil or empty selector matches nothing.
				if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
					continue
				}

				// Existing in DisruptedPods means it has been processed in the API server,
				// so we don't treat it as a violating case.
				if _, exist := pdb.Status.DisruptedPods[pod.Name]; exist {
					continue
				}
				// Only decrement the matched pdb when it's not in its <DisruptedPods>;
				// otherwise we may over-decrement the budget number.
				pdbsAllowed[i]--
				// We have found a matching PDB.
				if pdbsAllowed[i] < 0 {
					pdbForPodIsViolated = true
				}
			}
		}
		if pdbForPodIsViolated {
			violatingPods = append(violatingPods, pod)
		} else {
			nonViolatingPods = append(nonViolatingPods, pod)
		}
	}
	return violatingPods, nonViolatingPods
}

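To see the budget accounting above in isolation, here is a toy example. It uses exact-match label selectors instead of metav1.LabelSelector and omits the DisruptedPods handling; the PDB and label names are made up. Each matching budget's allowance is decremented once per candidate victim, and a victim that drives any budget below zero is classified as violating.

package main

import "fmt"

type toyPDB struct {
	name               string
	selector           map[string]string // exact-match label selector, for simplicity
	disruptionsAllowed int32
}

// matches reports whether all selector labels are present on the pod.
func matches(sel, podLabels map[string]string) bool {
	for k, v := range sel {
		if podLabels[k] != v {
			return false
		}
	}
	return len(sel) > 0
}

func main() {
	pdbs := []toyPDB{{name: "web-pdb", selector: map[string]string{"app": "web"}, disruptionsAllowed: 1}}
	victims := []map[string]string{
		{"app": "web"}, // consumes the single allowed disruption
		{"app": "web"}, // pushes the budget below zero -> violating
		{"app": "db"},  // matches no PDB -> non-violating
	}
	for i, podLabels := range victims {
		violating := false
		for j := range pdbs {
			if matches(pdbs[j].selector, podLabels) {
				pdbs[j].disruptionsAllowed--
				if pdbs[j].disruptionsAllowed < 0 {
					violating = true
				}
			}
		}
		fmt.Printf("victim %d violating=%v\n", i, violating)
	}
}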
func getPodDisruptionBudgets(fh framework.FrameworkHandle) ([]*policy.PodDisruptionBudget, error) {
	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodDisruptionBudget) {
		return fh.SharedInformerFactory().Policy().V1beta1().PodDisruptionBudgets().Lister().List(labels.Everything())
	}
	return nil, nil
}

File diff suppressed because it is too large
@@ -767,7 +767,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
 		return true, b, nil
 	})

-	fwk, _ := st.NewFramework(fns, frameworkruntime.WithClientSet(client))
+	fwk, _ := st.NewFramework(fns, frameworkruntime.WithClientSet(client), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()))
 	prof := &profile.Profile{
 		Framework: fwk,
 		Recorder: &events.FakeRecorder{},
@@ -782,7 +782,6 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C

 	algo := core.NewGenericScheduler(
 		scache,
-		internalqueue.NewSchedulingQueue(nil),
 		internalcache.NewEmptySnapshot(),
 		[]framework.Extender{},
 		informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
@@ -1135,7 +1134,6 @@ func TestSchedulerBinding(t *testing.T) {
 	algo := core.NewGenericScheduler(
 		scache,
 		nil,
-		nil,
 		test.extenders,
 		nil,
 		false,