Merge pull request #99119 from gavinfish/sched-preempt

Scheduler: let default preemption return status instead of err
Kubernetes Prow Robot authored 2021-02-19 20:49:41 -08:00, committed by GitHub
commit 0b5dd00d03
GPG Key ID: 4AEE18F83AFDEB23 (no known key found for this signature in database)
2 changed files with 31 additions and 33 deletions
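
The change is pure plumbing: the preemption helpers (preempt, FindCandidates, CallExtenders, PrepareCandidate) used to return a plain Go error, so PostFilter had to translate errors into a *framework.Status at the top, type-asserting *framework.FitError to pick the Unschedulable code. After this commit each helper returns a *framework.Status directly, already coded at the failure site. Below is a minimal, self-contained sketch of that convention: a toy Status type standing in for the real one in the scheduler framework package, modeling only the pieces the diff relies on (NewStatus, AsStatus, IsSuccess, AsError).

package main

import (
	"errors"
	"fmt"
)

// Code distinguishes "the pod cannot fit anywhere" from an internal fault.
// This is the distinction the old code recovered by type-asserting
// *framework.FitError and the new code carries in the status itself.
type Code int

const (
	Success Code = iota
	Error
	Unschedulable
)

// Status is a toy stand-in for framework.Status: a code plus a message,
// where a nil *Status conventionally means success.
type Status struct {
	code    Code
	message string
}

// NewStatus builds a status with an explicit code, as in
// framework.NewStatus(framework.Unschedulable, ...).
func NewStatus(code Code, msg string) *Status {
	return &Status{code: code, message: msg}
}

// AsStatus wraps an ordinary error as an Error-coded status, as in
// framework.AsStatus(err).
func AsStatus(err error) *Status {
	return NewStatus(Error, err.Error())
}

func (s *Status) IsSuccess() bool { return s == nil || s.code == Success }

// AsError converts a failed status back to an error, which the updated
// tests use for reporting.
func (s *Status) AsError() error {
	if s.IsSuccess() {
		return nil
	}
	return errors.New(s.message)
}

func main() {
	internal := AsStatus(errors.New("no nodes available"))
	fmt.Println(internal.IsSuccess(), internal.AsError()) // false no nodes available

	unschedulable := NewStatus(Unschedulable, "0/3 nodes are available")
	fmt.Println(unschedulable.IsSuccess(), unschedulable.AsError()) // false 0/3 nodes are available
}

With this shape, each failure site picks the right constructor once (AsStatus for internal faults, NewStatus(Unschedulable, ...) for fit failures) and every caller simply propagates the status, which is exactly the pattern repeated in the hunks below.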

pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go

@@ -93,12 +93,9 @@ func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.CycleState
 		metrics.PreemptionAttempts.Inc()
 	}()
 
-	nnn, err := pl.preempt(ctx, state, pod, m)
-	if err != nil {
-		if _, ok := err.(*framework.FitError); ok {
-			return nil, framework.NewStatus(framework.Unschedulable, err.Error())
-		}
-		return nil, framework.AsStatus(err)
+	nnn, status := pl.preempt(ctx, state, pod, m)
+	if !status.IsSuccess() {
+		return nil, status
 	}
 	// This happens when the pod is not eligible for preemption or extenders filtered all candidates.
 	if nnn == "" {
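
Note the new guard `if !status.IsSuccess()`: on success, preempt returns a nil *framework.Status, and calling IsSuccess on it is still safe, because a Go method with a pointer receiver may run on a nil receiver as long as it never dereferences it. A tiny sketch of just that mechanic (toy types, not the framework's):

package main

import "fmt"

// Status is a toy stand-in for framework.Status.
type Status struct{ message string }

// IsSuccess runs fine with s == nil: the receiver is just a pointer
// value, and we only compare it, never dereference it. This is why
// callers can write `if !status.IsSuccess()` without a nil check.
func (s *Status) IsSuccess() bool { return s == nil }

// preempt is a fake helper: success is signalled by a nil status.
func preempt() (string, *Status) {
	return "node-a", nil
}

func main() {
	node, status := preempt()
	if !status.IsSuccess() {
		fmt.Println("preemption failed:", status.message)
		return
	}
	fmt.Println("nominated node:", node)
}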
@@ -117,7 +114,7 @@ func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.CycleState
 // other pods with the same priority. The nominated pod prevents other pods from
 // using the nominated resources and the nominated pod could take a long time
 // before it is retried after many other pending pods.
-func (pl *DefaultPreemption) preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, error) {
+func (pl *DefaultPreemption) preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, *framework.Status) {
 	cs := pl.fh.ClientSet()
 	ph := pl.fh.PreemptHandle()
 	nodeLister := pl.fh.SnapshotSharedLister().NodeInfos()
@@ -130,7 +127,7 @@ func (pl *DefaultPreemption) preempt(ctx context.Context, state *framework.CycleState
 	pod, err := pl.podLister.Pods(pod.Namespace).Get(pod.Name)
 	if err != nil {
 		klog.ErrorS(err, "getting the updated preemptor pod object", "pod", klog.KRef(podNamespace, podName))
-		return "", err
+		return "", framework.AsStatus(err)
 	}
 
 	// 1) Ensure the preemptor is eligible to preempt other pods.
@@ -140,27 +137,28 @@ func (pl *DefaultPreemption) preempt(ctx context.Context, state *framework.CycleState
 	}
 
 	// 2) Find all preemption candidates.
-	candidates, nodeToStautsMap, err := pl.FindCandidates(ctx, state, pod, m)
-	if err != nil {
-		return "", err
+	candidates, nodeToStatusMap, status := pl.FindCandidates(ctx, state, pod, m)
+	if !status.IsSuccess() {
+		return "", status
 	}
 
 	// Return a FitError only when there are no candidates that fit the pod.
 	if len(candidates) == 0 {
-		return "", &framework.FitError{
+		fitError := &framework.FitError{
 			Pod:         pod,
-			NumAllNodes: len(nodeToStautsMap),
+			NumAllNodes: len(nodeToStatusMap),
 			Diagnosis: framework.Diagnosis{
-				NodeToStatusMap: nodeToStautsMap,
+				NodeToStatusMap: nodeToStatusMap,
 				// Leave FailedPlugins as nil as it won't be used on moving Pods.
 			},
 		}
+		return "", framework.NewStatus(framework.Unschedulable, fitError.Error())
 	}
 
 	// 3) Interact with registered Extenders to filter out some candidates if needed.
-	candidates, err = CallExtenders(ph.Extenders(), pod, nodeLister, candidates)
-	if err != nil {
-		return "", err
+	candidates, status = CallExtenders(ph.Extenders(), pod, nodeLister, candidates)
+	if !status.IsSuccess() {
+		return "", status
 	}
 
 	// 4) Find the best candidate.
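
In the step-2 hunk the FitError stops being the returned value and becomes a message source: the struct is still populated (Pod, NumAllNodes, Diagnosis, as in the diff), but only its Error() text survives, carried inside an Unschedulable status. A rough, self-contained sketch of how such an error type can flatten a diagnosis into one string; the fields are simplified and the message format is made up, not the framework's exact output:

package main

import "fmt"

// FitError mirrors the shape used in the diff (Pod, NumAllNodes,
// Diagnosis) with simplified field types; the real framework type and
// its message format differ in detail.
type FitError struct {
	PodName     string
	NumAllNodes int
	Diagnosis   map[string]string // node name -> failure reason
}

// Error flattens the diagnosis into a single string, which is all that
// survives once the caller wraps it as NewStatus(Unschedulable, fitError.Error()).
func (f *FitError) Error() string {
	msg := fmt.Sprintf("pod %q: 0/%d nodes are available", f.PodName, f.NumAllNodes)
	for node, reason := range f.Diagnosis {
		msg += fmt.Sprintf("; %s: %s", node, reason)
	}
	return msg
}

func main() {
	fitError := &FitError{
		PodName:     "preemptor",
		NumAllNodes: 2,
		Diagnosis: map[string]string{
			"node-a": "insufficient cpu",
			"node-b": "node(s) had untolerated taints",
		},
	}
	fmt.Println(fitError.Error())
}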
@@ -170,8 +168,8 @@ func (pl *DefaultPreemption) preempt(ctx context.Context, state *framework.CycleState
 	}
 
 	// 5) Perform preparation work before nominating the selected candidate.
-	if err := PrepareCandidate(bestCandidate, pl.fh, cs, pod, pl.Name()); err != nil {
-		return "", err
+	if status := PrepareCandidate(bestCandidate, pl.fh, cs, pod, pl.Name()); !status.IsSuccess() {
+		return "", status
 	}
 
 	return bestCandidate.Name(), nil
@@ -200,13 +198,13 @@ func (pl *DefaultPreemption) getOffsetAndNumCandidates(numNodes int32) (int32, int32) {
 // FindCandidates calculates a slice of preemption candidates.
 // Each candidate is executable to make the given <pod> schedulable.
-func (pl *DefaultPreemption) FindCandidates(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) ([]Candidate, framework.NodeToStatusMap, error) {
+func (pl *DefaultPreemption) FindCandidates(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) ([]Candidate, framework.NodeToStatusMap, *framework.Status) {
 	allNodes, err := pl.fh.SnapshotSharedLister().NodeInfos().List()
 	if err != nil {
-		return nil, nil, err
+		return nil, nil, framework.AsStatus(err)
 	}
 	if len(allNodes) == 0 {
-		return nil, nil, fmt.Errorf("no nodes available")
+		return nil, nil, framework.NewStatus(framework.Error, "no nodes available")
 	}
 
 	potentialNodes, unschedulableNodeStatus := nodesWherePreemptionMightHelp(allNodes, m)
 	if len(potentialNodes) == 0 {
@@ -221,7 +219,7 @@ func (pl *DefaultPreemption) FindCandidates(ctx context.Context, state *framework.CycleState
 
 	pdbs, err := getPodDisruptionBudgets(pl.pdbLister)
 	if err != nil {
-		return nil, nil, err
+		return nil, nil, framework.AsStatus(err)
 	}
 
 	offset, numCandidates := pl.getOffsetAndNumCandidates(int32(len(potentialNodes)))
@@ -372,7 +370,7 @@ func dryRunPreemption(ctx context.Context, fh framework.Handle,
 // Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
 // node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
 func CallExtenders(extenders []framework.Extender, pod *v1.Pod, nodeLister framework.NodeInfoLister,
-	candidates []Candidate) ([]Candidate, error) {
+	candidates []Candidate) ([]Candidate, *framework.Status) {
 	if len(extenders) == 0 {
 		return candidates, nil
 	}
@@ -394,7 +392,7 @@ func CallExtenders(extenders []framework.Extender, pod *v1.Pod, nodeLister framework.NodeInfoLister,
 				"extender", extender, "err", err)
 			continue
 		}
-		return nil, err
+		return nil, framework.AsStatus(err)
 	}
 	// Replace victimsMap with new result after preemption. So the
 	// rest of extenders can continue use it as parameter.
@@ -697,11 +695,11 @@ func selectVictimsOnNode(
 // - Evict the victim pods
 // - Reject the victim pods if they are in waitingPod map
 // - Clear the low-priority pods' nominatedNodeName status if needed
-func PrepareCandidate(c Candidate, fh framework.Handle, cs kubernetes.Interface, pod *v1.Pod, pluginName string) error {
+func PrepareCandidate(c Candidate, fh framework.Handle, cs kubernetes.Interface, pod *v1.Pod, pluginName string) *framework.Status {
 	for _, victim := range c.Victims().Pods {
 		if err := util.DeletePod(cs, victim); err != nil {
 			klog.ErrorS(err, "preempting pod", "pod", klog.KObj(victim))
-			return err
+			return framework.AsStatus(err)
 		}
 
 		// If the victim is a WaitingPod, send a reject message to the PermitPlugin
 		if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
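
Both failure paths in this file log before returning, via klog's structured API: ErrorS takes the error first, then a message and alternating key/value pairs, and KRef renders a namespace/name reference when only the strings are at hand. A small sketch of those calls, assuming k8s.io/klog/v2 (the library the diff itself uses); the pod reference here is made up:

package main

import (
	"errors"
	"flag"

	"k8s.io/klog/v2"
)

func main() {
	klog.InitFlags(nil) // register -v, -logtostderr, etc.
	flag.Parse()
	defer klog.Flush()

	err := errors.New("pods \"preemptor\" not found")
	// Structured error log: error first, then message, then key/value pairs.
	klog.ErrorS(err, "getting the updated preemptor pod object",
		"pod", klog.KRef("default", "preemptor"))
}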

pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go

@@ -1666,9 +1666,9 @@ func TestPreempt(t *testing.T) {
 				pdbLister: getPDBLister(informerFactory),
 				args:      *getDefaultDefaultPreemptionArgs(),
 			}
-			node, err := pl.preempt(context.Background(), state, test.pod, make(framework.NodeToStatusMap))
-			if err != nil {
-				t.Errorf("unexpected error in preemption: %v", err)
+			node, status := pl.preempt(context.Background(), state, test.pod, make(framework.NodeToStatusMap))
+			if !status.IsSuccess() {
+				t.Errorf("unexpected error in preemption: %v", status.AsError())
 			}
 			if len(node) != 0 && node != test.expectedNode {
 				t.Errorf("expected node: %v, got: %v", test.expectedNode, node)
@@ -1703,9 +1703,9 @@ func TestPreempt(t *testing.T) {
 			}
 			// Call preempt again and make sure it doesn't preempt any more pods.
-			node, err = pl.preempt(context.Background(), state, test.pod, make(framework.NodeToStatusMap))
-			if err != nil {
-				t.Errorf("unexpected error in preemption: %v", err)
+			node, status = pl.preempt(context.Background(), state, test.pod, make(framework.NodeToStatusMap))
+			if !status.IsSuccess() {
+				t.Errorf("unexpected error in preemption: %v", status.AsError())
 			}
 			if len(node) != 0 && len(deletedPodNames) > 0 {
 				t.Errorf("didn't expect any more preemption. Node %v is selected for preemption.", node)