diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/BUILD b/pkg/scheduler/framework/plugins/defaultpreemption/BUILD index e740192d8a2..7b61cab5ff8 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/BUILD +++ b/pkg/scheduler/framework/plugins/defaultpreemption/BUILD @@ -19,6 +19,8 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", + "//staging/src/k8s.io/client-go/informers:go_default_library", + "//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library", "//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library", "//vendor/k8s.io/klog/v2:go_default_library", ], diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go index 7e876433e19..f8aee96284f 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go @@ -31,6 +31,8 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/informers" + policylisters "k8s.io/client-go/listers/policy/v1beta1" extenderv1 "k8s.io/kube-scheduler/extender/v1" podutil "k8s.io/kubernetes/pkg/api/v1/pod" kubefeatures "k8s.io/kubernetes/pkg/features" @@ -48,7 +50,8 @@ const ( // DefaultPreemption is a PostFilter plugin implements the preemption logic. type DefaultPreemption struct { - fh framework.FrameworkHandle + fh framework.FrameworkHandle + pdbLister policylisters.PodDisruptionBudgetLister } var _ framework.PostFilterPlugin = &DefaultPreemption{} @@ -60,10 +63,9 @@ func (pl *DefaultPreemption) Name() string { // New initializes a new plugin and returns it. func New(_ runtime.Object, fh framework.FrameworkHandle) (framework.Plugin, error) { - pl := DefaultPreemption{fh} - if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodDisruptionBudget) { - // A hack to initialize pdbLister in sharedInformerFactory. - fh.SharedInformerFactory().Policy().V1beta1().PodDisruptionBudgets().Lister() + pl := DefaultPreemption{ + fh: fh, + pdbLister: getPDBLister(fh.SharedInformerFactory()), } return &pl, nil } @@ -77,7 +79,7 @@ func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.Cy metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime)) }() - nnn, err := preempt(ctx, pl.fh, state, pod, m) + nnn, err := pl.preempt(ctx, state, pod, m) if err != nil { return nil, framework.NewStatus(framework.Error, err.Error()) } @@ -97,8 +99,8 @@ func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.Cy // other pods with the same priority. The nominated pod prevents other pods from // using the nominated resources and the nominated pod could take a long time // before it is retried after many other pending pods. -func preempt(ctx context.Context, fh framework.FrameworkHandle, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, error) { - cs := fh.ClientSet() +func (pl *DefaultPreemption) preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, error) { + cs := pl.fh.ClientSet() // TODO(Huang-Wei): get pod from informer cache instead of API server. pod, err := util.GetUpdatedPod(cs, pod) if err != nil { @@ -106,11 +108,11 @@ func preempt(ctx context.Context, fh framework.FrameworkHandle, state *framework return "", err } - if !podEligibleToPreemptOthers(pod, fh.SnapshotSharedLister().NodeInfos(), m[pod.Status.NominatedNodeName]) { + if !podEligibleToPreemptOthers(pod, pl.fh.SnapshotSharedLister().NodeInfos(), m[pod.Status.NominatedNodeName]) { klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name) return "", nil } - allNodes, err := fh.SnapshotSharedLister().NodeInfos().List() + allNodes, err := pl.fh.SnapshotSharedLister().NodeInfos().List() if err != nil { return "", err } @@ -134,11 +136,11 @@ func preempt(ctx context.Context, fh framework.FrameworkHandle, state *framework } klog.Infof("%v potential nodes for preemption, first %v are: %v", len(potentialNodes), len(sample), sample) } - pdbs, err := getPodDisruptionBudgets(fh) + pdbs, err := getPodDisruptionBudgets(pl.pdbLister) if err != nil { return "", err } - nodeNameToVictims, err := selectNodesForPreemption(ctx, fh.PreemptHandle(), state, pod, potentialNodes, pdbs) + nodeNameToVictims, err := selectNodesForPreemption(ctx, pl.fh.PreemptHandle(), state, pod, potentialNodes, pdbs) if err != nil { return "", err } @@ -146,7 +148,7 @@ func preempt(ctx context.Context, fh framework.FrameworkHandle, state *framework // We will only check nodeNameToVictims with extenders that support preemption. // Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated // node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles. - nodeNameToVictims, err = processPreemptionWithExtenders(fh, pod, nodeNameToVictims) + nodeNameToVictims, err = processPreemptionWithExtenders(pl.fh, pod, nodeNameToVictims) if err != nil { return "", err } @@ -163,10 +165,10 @@ func preempt(ctx context.Context, fh framework.FrameworkHandle, state *framework return "", err } // If the victim is a WaitingPod, send a reject message to the PermitPlugin - if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil { + if waitingPod := pl.fh.GetWaitingPod(victim.UID); waitingPod != nil { waitingPod.Reject("preempted") } - fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", pod.Namespace, pod.Name, candidateNode) + pl.fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", pod.Namespace, pod.Name, candidateNode) } metrics.PreemptionVictims.Observe(float64(len(victims))) @@ -174,7 +176,7 @@ func preempt(ctx context.Context, fh framework.FrameworkHandle, state *framework // this node. So, we should remove their nomination. Removing their // nomination updates these pods and moves them to the active queue. It // lets scheduler find another place for them. - nominatedPods := getLowerPriorityNominatedPods(fh.PreemptHandle(), pod, candidateNode) + nominatedPods := getLowerPriorityNominatedPods(pl.fh.PreemptHandle(), pod, candidateNode) if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil { klog.Errorf("Cannot clear 'NominatedNodeName' field: %v", err) // We do not return as this error is not critical. @@ -615,9 +617,16 @@ func filterPodsWithPDBViolation(pods []*v1.Pod, pdbs []*policy.PodDisruptionBudg return violatingPods, nonViolatingPods } -func getPodDisruptionBudgets(fh framework.FrameworkHandle) ([]*policy.PodDisruptionBudget, error) { +func getPDBLister(informerFactory informers.SharedInformerFactory) policylisters.PodDisruptionBudgetLister { if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodDisruptionBudget) { - return fh.SharedInformerFactory().Policy().V1beta1().PodDisruptionBudgets().Lister().List(labels.Everything()) + return informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister() + } + return nil +} + +func getPodDisruptionBudgets(pdbLister policylisters.PodDisruptionBudgetLister) ([]*policy.PodDisruptionBudget, error) { + if pdbLister != nil { + return pdbLister.List(labels.Everything()) } return nil, nil } diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go index a3407fcc6da..5050a754585 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go @@ -221,7 +221,6 @@ func TestPostFilter(t *testing.T) { if err != nil { t.Fatal(err) } - p := DefaultPreemption{fh: f} state := framework.NewCycleState() // Ensure is populated. @@ -229,6 +228,10 @@ func TestPostFilter(t *testing.T) { t.Errorf("Unexpected PreFilter Status: %v", status) } + p := DefaultPreemption{ + fh: f, + pdbLister: getPDBLister(informerFactory), + } gotResult, gotStatus := p.PostFilter(context.TODO(), state, tt.pod, tt.filteredNodesStatuses) if !reflect.DeepEqual(gotStatus, tt.wantStatus) { t.Errorf("Status does not match: %v, want: %v", gotStatus, tt.wantStatus) @@ -1236,6 +1239,7 @@ func TestPreempt(t *testing.T) { extenders = append(extenders, extender) } + informerFactory := informers.NewSharedInformerFactory(client, 0) fwk, err := st.NewFramework( []st.RegisterPluginFunc{ test.registerPlugin, @@ -1247,7 +1251,7 @@ func TestPreempt(t *testing.T) { frameworkruntime.WithExtenders(extenders), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()), frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)), - frameworkruntime.WithInformerFactory(informers.NewSharedInformerFactory(client, 0)), + frameworkruntime.WithInformerFactory(informerFactory), ) if err != nil { t.Fatal(err) @@ -1260,7 +1264,11 @@ func TestPreempt(t *testing.T) { t.Errorf("Unexpected preFilterStatus: %v", preFilterStatus) } // Call preempt and check the expected results. - node, err := preempt(context.Background(), fwk, state, test.pod, make(framework.NodeToStatusMap)) + pl := DefaultPreemption{ + fh: fwk, + pdbLister: getPDBLister(informerFactory), + } + node, err := pl.preempt(context.Background(), state, test.pod, make(framework.NodeToStatusMap)) if err != nil { t.Errorf("unexpected error in preemption: %v", err) } @@ -1298,7 +1306,7 @@ func TestPreempt(t *testing.T) { } // Call preempt again and make sure it doesn't preempt any more pods. - node, err = preempt(context.Background(), fwk, state, test.pod, make(framework.NodeToStatusMap)) + node, err = pl.preempt(context.Background(), state, test.pod, make(framework.NodeToStatusMap)) if err != nil { t.Errorf("unexpected error in preemption: %v", err) }