From 59eff29d22588995a7733dc244df76ff20d488d4 Mon Sep 17 00:00:00 2001
From: Wei Huang
Date: Thu, 23 Apr 2020 17:08:59 -0700
Subject: [PATCH] Add Extenders() and PluginsRunner interface to PreemptHandle

- Make some private functions stateless:
  - make addNominatedPods() not dependent on genericScheduler
  - make selectVictimsOnNode() not dependent on genericScheduler
  - make selectNodesForPreemption() not dependent on genericScheduler
---
 pkg/scheduler/core/generic_scheduler.go       | 41 ++++++++++---------
 pkg/scheduler/core/generic_scheduler_test.go  |  4 +-
 pkg/scheduler/framework/v1alpha1/framework.go | 28 ++++++++++++-
 pkg/scheduler/framework/v1alpha1/interface.go | 17 ++++++++
 pkg/scheduler/scheduler_test.go               |  3 +-
 5 files changed, 70 insertions(+), 23 deletions(-)

diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go
index d84ac0374b5..e31993ed8f6 100644
--- a/pkg/scheduler/core/generic_scheduler.go
+++ b/pkg/scheduler/core/generic_scheduler.go
@@ -280,7 +280,7 @@ func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, s
 			return "", nil, nil, err
 		}
 	}
-	nodeNameToVictims, err := g.selectNodesForPreemption(ctx, prof, state, pod, potentialNodes, pdbs)
+	nodeNameToVictims, err := selectNodesForPreemption(ctx, prof, g.podNominator, state, pod, potentialNodes, pdbs)
 	if err != nil {
 		return "", nil, nil, err
 	}
@@ -442,7 +442,7 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p
 		// We check the nodes starting from where we left off in the previous scheduling cycle,
 		// this is to make sure all nodes have the same chance of being examined across pods.
 		nodeInfo := allNodes[(g.nextStartNodeIndex+i)%len(allNodes)]
-		fits, status, err := g.podPassesFiltersOnNode(ctx, prof, state, pod, nodeInfo)
+		fits, status, err := podPassesFiltersOnNode(ctx, prof, g.podNominator, state, pod, nodeInfo)
 		if err != nil {
 			errCh.SendErrorWithCancel(err, cancel)
 			return
@@ -520,12 +520,12 @@ func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, filtered []*v
 // addNominatedPods adds pods with equal or greater priority which are nominated
 // to run on the node. It returns 1) whether any pod was added, 2) augmented cycleState,
 // 3) augmented nodeInfo.
-func (g *genericScheduler) addNominatedPods(ctx context.Context, prof *profile.Profile, pod *v1.Pod, state *framework.CycleState, nodeInfo *framework.NodeInfo) (bool, *framework.CycleState, *framework.NodeInfo, error) {
-	if g.podNominator == nil || nodeInfo == nil || nodeInfo.Node() == nil {
+func addNominatedPods(ctx context.Context, pr framework.PluginsRunner, nominator framework.PodNominator, pod *v1.Pod, state *framework.CycleState, nodeInfo *framework.NodeInfo) (bool, *framework.CycleState, *framework.NodeInfo, error) {
+	if nominator == nil || nodeInfo == nil || nodeInfo.Node() == nil {
 		// This may happen only in tests.
 		return false, state, nodeInfo, nil
 	}
-	nominatedPods := g.podNominator.NominatedPodsForNode(nodeInfo.Node().Name)
+	nominatedPods := nominator.NominatedPodsForNode(nodeInfo.Node().Name)
 	if len(nominatedPods) == 0 {
 		return false, state, nodeInfo, nil
 	}
@@ -535,7 +535,7 @@ func (g *genericScheduler) addNominatedPods(ctx context.Context, prof *profile.P
 	for _, p := range nominatedPods {
 		if podutil.GetPodPriority(p) >= podutil.GetPodPriority(pod) && p.UID != pod.UID {
 			nodeInfoOut.AddPod(p)
-			status := prof.RunPreFilterExtensionAddPod(ctx, stateOut, pod, p, nodeInfoOut)
+			status := pr.RunPreFilterExtensionAddPod(ctx, stateOut, pod, p, nodeInfoOut)
 			if !status.IsSuccess() {
 				return false, state, nodeInfo, status.AsError()
 			}
@@ -555,9 +555,10 @@
 // and add the nominated pods. Removal of the victims is done by
 // SelectVictimsOnNode(). Preempt removes victims from PreFilter state and
 // NodeInfo before calling this function.
-func (g *genericScheduler) podPassesFiltersOnNode(
+func podPassesFiltersOnNode(
 	ctx context.Context,
-	prof *profile.Profile,
+	pr framework.PluginsRunner,
+	nominator framework.PodNominator,
 	state *framework.CycleState,
 	pod *v1.Pod,
 	info *framework.NodeInfo,
@@ -588,7 +589,7 @@ func (g *genericScheduler) podPassesFiltersOnNode(
 		nodeInfoToUse := info
 		if i == 0 {
 			var err error
-			podsAdded, stateToUse, nodeInfoToUse, err = g.addNominatedPods(ctx, prof, pod, state, info)
+			podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, pr, nominator, pod, state, info)
 			if err != nil {
 				return false, nil, err
 			}
@@ -596,7 +597,7 @@
 			break
 		}
 
-		statusMap := prof.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
+		statusMap := pr.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
 		status = statusMap.Merge()
 		if !status.IsSuccess() && !status.IsUnschedulable() {
 			return false, status, status.AsError()
@@ -847,9 +848,10 @@ func pickOneNodeForPreemption(nodesToVictims map[string]*extenderv1.Victims) str
 
 // selectNodesForPreemption finds all the nodes with possible victims for
 // preemption in parallel.
-func (g *genericScheduler) selectNodesForPreemption(
+func selectNodesForPreemption(
 	ctx context.Context,
-	prof *profile.Profile,
+	pr framework.PluginsRunner,
+	nominator framework.PodNominator,
 	state *framework.CycleState,
 	pod *v1.Pod,
 	potentialNodes []*framework.NodeInfo,
@@ -861,7 +863,7 @@ func (g *genericScheduler) selectNodesForPreemption(
 	checkNode := func(i int) {
 		nodeInfoCopy := potentialNodes[i].Clone()
 		stateCopy := state.Clone()
-		pods, numPDBViolations, fits := g.selectVictimsOnNode(ctx, prof, stateCopy, pod, nodeInfoCopy, pdbs)
+		pods, numPDBViolations, fits := selectVictimsOnNode(ctx, pr, nominator, stateCopy, pod, nodeInfoCopy, pdbs)
 		if fits {
 			resultLock.Lock()
 			victims := extenderv1.Victims{
@@ -937,9 +939,10 @@ func filterPodsWithPDBViolation(pods []*v1.Pod, pdbs []*policy.PodDisruptionBudg
 // NOTE: This function assumes that it is never called if "pod" cannot be scheduled
 // due to pod affinity, node affinity, or node anti-affinity reasons. None of
 // these predicates can be satisfied by removing more pods from the node.
-func (g *genericScheduler) selectVictimsOnNode(
+func selectVictimsOnNode(
 	ctx context.Context,
-	prof *profile.Profile,
+	pr framework.PluginsRunner,
+	nominator framework.PodNominator,
 	state *framework.CycleState,
 	pod *v1.Pod,
 	nodeInfo *framework.NodeInfo,
@@ -951,7 +954,7 @@ func (g *genericScheduler) selectVictimsOnNode(
 		if err := nodeInfo.RemovePod(rp); err != nil {
 			return err
 		}
-		status := prof.RunPreFilterExtensionRemovePod(ctx, state, pod, rp, nodeInfo)
+		status := pr.RunPreFilterExtensionRemovePod(ctx, state, pod, rp, nodeInfo)
 		if !status.IsSuccess() {
 			return status.AsError()
 		}
@@ -959,7 +962,7 @@ func (g *genericScheduler) selectVictimsOnNode(
 	}
 	addPod := func(ap *v1.Pod) error {
 		nodeInfo.AddPod(ap)
-		status := prof.RunPreFilterExtensionAddPod(ctx, state, pod, ap, nodeInfo)
+		status := pr.RunPreFilterExtensionAddPod(ctx, state, pod, ap, nodeInfo)
 		if !status.IsSuccess() {
 			return status.AsError()
 		}
@@ -982,7 +985,7 @@ func (g *genericScheduler) selectVictimsOnNode(
 	// inter-pod affinity to one or more victims, but we have decided not to
 	// support this case for performance reasons. Having affinity to lower
 	// priority pods is not a recommended configuration anyway.
-	if fits, _, err := g.podPassesFiltersOnNode(ctx, prof, state, pod, nodeInfo); !fits {
+	if fits, _, err := podPassesFiltersOnNode(ctx, pr, nominator, state, pod, nodeInfo); !fits {
 		if err != nil {
 			klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
 		}
@@ -1000,7 +1003,7 @@ func (g *genericScheduler) selectVictimsOnNode(
 		if err := addPod(p); err != nil {
 			return false, err
 		}
-		fits, _, _ := g.podPassesFiltersOnNode(ctx, prof, state, pod, nodeInfo)
+		fits, _, _ := podPassesFiltersOnNode(ctx, pr, nominator, state, pod, nodeInfo)
 		if !fits {
 			if err := removePod(p); err != nil {
 				return false, err
diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go
index 1ab6f7bac99..ecd9950a7cb 100644
--- a/pkg/scheduler/core/generic_scheduler_test.go
+++ b/pkg/scheduler/core/generic_scheduler_test.go
@@ -1632,7 +1632,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
 			if err != nil {
 				t.Fatal(err)
 			}
-			nodeToPods, err := g.selectNodesForPreemption(context.Background(), prof, state, test.pod, nodeInfos, test.pdbs)
+			nodeToPods, err := selectNodesForPreemption(context.Background(), prof, g.podNominator, state, test.pod, nodeInfos, test.pdbs)
 			if err != nil {
 				t.Error(err)
 			}
@@ -1912,7 +1912,7 @@ func TestPickOneNodeForPreemption(t *testing.T) {
 			if !preFilterStatus.IsSuccess() {
 				t.Errorf("Unexpected preFilterStatus: %v", preFilterStatus)
 			}
-			candidateNodes, _ := g.selectNodesForPreemption(context.Background(), prof, state, test.pod, nodeInfos, nil)
+			candidateNodes, _ := selectNodesForPreemption(context.Background(), prof, g.podNominator, state, test.pod, nodeInfos, nil)
 			node := pickOneNodeForPreemption(candidateNodes)
 			found := false
 			for _, nodeName := range test.expected {
diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go
index f8f0559ca1d..80b0ffef67f 100644
--- a/pkg/scheduler/framework/v1alpha1/framework.go
+++ b/pkg/scheduler/framework/v1alpha1/framework.go
@@ -121,6 +121,7 @@ type frameworkOptions struct {
 	snapshotSharedLister SharedLister
 	metricsRecorder      *metricsRecorder
 	podNominator         PodNominator
+	extenders            []Extender
 	runAllFilters        bool
 }
 
@@ -170,10 +171,31 @@ func WithPodNominator(nominator PodNominator) Option {
 	}
 }
 
+// WithExtenders sets extenders for the scheduling framework.
+func WithExtenders(extenders []Extender) Option {
+	return func(o *frameworkOptions) {
+		o.extenders = extenders
+	}
+}
+
 var defaultFrameworkOptions = frameworkOptions{
 	metricsRecorder: newMetricsRecorder(1000, time.Second),
 }
 
+// TODO(#91029): move this to framework runtime package.
+var _ PreemptHandle = &preemptHandle{}
+
+type preemptHandle struct {
+	extenders []Extender
+	PodNominator
+	PluginsRunner
+}
+
+// Extenders returns the registered extenders.
+func (ph *preemptHandle) Extenders() []Extender {
+	return ph.extenders
+}
+
 var _ Framework = &framework{}
 
 // NewFramework initializes plugins given the configuration and the registry.
@@ -191,9 +213,13 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
 		clientSet:             options.clientSet,
 		informerFactory:       options.informerFactory,
 		metricsRecorder:       options.metricsRecorder,
-		preemptHandle:         options.podNominator,
 		runAllFilters:         options.runAllFilters,
 	}
+	f.preemptHandle = &preemptHandle{
+		extenders:     options.extenders,
+		PodNominator:  options.podNominator,
+		PluginsRunner: f,
+	}
 	if plugins == nil {
 		return f, nil
 	}
diff --git a/pkg/scheduler/framework/v1alpha1/interface.go b/pkg/scheduler/framework/v1alpha1/interface.go
index 55c434338d8..a81086cf9c4 100644
--- a/pkg/scheduler/framework/v1alpha1/interface.go
+++ b/pkg/scheduler/framework/v1alpha1/interface.go
@@ -494,7 +494,12 @@ type FrameworkHandle interface {
 
 // PreemptHandle incorporates all needed logic to run preemption logic.
 type PreemptHandle interface {
+	// PodNominator abstracts operations to maintain nominated Pods.
 	PodNominator
+	// PluginsRunner abstracts operations to run some plugins.
+	PluginsRunner
+	// Extenders returns registered scheduler extenders.
+	Extenders() []Extender
 }
 
 // PodNominator abstracts operations to maintain nominated Pods.
@@ -509,3 +514,15 @@
 	// NominatedPodsForNode returns nominatedPods on the given node.
 	NominatedPodsForNode(nodeName string) []*v1.Pod
 }
+
+// PluginsRunner abstracts operations to run some plugins.
+// This is used by preemption PostFilter plugins when evaluating the feasibility of
+// scheduling the pod on nodes when certain running pods get evicted.
+type PluginsRunner interface {
+	// RunFilterPlugins runs the set of configured filter plugins for pod on the given node.
+	RunFilterPlugins(context.Context, *CycleState, *v1.Pod, *NodeInfo) PluginToStatus
+	// RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured PreFilter plugins.
+	RunPreFilterExtensionAddPod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *NodeInfo) *Status
+	// RunPreFilterExtensionRemovePod calls the RemovePod interface for the set of configured PreFilter plugins.
+	RunPreFilterExtensionRemovePod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podToRemove *v1.Pod, nodeInfo *NodeInfo) *Status
+}
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index 015e8152d7c..545038c6e09 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -145,7 +145,8 @@ func (es mockScheduler) Schedule(ctx context.Context, profile *profile.Profile,
 func (es mockScheduler) Extenders() []framework.Extender {
 	return nil
 }
-func (es mockScheduler) Preempt(ctx context.Context, i *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (string, []*v1.Pod, []*v1.Pod, error) {
+
+func (es mockScheduler) Preempt(ctx context.Context, profile *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (string, []*v1.Pod, []*v1.Pod, error) {
 	return "", nil, nil, nil
 }
 
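
Usage sketch (reviewer note, not part of the patch): the PluginsRunner half of the new
PreemptHandle is what lets feasibility checks run filter plugins without holding a
*genericScheduler. Below is a minimal illustration assuming the caller already has a
framework.PreemptHandle, a CycleState, and a cloned NodeInfo; the package name and the
helper fitsAfterEviction are invented for this example, and the body only mirrors what
selectVictimsOnNode and podPassesFiltersOnNode do above through pr.

	package example

	import (
		"context"

		v1 "k8s.io/api/core/v1"
		framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
	)

	// fitsAfterEviction reports whether pod would pass the configured Filter plugins
	// on nodeInfo once the candidate victims are removed from it.
	func fitsAfterEviction(ctx context.Context, ph framework.PreemptHandle, state *framework.CycleState,
		pod *v1.Pod, nodeInfo *framework.NodeInfo, victims []*v1.Pod) (bool, error) {
		// Remove each victim and let the PreFilter extensions adjust their cycle state,
		// the same way selectVictimsOnNode's removePod closure uses the PluginsRunner.
		for _, victim := range victims {
			if err := nodeInfo.RemovePod(victim); err != nil {
				return false, err
			}
			if status := ph.RunPreFilterExtensionRemovePod(ctx, state, pod, victim, nodeInfo); !status.IsSuccess() {
				return false, status.AsError()
			}
		}
		// Re-run the Filter plugins against the reduced node, as podPassesFiltersOnNode does.
		status := ph.RunFilterPlugins(ctx, state, pod, nodeInfo).Merge()
		if !status.IsSuccess() && !status.IsUnschedulable() {
			return false, status.AsError()
		}
		return status.IsSuccess(), nil
	}

Since a framework built with WithPodNominator and WithExtenders now wires a preemptHandle
that embeds the PodNominator, the PluginsRunner, and the extender list, a check like this
needs no *genericScheduler state, which is what the "preemption PostFilter plugins"
comment on PluginsRunner is pointing at.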