Merge pull request #92876 from Huang-Wei/pdbLister

Add pdbLister as a member field of struct DefaultPreemption
This commit is contained in:
Kubernetes Prow Robot 2020-07-11 20:57:42 -07:00 committed by GitHub
commit d06ff65943
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 41 additions and 22 deletions

View File

@ -19,6 +19,8 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],

View File

@ -31,6 +31,8 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
policylisters "k8s.io/client-go/listers/policy/v1beta1"
extenderv1 "k8s.io/kube-scheduler/extender/v1"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
kubefeatures "k8s.io/kubernetes/pkg/features"
@ -48,7 +50,8 @@ const (
// DefaultPreemption is a PostFilter plugin implements the preemption logic.
type DefaultPreemption struct {
fh framework.FrameworkHandle
fh framework.FrameworkHandle
pdbLister policylisters.PodDisruptionBudgetLister
}
var _ framework.PostFilterPlugin = &DefaultPreemption{}
@ -60,10 +63,9 @@ func (pl *DefaultPreemption) Name() string {
// New initializes a new plugin and returns it.
func New(_ runtime.Object, fh framework.FrameworkHandle) (framework.Plugin, error) {
pl := DefaultPreemption{fh}
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodDisruptionBudget) {
// A hack to initialize pdbLister in sharedInformerFactory.
fh.SharedInformerFactory().Policy().V1beta1().PodDisruptionBudgets().Lister()
pl := DefaultPreemption{
fh: fh,
pdbLister: getPDBLister(fh.SharedInformerFactory()),
}
return &pl, nil
}
@ -77,7 +79,7 @@ func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.Cy
metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
}()
nnn, err := preempt(ctx, pl.fh, state, pod, m)
nnn, err := pl.preempt(ctx, state, pod, m)
if err != nil {
return nil, framework.NewStatus(framework.Error, err.Error())
}
@ -97,8 +99,8 @@ func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.Cy
// other pods with the same priority. The nominated pod prevents other pods from
// using the nominated resources and the nominated pod could take a long time
// before it is retried after many other pending pods.
func preempt(ctx context.Context, fh framework.FrameworkHandle, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, error) {
cs := fh.ClientSet()
func (pl *DefaultPreemption) preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, error) {
cs := pl.fh.ClientSet()
// TODO(Huang-Wei): get pod from informer cache instead of API server.
pod, err := util.GetUpdatedPod(cs, pod)
if err != nil {
@ -106,11 +108,11 @@ func preempt(ctx context.Context, fh framework.FrameworkHandle, state *framework
return "", err
}
if !podEligibleToPreemptOthers(pod, fh.SnapshotSharedLister().NodeInfos(), m[pod.Status.NominatedNodeName]) {
if !podEligibleToPreemptOthers(pod, pl.fh.SnapshotSharedLister().NodeInfos(), m[pod.Status.NominatedNodeName]) {
klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
return "", nil
}
allNodes, err := fh.SnapshotSharedLister().NodeInfos().List()
allNodes, err := pl.fh.SnapshotSharedLister().NodeInfos().List()
if err != nil {
return "", err
}
@ -134,11 +136,11 @@ func preempt(ctx context.Context, fh framework.FrameworkHandle, state *framework
}
klog.Infof("%v potential nodes for preemption, first %v are: %v", len(potentialNodes), len(sample), sample)
}
pdbs, err := getPodDisruptionBudgets(fh)
pdbs, err := getPodDisruptionBudgets(pl.pdbLister)
if err != nil {
return "", err
}
nodeNameToVictims, err := selectNodesForPreemption(ctx, fh.PreemptHandle(), state, pod, potentialNodes, pdbs)
nodeNameToVictims, err := selectNodesForPreemption(ctx, pl.fh.PreemptHandle(), state, pod, potentialNodes, pdbs)
if err != nil {
return "", err
}
@ -146,7 +148,7 @@ func preempt(ctx context.Context, fh framework.FrameworkHandle, state *framework
// We will only check nodeNameToVictims with extenders that support preemption.
// Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
// node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
nodeNameToVictims, err = processPreemptionWithExtenders(fh, pod, nodeNameToVictims)
nodeNameToVictims, err = processPreemptionWithExtenders(pl.fh, pod, nodeNameToVictims)
if err != nil {
return "", err
}
@ -163,10 +165,10 @@ func preempt(ctx context.Context, fh framework.FrameworkHandle, state *framework
return "", err
}
// If the victim is a WaitingPod, send a reject message to the PermitPlugin
if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
if waitingPod := pl.fh.GetWaitingPod(victim.UID); waitingPod != nil {
waitingPod.Reject("preempted")
}
fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", pod.Namespace, pod.Name, candidateNode)
pl.fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", pod.Namespace, pod.Name, candidateNode)
}
metrics.PreemptionVictims.Observe(float64(len(victims)))
@ -174,7 +176,7 @@ func preempt(ctx context.Context, fh framework.FrameworkHandle, state *framework
// this node. So, we should remove their nomination. Removing their
// nomination updates these pods and moves them to the active queue. It
// lets scheduler find another place for them.
nominatedPods := getLowerPriorityNominatedPods(fh.PreemptHandle(), pod, candidateNode)
nominatedPods := getLowerPriorityNominatedPods(pl.fh.PreemptHandle(), pod, candidateNode)
if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil {
klog.Errorf("Cannot clear 'NominatedNodeName' field: %v", err)
// We do not return as this error is not critical.
@ -615,9 +617,16 @@ func filterPodsWithPDBViolation(pods []*v1.Pod, pdbs []*policy.PodDisruptionBudg
return violatingPods, nonViolatingPods
}
func getPodDisruptionBudgets(fh framework.FrameworkHandle) ([]*policy.PodDisruptionBudget, error) {
func getPDBLister(informerFactory informers.SharedInformerFactory) policylisters.PodDisruptionBudgetLister {
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodDisruptionBudget) {
return fh.SharedInformerFactory().Policy().V1beta1().PodDisruptionBudgets().Lister().List(labels.Everything())
return informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister()
}
return nil
}
func getPodDisruptionBudgets(pdbLister policylisters.PodDisruptionBudgetLister) ([]*policy.PodDisruptionBudget, error) {
if pdbLister != nil {
return pdbLister.List(labels.Everything())
}
return nil, nil
}

View File

@ -221,7 +221,6 @@ func TestPostFilter(t *testing.T) {
if err != nil {
t.Fatal(err)
}
p := DefaultPreemption{fh: f}
state := framework.NewCycleState()
// Ensure <state> is populated.
@ -229,6 +228,10 @@ func TestPostFilter(t *testing.T) {
t.Errorf("Unexpected PreFilter Status: %v", status)
}
p := DefaultPreemption{
fh: f,
pdbLister: getPDBLister(informerFactory),
}
gotResult, gotStatus := p.PostFilter(context.TODO(), state, tt.pod, tt.filteredNodesStatuses)
if !reflect.DeepEqual(gotStatus, tt.wantStatus) {
t.Errorf("Status does not match: %v, want: %v", gotStatus, tt.wantStatus)
@ -1236,6 +1239,7 @@ func TestPreempt(t *testing.T) {
extenders = append(extenders, extender)
}
informerFactory := informers.NewSharedInformerFactory(client, 0)
fwk, err := st.NewFramework(
[]st.RegisterPluginFunc{
test.registerPlugin,
@ -1247,7 +1251,7 @@ func TestPreempt(t *testing.T) {
frameworkruntime.WithExtenders(extenders),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()),
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)),
frameworkruntime.WithInformerFactory(informers.NewSharedInformerFactory(client, 0)),
frameworkruntime.WithInformerFactory(informerFactory),
)
if err != nil {
t.Fatal(err)
@ -1260,7 +1264,11 @@ func TestPreempt(t *testing.T) {
t.Errorf("Unexpected preFilterStatus: %v", preFilterStatus)
}
// Call preempt and check the expected results.
node, err := preempt(context.Background(), fwk, state, test.pod, make(framework.NodeToStatusMap))
pl := DefaultPreemption{
fh: fwk,
pdbLister: getPDBLister(informerFactory),
}
node, err := pl.preempt(context.Background(), state, test.pod, make(framework.NodeToStatusMap))
if err != nil {
t.Errorf("unexpected error in preemption: %v", err)
}
@ -1298,7 +1306,7 @@ func TestPreempt(t *testing.T) {
}
// Call preempt again and make sure it doesn't preempt any more pods.
node, err = preempt(context.Background(), fwk, state, test.pod, make(framework.NodeToStatusMap))
node, err = pl.preempt(context.Background(), state, test.pod, make(framework.NodeToStatusMap))
if err != nil {
t.Errorf("unexpected error in preemption: %v", err)
}