Introduce a defaultpreemption PostFilter plugin

- Add a defaultpreemption PostFilter plugin
- Make g.Preempt() stateless
    - make g.getLowerPriorityNominatedPods() stateless
    - make g.processPreemptionWithExtenders() stateless
Wei Huang 2020-06-17 14:15:14 -07:00
parent 1faf097f3f
commit 196056d7fe
12 changed files with 134 additions and 80 deletions
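
For orientation, the extension point the new plugin implements has roughly this shape (an approximate sketch paraphrased from pkg/scheduler/framework/v1alpha1 as of this commit; not part of the diff below):

// Approximate sketch of the postFilter extension point; the authoritative
// definitions live in pkg/scheduler/framework/v1alpha1. PostFilter plugins
// run only when no node passed the Filter phase for a pod.
type PostFilterPlugin interface {
    Plugin
    // PostFilter tries to make the pod schedulable, e.g. by preempting
    // lower-priority pods; a Success status with a non-empty
    // NominatedNodeName nominates that node for the pod.
    PostFilter(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)
}

// PostFilterResult carries the node nominated for the preemptor.
type PostFilterResult struct {
    NominatedNodeName string
}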

@@ -42,7 +42,6 @@ go_library(
         "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
         "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
-        "//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library",
         "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
         "//vendor/github.com/google/go-cmp/cmp:go_default_library",
         "//vendor/k8s.io/klog/v2:go_default_library",

@@ -10,6 +10,7 @@ go_library(
     visibility = ["//visibility:public"],
     deps = [
         "//pkg/api/v1/pod:go_default_library",
+        "//pkg/features:go_default_library",
         "//pkg/scheduler/apis/config:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//pkg/scheduler/internal/cache:go_default_library",
@@ -23,8 +24,8 @@ go_library(
         "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
+        "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
-        "//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library",
         "//staging/src/k8s.io/client-go/rest:go_default_library",
         "//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library",
         "//vendor/k8s.io/klog/v2:go_default_library",

@@ -283,7 +283,6 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
             emptySnapshot,
             extenders,
             informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
-            informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
             false,
             schedulerapi.DefaultPercentageOfNodesToScore)
         podIgnored := &v1.Pod{}

@@ -33,10 +33,11 @@ import (
     policy "k8s.io/api/policy/v1beta1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/labels"
+    utilfeature "k8s.io/apiserver/pkg/util/feature"
     corelisters "k8s.io/client-go/listers/core/v1"
-    policylisters "k8s.io/client-go/listers/policy/v1beta1"
     extenderv1 "k8s.io/kube-scheduler/extender/v1"
     podutil "k8s.io/kubernetes/pkg/api/v1/pod"
+    kubefeatures "k8s.io/kubernetes/pkg/features"
     framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
     internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
     "k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
@@ -100,10 +101,6 @@ func (f *FitError) Error() string {
 // TODO: Rename this type.
 type ScheduleAlgorithm interface {
     Schedule(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
-    // Preempt receives scheduling filter result (NodeToStatusMap) for a pod and tries to create room for
-    // the pod by preempting lower priority pods if possible.
-    // It returns the node where preemption happened, and error if any.
-    Preempt(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod, framework.NodeToStatusMap) (selectedNode string, err error)
     // Extenders returns a slice of extender config. This is exposed for
     // testing.
     Extenders() []framework.Extender
@@ -126,7 +123,6 @@ type genericScheduler struct {
     extenders                []framework.Extender
     nodeInfoSnapshot         *internalcache.Snapshot
     pvcLister                corelisters.PersistentVolumeClaimLister
-    pdbLister                policylisters.PodDisruptionBudgetLister
     disablePreemption        bool
     percentageOfNodesToScore int32
     nextStartNodeIndex       int
@@ -236,7 +232,7 @@ func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (st
     return selected, nil
 }

-// preempt finds nodes with pods that can be preempted to make room for "pod" to
+// Preempt finds nodes with pods that can be preempted to make room for "pod" to
 // schedule. It chooses one of the nodes and preempts the pods on the node and
 // returns 1) the node, 2) the list of preempted pods if such a node is found,
 // 3) A list of pods whose nominated node name should be cleared, and 4) any
@@ -248,8 +244,8 @@ func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (st
 // other pods with the same priority. The nominated pod prevents other pods from
 // using the nominated resources and the nominated pod could take a long time
 // before it is retried after many other pending pods.
-func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, error) {
-    cs := prof.ClientSet()
+func Preempt(ctx context.Context, fh framework.FrameworkHandle, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, error) {
+    cs := fh.ClientSet()
     // TODO(Huang-Wei): get pod from informer cache instead of API server.
     pod, err := util.GetUpdatedPod(cs, pod)
     if err != nil {
@@ -257,11 +253,11 @@ func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, s
         return "", err
     }

-    if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfos()) {
+    if !podEligibleToPreemptOthers(pod, fh.SnapshotSharedLister().NodeInfos()) {
         klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
         return "", nil
     }
-    allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
+    allNodes, err := fh.SnapshotSharedLister().NodeInfos().List()
     if err != nil {
         return "", err
     }
@@ -285,14 +281,11 @@ func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, s
         }
         klog.Infof("%v potential nodes for preemption, first %v are: %v", len(potentialNodes), len(sample), sample)
     }
-    var pdbs []*policy.PodDisruptionBudget
-    if g.pdbLister != nil {
-        pdbs, err = g.pdbLister.List(labels.Everything())
-        if err != nil {
-            return "", err
-        }
-    }
-    nodeNameToVictims, err := selectNodesForPreemption(ctx, prof, g.podNominator, state, pod, potentialNodes, pdbs)
+    pdbs, err := getPodDisruptionBudgets(fh)
+    if err != nil {
+        return "", err
+    }
+    nodeNameToVictims, err := selectNodesForPreemption(ctx, fh.PreemptHandle(), fh.PreemptHandle(), state, pod, potentialNodes, pdbs)
     if err != nil {
         return "", err
     }
@@ -300,7 +293,7 @@ func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, s
     // We will only check nodeNameToVictims with extenders that support preemption.
     // Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
     // node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
-    nodeNameToVictims, err = g.processPreemptionWithExtenders(pod, nodeNameToVictims)
+    nodeNameToVictims, err = processPreemptionWithExtenders(fh, pod, nodeNameToVictims)
     if err != nil {
         return "", err
     }
@@ -317,10 +310,10 @@ func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, s
             return "", err
         }
         // If the victim is a WaitingPod, send a reject message to the PermitPlugin
-        if waitingPod := prof.GetWaitingPod(victim.UID); waitingPod != nil {
+        if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
             waitingPod.Reject("preempted")
         }
-        prof.Recorder.Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", pod.Namespace, pod.Name, candidateNode)
+        fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", pod.Namespace, pod.Name, candidateNode)
     }

     metrics.PreemptionVictims.Observe(float64(len(victims)))
@@ -328,7 +321,7 @@ func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, s
     // this node. So, we should remove their nomination. Removing their
     // nomination updates these pods and moves them to the active queue. It
     // lets scheduler find another place for them.
-    nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode)
+    nominatedPods := getLowerPriorityNominatedPods(fh.PreemptHandle(), pod, candidateNode)
     if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil {
         klog.Errorf("Cannot clear 'NominatedNodeName' field: %v", err)
         // We do not return as this error is not critical.
@@ -337,18 +330,22 @@ func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, s
     return candidateNode, nil
 }

+func getPodDisruptionBudgets(fh framework.FrameworkHandle) ([]*policy.PodDisruptionBudget, error) {
+    if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodDisruptionBudget) {
+        return fh.SharedInformerFactory().Policy().V1beta1().PodDisruptionBudgets().Lister().List(labels.Everything())
+    }
+    return nil, nil
+}
+
 // processPreemptionWithExtenders processes preemption with extenders
-func (g *genericScheduler) processPreemptionWithExtenders(
-    pod *v1.Pod,
-    nodeNameToVictims map[string]*extenderv1.Victims,
-) (map[string]*extenderv1.Victims, error) {
+func processPreemptionWithExtenders(fh framework.FrameworkHandle, pod *v1.Pod, nodeNameToVictims map[string]*extenderv1.Victims) (map[string]*extenderv1.Victims, error) {
     if len(nodeNameToVictims) > 0 {
-        for _, extender := range g.extenders {
+        for _, extender := range fh.PreemptHandle().Extenders() {
             if extender.SupportsPreemption() && extender.IsInterested(pod) {
                 newNodeNameToVictims, err := extender.ProcessPreemption(
                     pod,
                     nodeNameToVictims,
-                    g.nodeInfoSnapshot.NodeInfos(),
+                    fh.SnapshotSharedLister().NodeInfos(),
                 )
                 if err != nil {
                     if extender.IsIgnorable() {
@@ -381,8 +378,8 @@ func (g *genericScheduler) processPreemptionWithExtenders(
 // manipulation of NodeInfo and PreFilter state per nominated pod. It may not be
 // worth the complexity, especially because we generally expect to have a very
 // small number of nominated pods per node.
-func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName string) []*v1.Pod {
-    pods := g.podNominator.NominatedPodsForNode(nodeName)
+func getLowerPriorityNominatedPods(pn framework.PodNominator, pod *v1.Pod, nodeName string) []*v1.Pod {
+    pods := pn.NominatedPodsForNode(nodeName)

     if len(pods) == 0 {
         return nil
@@ -1141,7 +1138,6 @@ func NewGenericScheduler(
     nodeInfoSnapshot *internalcache.Snapshot,
     extenders []framework.Extender,
     pvcLister corelisters.PersistentVolumeClaimLister,
-    pdbLister policylisters.PodDisruptionBudgetLister,
     disablePreemption bool,
     percentageOfNodesToScore int32) ScheduleAlgorithm {
     return &genericScheduler{
@@ -1150,7 +1146,6 @@ func NewGenericScheduler(
         extenders:                extenders,
         nodeInfoSnapshot:         nodeInfoSnapshot,
         pvcLister:                pvcLister,
-        pdbLister:                pdbLister,
         disablePreemption:        disablePreemption,
         percentageOfNodesToScore: percentageOfNodesToScore,
     }
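
With the genericScheduler receiver and its pdbLister field gone, preemption needs nothing beyond a framework.FrameworkHandle, so any caller holding a running framework can drive it. A minimal sketch of the new call pattern (fh, state, and statusMap are assumed names for this example, not taken from the diff):

// fh is any framework.FrameworkHandle; statusMap is the NodeToStatusMap
// produced by a failed Filter phase.
nodeName, err := core.Preempt(ctx, fh, state, pod, statusMap)
if err != nil {
    return err // a plugin would instead wrap this in an Error status
}
if nodeName != "" {
    // Preemption made room; nodeName can be nominated for the preemptor.
}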

@@ -693,9 +693,6 @@ func TestGenericScheduler(t *testing.T) {
     }
     for _, test := range tests {
         t.Run(test.name, func(t *testing.T) {
-            client := clientsetfake.NewSimpleClientset()
-            informerFactory := informers.NewSharedInformerFactory(client, 0)
-
             cache := internalcache.New(time.Duration(0), wait.NeverStop)
             for _, pod := range test.pods {
                 cache.AddPod(pod)
@@ -724,7 +721,6 @@ func TestGenericScheduler(t *testing.T) {
                 snapshot,
                 []framework.Extender{},
                 pvcLister,
-                informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
                 false,
                 schedulerapi.DefaultPercentageOfNodesToScore)
             result, err := scheduler.Schedule(context.Background(), prof, framework.NewCycleState(), test.pod)
@@ -752,7 +748,7 @@ func makeScheduler(nodes []*v1.Node) *genericScheduler {
         cache,
         internalqueue.NewSchedulingQueue(nil),
         emptySnapshot,
-        nil, nil, nil, false,
+        nil, nil, false,
         schedulerapi.DefaultPercentageOfNodesToScore)
     cache.UpdateSnapshot(s.(*genericScheduler).nodeInfoSnapshot)
     return s.(*genericScheduler)
@@ -1045,7 +1041,6 @@ func TestZeroRequest(t *testing.T) {
         emptySnapshot,
         []framework.Extender{},
         nil,
-        nil,
         false,
         schedulerapi.DefaultPercentageOfNodesToScore).(*genericScheduler)
     scheduler.nodeInfoSnapshot = snapshot
@@ -1508,9 +1503,6 @@ func TestSelectNodesForPreemption(t *testing.T) {
     labelKeys := []string{"hostname", "zone", "region"}
     for _, test := range tests {
         t.Run(test.name, func(t *testing.T) {
-            client := clientsetfake.NewSimpleClientset()
-            informerFactory := informers.NewSharedInformerFactory(client, 0)
-
             filterFailedNodeReturnCodeMap := map[string]framework.Code{}
             cache := internalcache.New(time.Duration(0), wait.NeverStop)
             for _, pod := range test.pods {
@@ -1558,7 +1550,6 @@ func TestSelectNodesForPreemption(t *testing.T) {
                 snapshot,
                 []framework.Extender{},
                 nil,
-                informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
                 false,
                 schedulerapi.DefaultPercentageOfNodesToScore)
             g := scheduler.(*genericScheduler)
@@ -2306,7 +2297,6 @@ func TestPreempt(t *testing.T) {
             deletedPodNames.Insert(action.(clienttesting.DeleteAction).GetName())
             return true, nil, nil
         })
-        informerFactory := informers.NewSharedInformerFactory(client, 0)

         stop := make(chan struct{})
         cache := internalcache.New(time.Duration(0), stop)
@@ -2344,29 +2334,21 @@ func TestPreempt(t *testing.T) {
             extenders = append(extenders, extender)
         }

+        podNominator := internalqueue.NewPodNominator()
         snapshot := internalcache.NewSnapshot(test.pods, nodes)
         fwk, err := st.NewFramework(
             test.registerPlugins,
             framework.WithClientSet(client),
+            framework.WithEventRecorder(&events.FakeRecorder{}),
+            framework.WithExtenders(extenders),
+            framework.WithPodNominator(podNominator),
             framework.WithSnapshotSharedLister(snapshot),
+            framework.WithInformerFactory(informers.NewSharedInformerFactory(client, 0)),
         )
         if err != nil {
             t.Fatal(err)
         }
-        prof := &profile.Profile{
-            Framework: fwk,
-            Recorder:  &events.FakeRecorder{},
-        }
-        scheduler := NewGenericScheduler(
-            cache,
-            internalqueue.NewSchedulingQueue(nil),
-            snapshot,
-            extenders,
-            informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
-            informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
-            false,
-            schedulerapi.DefaultPercentageOfNodesToScore)

         state := framework.NewCycleState()
         // Some tests rely on PreFilter plugin to compute its CycleState.
         preFilterStatus := fwk.RunPreFilterPlugins(context.Background(), state, test.pod)
@@ -2378,7 +2360,7 @@ func TestPreempt(t *testing.T) {
         if test.failedNodeToStatusMap != nil {
             failedNodeToStatusMap = test.failedNodeToStatusMap
         }
-        node, err := scheduler.Preempt(context.Background(), prof, state, test.pod, failedNodeToStatusMap)
+        node, err := Preempt(context.Background(), fwk, state, test.pod, failedNodeToStatusMap)
         if err != nil {
             t.Errorf("unexpected error in preemption: %v", err)
         }
@@ -2416,7 +2398,7 @@ func TestPreempt(t *testing.T) {
         }

         // Call preempt again and make sure it doesn't preempt any more pods.
-        node, err = scheduler.Preempt(context.Background(), prof, state, test.pod, failedNodeToStatusMap)
+        node, err = Preempt(context.Background(), fwk, state, test.pod, failedNodeToStatusMap)
         if err != nil {
             t.Errorf("unexpected error in preemption: %v", err)
         }

@@ -30,6 +30,7 @@ import (
     coreinformers "k8s.io/client-go/informers/core/v1"
     "k8s.io/client-go/tools/cache"
     "k8s.io/kubernetes/pkg/features"
+    kubefeatures "k8s.io/kubernetes/pkg/features"
     "k8s.io/kubernetes/pkg/scheduler/internal/queue"
     "k8s.io/kubernetes/pkg/scheduler/profile"
 )
@@ -466,6 +467,11 @@ func addAllEventHandlers(
             AddFunc: sched.onStorageClassAdd,
         },
     )
+
+    // TODO(Huang-Wei): remove this hack when defaultpreemption plugin is enabled.
+    if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodDisruptionBudget) {
+        informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister()
+    }
 }

 func nodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) string {
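
The seemingly dead Lister() call above is what makes the hack work: with stock client-go, the first access to an informer registers it with the shared factory, so a later factory.Start() launches it even though the returned lister is discarded. A sketch of that behavior (client and stopCh are placeholder names):

factory := informers.NewSharedInformerFactory(client, 0)
// Registers the PodDisruptionBudget informer with the factory as a side
// effect; the lister itself is thrown away.
_ = factory.Policy().V1beta1().PodDisruptionBudgets().Lister()
// Start now launches the PDB informer too, so getPodDisruptionBudgets in
// generic_scheduler.go reads from a populated cache.
factory.Start(stopCh)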

@@ -30,15 +30,12 @@ import (
     "k8s.io/apimachinery/pkg/fields"
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/util/sets"
-    utilfeature "k8s.io/apiserver/pkg/util/feature"
     "k8s.io/client-go/informers"
     coreinformers "k8s.io/client-go/informers/core/v1"
     clientset "k8s.io/client-go/kubernetes"
     corelisters "k8s.io/client-go/listers/core/v1"
-    policylisters "k8s.io/client-go/listers/policy/v1beta1"
     "k8s.io/client-go/tools/cache"
     "k8s.io/klog/v2"
-    kubefeatures "k8s.io/kubernetes/pkg/features"
     "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
     schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
     "k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
@@ -192,7 +189,6 @@ func (c *Configurator) create() (*Scheduler, error) {
         c.nodeInfoSnapshot,
         extenders,
         c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
-        GetPodDisruptionBudgetLister(c.informerFactory),
         c.disablePreemption,
         c.percentageOfNodesToScore,
     )
@@ -478,11 +474,3 @@ func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodL
         }
     }
 }
-
-// GetPodDisruptionBudgetLister returns pdb lister from the given informer factory. Returns nil if PodDisruptionBudget feature is disabled.
-func GetPodDisruptionBudgetLister(informerFactory informers.SharedInformerFactory) policylisters.PodDisruptionBudgetLister {
-    if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodDisruptionBudget) {
-        return informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister()
-    }
-    return nil
-}

@@ -48,6 +48,7 @@ filegroup(
         ":package-srcs",
         "//pkg/scheduler/framework/plugins/defaultbinder:all-srcs",
         "//pkg/scheduler/framework/plugins/defaultpodtopologyspread:all-srcs",
+        "//pkg/scheduler/framework/plugins/defaultpreemption:all-srcs",
         "//pkg/scheduler/framework/plugins/examples:all-srcs",
         "//pkg/scheduler/framework/plugins/helper:all-srcs",
         "//pkg/scheduler/framework/plugins/imagelocality:all-srcs",

@@ -0,0 +1,28 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+    name = "go_default_library",
+    srcs = ["default_preemption.go"],
+    importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpreemption",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//pkg/scheduler/core:go_default_library",
+        "//pkg/scheduler/framework/v1alpha1:go_default_library",
+        "//staging/src/k8s.io/api/core/v1:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
+    ],
+)
+
+filegroup(
+    name = "package-srcs",
+    srcs = glob(["**"]),
+    tags = ["automanaged"],
+    visibility = ["//visibility:private"],
+)
+
+filegroup(
+    name = "all-srcs",
+    srcs = [":package-srcs"],
+    tags = ["automanaged"],
+    visibility = ["//visibility:public"],
+)

@@ -0,0 +1,61 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package defaultpreemption
+
+import (
+    "context"
+
+    v1 "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/runtime"
+    "k8s.io/kubernetes/pkg/scheduler/core"
+    framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
+)
+
+const (
+    // Name of the plugin used in the plugin registry and configurations.
+    Name = "DefaultPreemption"
+)
+
+// DefaultPreemption is a PostFilter plugin implements the preemption logic.
+type DefaultPreemption struct {
+    fh framework.FrameworkHandle
+}
+
+var _ framework.PostFilterPlugin = &DefaultPreemption{}
+
+// Name returns name of the plugin. It is used in logs, etc.
+func (pl *DefaultPreemption) Name() string {
+    return Name
+}
+
+// New initializes a new plugin and returns it.
+func New(_ runtime.Object, fh framework.FrameworkHandle) (framework.Plugin, error) {
+    pl := DefaultPreemption{fh}
+    return &pl, nil
+}
+
+// PostFilter invoked at the postFilter extension point.
+func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
+    nnn, err := core.Preempt(ctx, pl.fh, state, pod, m)
+    if err != nil {
+        return nil, framework.NewStatus(framework.Error, err.Error())
+    }
+    if nnn == "" {
+        return nil, framework.NewStatus(framework.Unschedulable)
+    }
+    return &framework.PostFilterResult{NominatedNodeName: nnn}, framework.NewStatus(framework.Success)
+}
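
Note the commit introduces the plugin but does not yet register or enable it anywhere (see the TODO left in scheduler.go below). A hypothetical registry entry, modeled on the framework.PluginFactory map visible in scheduler_test.go, might look like:

// Hypothetical wiring, not part of this commit: a registry entry that would
// let a scheduler profile enable DefaultPreemption at the postFilter point.
registry := map[string]framework.PluginFactory{
    defaultpreemption.Name: defaultpreemption.New,
}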

@@ -486,7 +486,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
         } else {
             preemptionStartTime := time.Now()
             // TODO(Huang-Wei): implement the preemption logic as a PostFilter plugin.
-            nominatedNode, _ = sched.Algorithm.Preempt(schedulingCycleCtx, prof, state, pod, fitError.FilteredNodesStatuses)
+            nominatedNode, _ = core.Preempt(schedulingCycleCtx, prof, state, pod, fitError.FilteredNodesStatuses)
             metrics.PreemptionAttempts.Inc()
             metrics.SchedulingAlgorithmPreemptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime))
             metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))

@@ -120,10 +120,6 @@ func (es mockScheduler) Extenders() []framework.Extender {
     return nil
 }

-func (es mockScheduler) Preempt(ctx context.Context, profile *profile.Profile, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, error) {
-    return "", nil
-}
-
 func TestSchedulerCreation(t *testing.T) {
     invalidRegistry := map[string]framework.PluginFactory{
         defaultbinder.Name: defaultbinder.New,
@@ -787,7 +783,6 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
         internalcache.NewEmptySnapshot(),
         []framework.Extender{},
         informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
-        informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
         false,
         schedulerapi.DefaultPercentageOfNodesToScore,
     )
@@ -1140,7 +1135,6 @@ func TestSchedulerBinding(t *testing.T) {
             nil,
             test.extenders,
             nil,
-            nil,
             false,
             0,
         )