Merge pull request #90711 from Huang-Wei/prefactor-preemption-stateless-funcs

Add Extender and PluginsRunner to PreemptHandle
This commit is contained in:
Kubernetes Prow Robot 2020-05-26 13:37:23 -07:00 committed by GitHub
commit 31b699607f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 70 additions and 23 deletions

View File

@ -280,7 +280,7 @@ func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, s
return "", nil, nil, err
}
}
nodeNameToVictims, err := g.selectNodesForPreemption(ctx, prof, state, pod, potentialNodes, pdbs)
nodeNameToVictims, err := selectNodesForPreemption(ctx, prof, g.podNominator, state, pod, potentialNodes, pdbs)
if err != nil {
return "", nil, nil, err
}
@ -442,7 +442,7 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p
// We check the nodes starting from where we left off in the previous scheduling cycle,
// this is to make sure all nodes have the same chance of being examined across pods.
nodeInfo := allNodes[(g.nextStartNodeIndex+i)%len(allNodes)]
fits, status, err := g.podPassesFiltersOnNode(ctx, prof, state, pod, nodeInfo)
fits, status, err := podPassesFiltersOnNode(ctx, prof, g.podNominator, state, pod, nodeInfo)
if err != nil {
errCh.SendErrorWithCancel(err, cancel)
return
@ -520,12 +520,12 @@ func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, filtered []*v
// addNominatedPods adds pods with equal or greater priority which are nominated
// to run on the node. It returns 1) whether any pod was added, 2) augmented cycleState,
// 3) augmented nodeInfo.
func (g *genericScheduler) addNominatedPods(ctx context.Context, prof *profile.Profile, pod *v1.Pod, state *framework.CycleState, nodeInfo *framework.NodeInfo) (bool, *framework.CycleState, *framework.NodeInfo, error) {
if g.podNominator == nil || nodeInfo == nil || nodeInfo.Node() == nil {
func addNominatedPods(ctx context.Context, pr framework.PluginsRunner, nominator framework.PodNominator, pod *v1.Pod, state *framework.CycleState, nodeInfo *framework.NodeInfo) (bool, *framework.CycleState, *framework.NodeInfo, error) {
if nominator == nil || nodeInfo == nil || nodeInfo.Node() == nil {
// This may happen only in tests.
return false, state, nodeInfo, nil
}
nominatedPods := g.podNominator.NominatedPodsForNode(nodeInfo.Node().Name)
nominatedPods := nominator.NominatedPodsForNode(nodeInfo.Node().Name)
if len(nominatedPods) == 0 {
return false, state, nodeInfo, nil
}
@ -535,7 +535,7 @@ func (g *genericScheduler) addNominatedPods(ctx context.Context, prof *profile.P
for _, p := range nominatedPods {
if podutil.GetPodPriority(p) >= podutil.GetPodPriority(pod) && p.UID != pod.UID {
nodeInfoOut.AddPod(p)
status := prof.RunPreFilterExtensionAddPod(ctx, stateOut, pod, p, nodeInfoOut)
status := pr.RunPreFilterExtensionAddPod(ctx, stateOut, pod, p, nodeInfoOut)
if !status.IsSuccess() {
return false, state, nodeInfo, status.AsError()
}
@ -555,9 +555,10 @@ func (g *genericScheduler) addNominatedPods(ctx context.Context, prof *profile.P
// and add the nominated pods. Removal of the victims is done by
// SelectVictimsOnNode(). Preempt removes victims from PreFilter state and
// NodeInfo before calling this function.
func (g *genericScheduler) podPassesFiltersOnNode(
func podPassesFiltersOnNode(
ctx context.Context,
prof *profile.Profile,
pr framework.PluginsRunner,
nominator framework.PodNominator,
state *framework.CycleState,
pod *v1.Pod,
info *framework.NodeInfo,
@ -588,7 +589,7 @@ func (g *genericScheduler) podPassesFiltersOnNode(
nodeInfoToUse := info
if i == 0 {
var err error
podsAdded, stateToUse, nodeInfoToUse, err = g.addNominatedPods(ctx, prof, pod, state, info)
podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, pr, nominator, pod, state, info)
if err != nil {
return false, nil, err
}
@ -596,7 +597,7 @@ func (g *genericScheduler) podPassesFiltersOnNode(
break
}
statusMap := prof.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
statusMap := pr.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
status = statusMap.Merge()
if !status.IsSuccess() && !status.IsUnschedulable() {
return false, status, status.AsError()
@ -847,9 +848,10 @@ func pickOneNodeForPreemption(nodesToVictims map[string]*extenderv1.Victims) str
// selectNodesForPreemption finds all the nodes with possible victims for
// preemption in parallel.
func (g *genericScheduler) selectNodesForPreemption(
func selectNodesForPreemption(
ctx context.Context,
prof *profile.Profile,
pr framework.PluginsRunner,
nominator framework.PodNominator,
state *framework.CycleState,
pod *v1.Pod,
potentialNodes []*framework.NodeInfo,
@ -861,7 +863,7 @@ func (g *genericScheduler) selectNodesForPreemption(
checkNode := func(i int) {
nodeInfoCopy := potentialNodes[i].Clone()
stateCopy := state.Clone()
pods, numPDBViolations, fits := g.selectVictimsOnNode(ctx, prof, stateCopy, pod, nodeInfoCopy, pdbs)
pods, numPDBViolations, fits := selectVictimsOnNode(ctx, pr, nominator, stateCopy, pod, nodeInfoCopy, pdbs)
if fits {
resultLock.Lock()
victims := extenderv1.Victims{
@ -937,9 +939,10 @@ func filterPodsWithPDBViolation(pods []*v1.Pod, pdbs []*policy.PodDisruptionBudg
// NOTE: This function assumes that it is never called if "pod" cannot be scheduled
// due to pod affinity, node affinity, or node anti-affinity reasons. None of
// these predicates can be satisfied by removing more pods from the node.
func (g *genericScheduler) selectVictimsOnNode(
func selectVictimsOnNode(
ctx context.Context,
prof *profile.Profile,
pr framework.PluginsRunner,
nominator framework.PodNominator,
state *framework.CycleState,
pod *v1.Pod,
nodeInfo *framework.NodeInfo,
@ -951,7 +954,7 @@ func (g *genericScheduler) selectVictimsOnNode(
if err := nodeInfo.RemovePod(rp); err != nil {
return err
}
status := prof.RunPreFilterExtensionRemovePod(ctx, state, pod, rp, nodeInfo)
status := pr.RunPreFilterExtensionRemovePod(ctx, state, pod, rp, nodeInfo)
if !status.IsSuccess() {
return status.AsError()
}
@ -959,7 +962,7 @@ func (g *genericScheduler) selectVictimsOnNode(
}
addPod := func(ap *v1.Pod) error {
nodeInfo.AddPod(ap)
status := prof.RunPreFilterExtensionAddPod(ctx, state, pod, ap, nodeInfo)
status := pr.RunPreFilterExtensionAddPod(ctx, state, pod, ap, nodeInfo)
if !status.IsSuccess() {
return status.AsError()
}
@ -982,7 +985,7 @@ func (g *genericScheduler) selectVictimsOnNode(
// inter-pod affinity to one or more victims, but we have decided not to
// support this case for performance reasons. Having affinity to lower
// priority pods is not a recommended configuration anyway.
if fits, _, err := g.podPassesFiltersOnNode(ctx, prof, state, pod, nodeInfo); !fits {
if fits, _, err := podPassesFiltersOnNode(ctx, pr, nominator, state, pod, nodeInfo); !fits {
if err != nil {
klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
}
@ -1000,7 +1003,7 @@ func (g *genericScheduler) selectVictimsOnNode(
if err := addPod(p); err != nil {
return false, err
}
fits, _, _ := g.podPassesFiltersOnNode(ctx, prof, state, pod, nodeInfo)
fits, _, _ := podPassesFiltersOnNode(ctx, pr, nominator, state, pod, nodeInfo)
if !fits {
if err := removePod(p); err != nil {
return false, err

View File

@ -1632,7 +1632,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
if err != nil {
t.Fatal(err)
}
nodeToPods, err := g.selectNodesForPreemption(context.Background(), prof, state, test.pod, nodeInfos, test.pdbs)
nodeToPods, err := selectNodesForPreemption(context.Background(), prof, g.podNominator, state, test.pod, nodeInfos, test.pdbs)
if err != nil {
t.Error(err)
}
@ -1912,7 +1912,7 @@ func TestPickOneNodeForPreemption(t *testing.T) {
if !preFilterStatus.IsSuccess() {
t.Errorf("Unexpected preFilterStatus: %v", preFilterStatus)
}
candidateNodes, _ := g.selectNodesForPreemption(context.Background(), prof, state, test.pod, nodeInfos, nil)
candidateNodes, _ := selectNodesForPreemption(context.Background(), prof, g.podNominator, state, test.pod, nodeInfos, nil)
node := pickOneNodeForPreemption(candidateNodes)
found := false
for _, nodeName := range test.expected {

View File

@ -121,6 +121,7 @@ type frameworkOptions struct {
snapshotSharedLister SharedLister
metricsRecorder *metricsRecorder
podNominator PodNominator
extenders []Extender
runAllFilters bool
}
@ -170,10 +171,31 @@ func WithPodNominator(nominator PodNominator) Option {
}
}
// WithExtenders sets extenders for the scheduling framework.
func WithExtenders(extenders []Extender) Option {
	return func(opts *frameworkOptions) {
		opts.extenders = extenders
	}
}
// defaultFrameworkOptions are the options used when the caller of
// NewFramework does not override them via Option functions.
// Only the metrics recorder has a non-zero default here.
var defaultFrameworkOptions = frameworkOptions{
	metricsRecorder: newMetricsRecorder(1000, time.Second),
}
// TODO(#91029): move this to framework runtime package.
// Compile-time check that preemptHandle satisfies PreemptHandle.
var _ PreemptHandle = &preemptHandle{}

// preemptHandle implements PreemptHandle: it carries the registered
// extenders and satisfies the PodNominator and PluginsRunner parts of
// the interface through embedding.
type preemptHandle struct {
	extenders []Extender
	PodNominator
	PluginsRunner
}

// Extenders returns the registered extenders.
func (ph *preemptHandle) Extenders() []Extender {
	return ph.extenders
}
var _ Framework = &framework{}
// NewFramework initializes plugins given the configuration and the registry.
@ -191,9 +213,13 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
clientSet: options.clientSet,
informerFactory: options.informerFactory,
metricsRecorder: options.metricsRecorder,
preemptHandle: options.podNominator,
runAllFilters: options.runAllFilters,
}
f.preemptHandle = &preemptHandle{
extenders: options.extenders,
PodNominator: options.podNominator,
PluginsRunner: f,
}
if plugins == nil {
return f, nil
}

View File

@ -494,7 +494,12 @@ type FrameworkHandle interface {
// PreemptHandle incorporates all needed logic to run preemption logic.
type PreemptHandle interface {
	// PodNominator abstracts operations to maintain nominated Pods.
	PodNominator
	// PluginsRunner abstracts operations to run some plugins.
	PluginsRunner
	// Extenders returns registered scheduler extenders.
	Extenders() []Extender
}
// PodNominator abstracts operations to maintain nominated Pods.
@ -509,3 +514,15 @@ type PodNominator interface {
// NominatedPodsForNode returns nominatedPods on the given node.
NominatedPodsForNode(nodeName string) []*v1.Pod
}
// PluginsRunner abstracts operations to run some plugins.
// This is used by preemption PostFilter plugins when evaluating the feasibility of
// scheduling the pod on nodes when certain running pods get evicted.
type PluginsRunner interface {
	// RunFilterPlugins runs the set of configured Filter plugins for the
	// pod on the given node.
	// Parameter names added for consistency with the two methods below.
	RunFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) PluginToStatus
	// RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured PreFilter plugins.
	RunPreFilterExtensionAddPod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *NodeInfo) *Status
	// RunPreFilterExtensionRemovePod calls the RemovePod interface for the set of configured PreFilter plugins.
	RunPreFilterExtensionRemovePod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podToRemove *v1.Pod, nodeInfo *NodeInfo) *Status
}

View File

@ -145,7 +145,8 @@ func (es mockScheduler) Schedule(ctx context.Context, profile *profile.Profile,
// Extenders returns nil: the mock scheduler registers no extenders.
func (es mockScheduler) Extenders() []framework.Extender {
	return nil
}
func (es mockScheduler) Preempt(ctx context.Context, i *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (string, []*v1.Pod, []*v1.Pod, error) {
// Preempt is a no-op in the mock: it reports no nominated node,
// no victims, no nominated pods to clear, and no error.
func (es mockScheduler) Preempt(ctx context.Context, profile *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (string, []*v1.Pod, []*v1.Pod, error) {
	return "", nil, nil, nil
}