Merge pull request #99498 from gavinfish/sched-preempthandle

Scheduler: unroll PreemptHandle to Handle
This commit is contained in:
Kubernetes Prow Robot 2021-03-01 13:58:28 -08:00 committed by GitHub
commit 0ced9d2854
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 28 additions and 53 deletions

View File

@ -1173,7 +1173,7 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
if err := scheduler.cache.UpdateSnapshot(scheduler.nodeInfoSnapshot); err != nil { if err := scheduler.cache.UpdateSnapshot(scheduler.nodeInfoSnapshot); err != nil {
t.Fatal(err) t.Fatal(err)
} }
fwk.PreemptHandle().AddNominatedPod(framework.NewPodInfo(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}), "1") fwk.AddNominatedPod(framework.NewPodInfo(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}), "1")
_, _, err = scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), test.pod) _, _, err = scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), test.pod)

View File

@ -552,6 +552,10 @@ type Framework interface {
// passed to the plugin factories at the time of plugin initialization. Plugins // passed to the plugin factories at the time of plugin initialization. Plugins
// must store and use this handle to call framework functions. // must store and use this handle to call framework functions.
type Handle interface { type Handle interface {
// PodNominator abstracts operations to maintain nominated Pods.
PodNominator
// PluginsRunner abstracts operations to run some plugins.
PluginsRunner
// SnapshotSharedLister returns listers from the latest NodeInfo Snapshot. The snapshot // SnapshotSharedLister returns listers from the latest NodeInfo Snapshot. The snapshot
// is taken at the beginning of a scheduling cycle and remains unchanged until // is taken at the beginning of a scheduling cycle and remains unchanged until
// a pod finishes "Permit" point. There is no guarantee that the information // a pod finishes "Permit" point. There is no guarantee that the information
@ -581,8 +585,8 @@ type Handle interface {
// RunFilterPluginsWithNominatedPods runs the set of configured filter plugins for nominated pod on the given node. // RunFilterPluginsWithNominatedPods runs the set of configured filter plugins for nominated pod on the given node.
RunFilterPluginsWithNominatedPods(ctx context.Context, state *CycleState, pod *v1.Pod, info *NodeInfo) *Status RunFilterPluginsWithNominatedPods(ctx context.Context, state *CycleState, pod *v1.Pod, info *NodeInfo) *Status
// TODO: unroll the wrapped interfaces to Handle. // Extenders returns registered scheduler extenders.
PreemptHandle() PreemptHandle Extenders() []Extender
} }
// PostFilterResult wraps needed info for scheduler framework to act upon PostFilter phase. // PostFilterResult wraps needed info for scheduler framework to act upon PostFilter phase.
@ -590,16 +594,6 @@ type PostFilterResult struct {
NominatedNodeName string NominatedNodeName string
} }
// PreemptHandle incorporates all needed logic to run preemption logic.
type PreemptHandle interface {
// PodNominator abstracts operations to maintain nominated Pods.
PodNominator
// PluginsRunner abstracts operations to run some plugins.
PluginsRunner
// Extenders returns registered scheduler extenders.
Extenders() []Extender
}
// PodNominator abstracts operations to maintain nominated Pods. // PodNominator abstracts operations to maintain nominated Pods.
type PodNominator interface { type PodNominator interface {
// AddNominatedPod adds the given pod to the nominated pod map or // AddNominatedPod adds the given pod to the nominated pod map or

View File

@ -116,7 +116,6 @@ func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.Cy
// before it is retried after many other pending pods. // before it is retried after many other pending pods.
func (pl *DefaultPreemption) preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, *framework.Status) { func (pl *DefaultPreemption) preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, *framework.Status) {
cs := pl.fh.ClientSet() cs := pl.fh.ClientSet()
ph := pl.fh.PreemptHandle()
nodeLister := pl.fh.SnapshotSharedLister().NodeInfos() nodeLister := pl.fh.SnapshotSharedLister().NodeInfos()
// 0) Fetch the latest version of <pod>. // 0) Fetch the latest version of <pod>.
@ -156,7 +155,7 @@ func (pl *DefaultPreemption) preempt(ctx context.Context, state *framework.Cycle
} }
// 3) Interact with registered Extenders to filter out some candidates if needed. // 3) Interact with registered Extenders to filter out some candidates if needed.
candidates, status = CallExtenders(ph.Extenders(), pod, nodeLister, candidates) candidates, status = CallExtenders(pl.fh.Extenders(), pod, nodeLister, candidates)
if !status.IsSuccess() { if !status.IsSuccess() {
return "", status return "", status
} }
@ -606,12 +605,11 @@ func selectVictimsOnNode(
pdbs []*policy.PodDisruptionBudget, pdbs []*policy.PodDisruptionBudget,
) ([]*v1.Pod, int, *framework.Status) { ) ([]*v1.Pod, int, *framework.Status) {
var potentialVictims []*framework.PodInfo var potentialVictims []*framework.PodInfo
ph := fh.PreemptHandle()
removePod := func(rpi *framework.PodInfo) error { removePod := func(rpi *framework.PodInfo) error {
if err := nodeInfo.RemovePod(rpi.Pod); err != nil { if err := nodeInfo.RemovePod(rpi.Pod); err != nil {
return err return err
} }
status := ph.RunPreFilterExtensionRemovePod(ctx, state, pod, rpi, nodeInfo) status := fh.RunPreFilterExtensionRemovePod(ctx, state, pod, rpi, nodeInfo)
if !status.IsSuccess() { if !status.IsSuccess() {
return status.AsError() return status.AsError()
} }
@ -619,7 +617,7 @@ func selectVictimsOnNode(
} }
addPod := func(api *framework.PodInfo) error { addPod := func(api *framework.PodInfo) error {
nodeInfo.AddPodInfo(api) nodeInfo.AddPodInfo(api)
status := ph.RunPreFilterExtensionAddPod(ctx, state, pod, api, nodeInfo) status := fh.RunPreFilterExtensionAddPod(ctx, state, pod, api, nodeInfo)
if !status.IsSuccess() { if !status.IsSuccess() {
return status.AsError() return status.AsError()
} }
@ -714,7 +712,7 @@ func PrepareCandidate(c Candidate, fh framework.Handle, cs kubernetes.Interface,
// this node. So, we should remove their nomination. Removing their // this node. So, we should remove their nomination. Removing their
// nomination updates these pods and moves them to the active queue. It // nomination updates these pods and moves them to the active queue. It
// lets scheduler find another place for them. // lets scheduler find another place for them.
nominatedPods := getLowerPriorityNominatedPods(fh.PreemptHandle(), pod, c.Name()) nominatedPods := getLowerPriorityNominatedPods(fh, pod, c.Name())
if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil { if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil {
klog.ErrorS(err, "cannot clear 'NominatedNodeName' field") klog.ErrorS(err, "cannot clear 'NominatedNodeName' field")
// We do not return as this error is not critical. // We do not return as this error is not critical.

View File

@ -88,7 +88,8 @@ type frameworkImpl struct {
metricsRecorder *metricsRecorder metricsRecorder *metricsRecorder
profileName string profileName string
preemptHandle framework.PreemptHandle extenders []framework.Extender
framework.PodNominator
// Indicates that RunFilterPlugins should accumulate all failed statuses and not return // Indicates that RunFilterPlugins should accumulate all failed statuses and not return
// after the first failure. // after the first failure.
@ -122,6 +123,11 @@ func (f *frameworkImpl) getExtensionPoints(plugins *config.Plugins) []extensionP
} }
} }
// Extenders returns the registered extenders.
func (f *frameworkImpl) Extenders() []framework.Extender {
return f.extenders
}
type frameworkOptions struct { type frameworkOptions struct {
clientSet clientset.Interface clientSet clientset.Interface
eventRecorder events.EventRecorder eventRecorder events.EventRecorder
@ -218,19 +224,6 @@ func defaultFrameworkOptions() frameworkOptions {
} }
} }
var _ framework.PreemptHandle = &preemptHandle{}
type preemptHandle struct {
extenders []framework.Extender
framework.PodNominator
framework.PluginsRunner
}
// Extenders returns the registered extenders.
func (ph *preemptHandle) Extenders() []framework.Extender {
return ph.extenders
}
var _ framework.Framework = &frameworkImpl{} var _ framework.Framework = &frameworkImpl{}
// NewFramework initializes plugins given the configuration and the registry. // NewFramework initializes plugins given the configuration and the registry.
@ -251,11 +244,8 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
metricsRecorder: options.metricsRecorder, metricsRecorder: options.metricsRecorder,
profileName: options.profileName, profileName: options.profileName,
runAllFilters: options.runAllFilters, runAllFilters: options.runAllFilters,
}
f.preemptHandle = &preemptHandle{
extenders: options.extenders, extenders: options.extenders,
PodNominator: options.podNominator, PodNominator: options.podNominator,
PluginsRunner: f,
} }
if plugins == nil { if plugins == nil {
return f, nil return f, nil
@ -605,7 +595,6 @@ func (f *frameworkImpl) runPostFilterPlugin(ctx context.Context, pl framework.Po
func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, state *framework.CycleState, pod *v1.Pod, info *framework.NodeInfo) *framework.Status { func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, state *framework.CycleState, pod *v1.Pod, info *framework.NodeInfo) *framework.Status {
var status *framework.Status var status *framework.Status
ph := f.PreemptHandle()
podsAdded := false podsAdded := false
// We run filters twice in some cases. If the node has greater or equal priority // We run filters twice in some cases. If the node has greater or equal priority
// nominated pods, we run them when those pods are added to PreFilter state and nodeInfo. // nominated pods, we run them when those pods are added to PreFilter state and nodeInfo.
@ -630,7 +619,7 @@ func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, s
nodeInfoToUse := info nodeInfoToUse := info
if i == 0 { if i == 0 {
var err error var err error
podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, ph, pod, state, info) podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, f, pod, state, info)
if err != nil { if err != nil {
return framework.AsStatus(err) return framework.AsStatus(err)
} }
@ -638,7 +627,7 @@ func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, s
break break
} }
statusMap := ph.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse) statusMap := f.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
status = statusMap.Merge() status = statusMap.Merge()
if !status.IsSuccess() && !status.IsUnschedulable() { if !status.IsSuccess() && !status.IsUnschedulable() {
return status return status
@ -651,12 +640,12 @@ func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, s
// addNominatedPods adds pods with equal or greater priority which are nominated // addNominatedPods adds pods with equal or greater priority which are nominated
// to run on the node. It returns 1) whether any pod was added, 2) augmented cycleState, // to run on the node. It returns 1) whether any pod was added, 2) augmented cycleState,
// 3) augmented nodeInfo. // 3) augmented nodeInfo.
func addNominatedPods(ctx context.Context, ph framework.PreemptHandle, pod *v1.Pod, state *framework.CycleState, nodeInfo *framework.NodeInfo) (bool, *framework.CycleState, *framework.NodeInfo, error) { func addNominatedPods(ctx context.Context, fh framework.Handle, pod *v1.Pod, state *framework.CycleState, nodeInfo *framework.NodeInfo) (bool, *framework.CycleState, *framework.NodeInfo, error) {
if ph == nil || nodeInfo.Node() == nil { if fh == nil || nodeInfo.Node() == nil {
// This may happen only in tests. // This may happen only in tests.
return false, state, nodeInfo, nil return false, state, nodeInfo, nil
} }
nominatedPodInfos := ph.NominatedPodsForNode(nodeInfo.Node().Name) nominatedPodInfos := fh.NominatedPodsForNode(nodeInfo.Node().Name)
if len(nominatedPodInfos) == 0 { if len(nominatedPodInfos) == 0 {
return false, state, nodeInfo, nil return false, state, nodeInfo, nil
} }
@ -666,7 +655,7 @@ func addNominatedPods(ctx context.Context, ph framework.PreemptHandle, pod *v1.P
for _, pi := range nominatedPodInfos { for _, pi := range nominatedPodInfos {
if corev1.PodPriority(pi.Pod) >= corev1.PodPriority(pod) && pi.Pod.UID != pod.UID { if corev1.PodPriority(pi.Pod) >= corev1.PodPriority(pod) && pi.Pod.UID != pod.UID {
nodeInfoOut.AddPodInfo(pi) nodeInfoOut.AddPodInfo(pi)
status := ph.RunPreFilterExtensionAddPod(ctx, stateOut, pod, pi, nodeInfoOut) status := fh.RunPreFilterExtensionAddPod(ctx, stateOut, pod, pi, nodeInfoOut)
if !status.IsSuccess() { if !status.IsSuccess() {
return false, state, nodeInfo, status.AsError() return false, state, nodeInfo, status.AsError()
} }
@ -1133,11 +1122,6 @@ func (f *frameworkImpl) pluginsNeeded(plugins *config.Plugins) map[string]config
return pgMap return pgMap
} }
// PreemptHandle returns the internal preemptHandle object.
func (f *frameworkImpl) PreemptHandle() framework.PreemptHandle {
return f.preemptHandle
}
// ProfileName returns the profile name associated to this framework. // ProfileName returns the profile name associated to this framework.
func (f *frameworkImpl) ProfileName() string { func (f *frameworkImpl) ProfileName() string {
return f.profileName return f.profileName

View File

@ -409,15 +409,14 @@ func (pp *PostFilterPlugin) PostFilter(ctx context.Context, state *framework.Cyc
return nil, framework.NewStatus(framework.Error, err.Error()) return nil, framework.NewStatus(framework.Error, err.Error())
} }
ph := pp.fh.PreemptHandle()
for _, nodeInfo := range nodeInfos { for _, nodeInfo := range nodeInfos {
ph.RunFilterPlugins(ctx, state, pod, nodeInfo) pp.fh.RunFilterPlugins(ctx, state, pod, nodeInfo)
} }
var nodes []*v1.Node var nodes []*v1.Node
for _, nodeInfo := range nodeInfos { for _, nodeInfo := range nodeInfos {
nodes = append(nodes, nodeInfo.Node()) nodes = append(nodes, nodeInfo.Node())
} }
ph.RunScorePlugins(ctx, state, pod, nodes) pp.fh.RunScorePlugins(ctx, state, pod, nodes)
if pp.failPostFilter { if pp.failPostFilter {
return nil, framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)) return nil, framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))