From e4f878ea3a158f49be47d72df8f3169a47d4dde6 Mon Sep 17 00:00:00 2001 From: Wei Huang Date: Mon, 11 May 2020 12:52:13 -0700 Subject: [PATCH 1/2] cleanup: use string instead of v1.Node as key of nodeToVictims --- pkg/scheduler/core/extender.go | 53 ++++++--------- pkg/scheduler/core/extender_test.go | 25 +++---- pkg/scheduler/core/generic_scheduler.go | 70 ++++++++++---------- pkg/scheduler/core/generic_scheduler_test.go | 28 ++++---- pkg/scheduler/factory_test.go | 12 ++-- pkg/scheduler/framework/v1alpha1/extender.go | 70 ++++++++++++++++++++ pkg/scheduler/scheduler.go | 6 +- pkg/scheduler/scheduler_test.go | 4 +- 8 files changed, 162 insertions(+), 106 deletions(-) create mode 100644 pkg/scheduler/framework/v1alpha1/extender.go diff --git a/pkg/scheduler/core/extender.go b/pkg/scheduler/core/extender.go index 7ce6c1ea509..76bb49db6f5 100644 --- a/pkg/scheduler/core/extender.go +++ b/pkg/scheduler/core/extender.go @@ -69,14 +69,13 @@ type SchedulerExtender interface { // given: // 1. Pod to schedule // 2. Candidate nodes and victim pods (nodeToVictims) generated by previous scheduling process. - // 3. nodeNameToInfo to restore v1.Node from node name if extender cache is enabled. // The possible changes made by extender may include: // 1. Subset of given candidate nodes after preemption phase of extender. // 2. A different set of victim pod for every given candidate node after preemption phase of extender. ProcessPreemption( pod *v1.Pod, - nodeToVictims map[*v1.Node]*extenderv1.Victims, - nodeInfos framework.NodeInfoLister) (map[*v1.Node]*extenderv1.Victims, error) + nodeToVictims map[string]*extenderv1.Victims, + nodeInfos framework.NodeInfoLister) (map[string]*extenderv1.Victims, error) // SupportsPreemption returns if the scheduler extender support preemption or not. SupportsPreemption() bool @@ -212,9 +211,9 @@ func (h *HTTPExtender) SupportsPreemption() bool { // ProcessPreemption returns filtered candidate nodes and victims after running preemption logic in extender. func (h *HTTPExtender) ProcessPreemption( pod *v1.Pod, - nodeToVictims map[*v1.Node]*extenderv1.Victims, + nodeNameToVictims map[string]*extenderv1.Victims, nodeInfos framework.NodeInfoLister, -) (map[*v1.Node]*extenderv1.Victims, error) { +) (map[string]*extenderv1.Victims, error) { var ( result extenderv1.ExtenderPreemptionResult args *extenderv1.ExtenderPreemptionArgs @@ -226,13 +225,12 @@ func (h *HTTPExtender) ProcessPreemption( if h.nodeCacheCapable { // If extender has cached node info, pass NodeNameToMetaVictims in args. - nodeNameToMetaVictims := convertToNodeNameToMetaVictims(nodeToVictims) + nodeNameToMetaVictims := convertToNodeNameToMetaVictims(nodeNameToVictims) args = &extenderv1.ExtenderPreemptionArgs{ Pod: pod, NodeNameToMetaVictims: nodeNameToMetaVictims, } } else { - nodeNameToVictims := convertToNodeNameToVictims(nodeToVictims) args = &extenderv1.ExtenderPreemptionArgs{ Pod: pod, NodeNameToVictims: nodeNameToVictims, @@ -244,22 +242,22 @@ func (h *HTTPExtender) ProcessPreemption( } // Extender will always return NodeNameToMetaVictims. - // So let's convert it to NodeToVictims by using NodeNameToInfo. - newNodeToVictims, err := h.convertToNodeToVictims(result.NodeNameToMetaVictims, nodeInfos) + // So let's convert it to NodeNameToVictims by using . + newNodeNameToVictims, err := h.convertToNodeNameToVictims(result.NodeNameToMetaVictims, nodeInfos) if err != nil { return nil, err } - // Do not override nodeToVictims - return newNodeToVictims, nil + // Do not override . 
+ return newNodeNameToVictims, nil } -// convertToNodeToVictims converts "nodeNameToMetaVictims" from object identifiers, +// convertToNodeNameToVictims converts "nodeNameToMetaVictims" from object identifiers, // such as UIDs and names, to object pointers. -func (h *HTTPExtender) convertToNodeToVictims( +func (h *HTTPExtender) convertToNodeNameToVictims( nodeNameToMetaVictims map[string]*extenderv1.MetaVictims, nodeInfos framework.NodeInfoLister, -) (map[*v1.Node]*extenderv1.Victims, error) { - nodeToVictims := map[*v1.Node]*extenderv1.Victims{} +) (map[string]*extenderv1.Victims, error) { + nodeNameToVictims := map[string]*extenderv1.Victims{} for nodeName, metaVictims := range nodeNameToMetaVictims { nodeInfo, err := nodeInfos.Get(nodeName) if err != nil { @@ -275,9 +273,9 @@ func (h *HTTPExtender) convertToNodeToVictims( } victims.Pods = append(victims.Pods, pod) } - nodeToVictims[nodeInfo.Node()] = victims + nodeNameToVictims[nodeName] = victims } - return nodeToVictims, nil + return nodeNameToVictims, nil } // convertPodUIDToPod returns v1.Pod object for given MetaPod and node info. @@ -298,10 +296,10 @@ func (h *HTTPExtender) convertPodUIDToPod( // convertToNodeNameToMetaVictims converts from struct type to meta types. func convertToNodeNameToMetaVictims( - nodeToVictims map[*v1.Node]*extenderv1.Victims, + nodeNameToVictims map[string]*extenderv1.Victims, ) map[string]*extenderv1.MetaVictims { - nodeNameToVictims := map[string]*extenderv1.MetaVictims{} - for node, victims := range nodeToVictims { + nodeNameToMetaVictims := map[string]*extenderv1.MetaVictims{} + for node, victims := range nodeNameToVictims { metaVictims := &extenderv1.MetaVictims{ Pods: []*extenderv1.MetaPod{}, } @@ -311,20 +309,9 @@ func convertToNodeNameToMetaVictims( } metaVictims.Pods = append(metaVictims.Pods, metaPod) } - nodeNameToVictims[node.GetName()] = metaVictims + nodeNameToMetaVictims[node] = metaVictims } - return nodeNameToVictims -} - -// convertToNodeNameToVictims converts from node type to node name as key. -func convertToNodeNameToVictims( - nodeToVictims map[*v1.Node]*extenderv1.Victims, -) map[string]*extenderv1.Victims { - nodeNameToVictims := map[string]*extenderv1.Victims{} - for node, victims := range nodeToVictims { - nodeNameToVictims[node.GetName()] = victims - } - return nodeNameToVictims + return nodeNameToMetaVictims } // Filter based on extender implemented predicate functions. The filtered list is diff --git a/pkg/scheduler/core/extender_test.go b/pkg/scheduler/core/extender_test.go index 378e2c5ae28..0c43b37eb42 100644 --- a/pkg/scheduler/core/extender_test.go +++ b/pkg/scheduler/core/extender_test.go @@ -159,36 +159,37 @@ func (f *FakeExtender) SupportsPreemption() bool { func (f *FakeExtender) ProcessPreemption( pod *v1.Pod, - nodeToVictims map[*v1.Node]*extenderv1.Victims, + nodeNameToVictims map[string]*extenderv1.Victims, nodeInfos framework.NodeInfoLister, -) (map[*v1.Node]*extenderv1.Victims, error) { - nodeToVictimsCopy := map[*v1.Node]*extenderv1.Victims{} - // We don't want to change the original nodeToVictims - for k, v := range nodeToVictims { +) (map[string]*extenderv1.Victims, error) { + nodeNameToVictimsCopy := map[string]*extenderv1.Victims{} + // We don't want to change the original nodeNameToVictims + for k, v := range nodeNameToVictims { // In real world implementation, extender's user should have their own way to get node object // by name if needed (e.g. query kube-apiserver etc). // // For test purpose, we just use node from parameters directly. 
- nodeToVictimsCopy[k] = v + nodeNameToVictimsCopy[k] = v } - for node, victims := range nodeToVictimsCopy { + for nodeName, victims := range nodeNameToVictimsCopy { // Try to do preemption on extender side. - extenderVictimPods, extendernPDBViolations, fits, err := f.selectVictimsOnNodeByExtender(pod, node) + nodeInfo, _ := nodeInfos.Get(nodeName) + extenderVictimPods, extenderPDBViolations, fits, err := f.selectVictimsOnNodeByExtender(pod, nodeInfo.Node()) if err != nil { return nil, err } // If it's unfit after extender's preemption, this node is unresolvable by preemption overall, // let's remove it from potential preemption nodes. if !fits { - delete(nodeToVictimsCopy, node) + delete(nodeNameToVictimsCopy, nodeName) } else { // Append new victims to original victims - nodeToVictimsCopy[node].Pods = append(victims.Pods, extenderVictimPods...) - nodeToVictimsCopy[node].NumPDBViolations = victims.NumPDBViolations + int64(extendernPDBViolations) + nodeNameToVictimsCopy[nodeName].Pods = append(victims.Pods, extenderVictimPods...) + nodeNameToVictimsCopy[nodeName].NumPDBViolations = victims.NumPDBViolations + int64(extenderPDBViolations) } } - return nodeToVictimsCopy, nil + return nodeNameToVictimsCopy, nil } // selectVictimsOnNodeByExtender checks the given nodes->pods map with predicates on extender's side. diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 6cdab280765..b5d848879e1 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -105,7 +105,7 @@ type ScheduleAlgorithm interface { // the pod by preempting lower priority pods if possible. // It returns the node where preemption happened, a list of preempted pods, a // list of pods whose nominated node name should be removed, and error if any. - Preempt(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error) + Preempt(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod, error) (selectedNode string, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error) // Extenders returns a slice of extender config. This is exposed for // testing. Extenders() []SchedulerExtender @@ -251,74 +251,74 @@ func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (st // other pods with the same priority. The nominated pod prevents other pods from // using the nominated resources and the nominated pod could take a long time // before it is retried after many other pending pods. -func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) { +func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (string, []*v1.Pod, []*v1.Pod, error) { // Scheduler may return various types of errors. Consider preemption only if // the error is of type FitError. 
fitError, ok := scheduleErr.(*FitError) if !ok || fitError == nil { - return nil, nil, nil, nil + return "", nil, nil, nil } if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfos()) { klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name) - return nil, nil, nil, nil + return "", nil, nil, nil } allNodes, err := g.nodeInfoSnapshot.NodeInfos().List() if err != nil { - return nil, nil, nil, err + return "", nil, nil, err } if len(allNodes) == 0 { - return nil, nil, nil, ErrNoNodesAvailable + return "", nil, nil, ErrNoNodesAvailable } potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError) if len(potentialNodes) == 0 { klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name) // In this case, we should clean-up any existing nominated node name of the pod. - return nil, nil, []*v1.Pod{pod}, nil + return "", nil, []*v1.Pod{pod}, nil } var pdbs []*policy.PodDisruptionBudget if g.pdbLister != nil { pdbs, err = g.pdbLister.List(labels.Everything()) if err != nil { - return nil, nil, nil, err + return "", nil, nil, err } } - nodeToVictims, err := g.selectNodesForPreemption(ctx, prof, state, pod, potentialNodes, pdbs) + nodeNameToVictims, err := g.selectNodesForPreemption(ctx, prof, state, pod, potentialNodes, pdbs) if err != nil { - return nil, nil, nil, err + return "", nil, nil, err } - // We will only check nodeToVictims with extenders that support preemption. + // We will only check nodeNameToVictims with extenders that support preemption. // Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated // node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles. - nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims) + nodeNameToVictims, err = g.processPreemptionWithExtenders(pod, nodeNameToVictims) if err != nil { - return nil, nil, nil, err + return "", nil, nil, err } - candidateNode := pickOneNodeForPreemption(nodeToVictims) - if candidateNode == nil { - return nil, nil, nil, nil + candidateNode := pickOneNodeForPreemption(nodeNameToVictims) + if len(candidateNode) == 0 { + return "", nil, nil, nil } // Lower priority pods nominated to run on this node, may no longer fit on // this node. So, we should remove their nomination. Removing their // nomination updates these pods and moves them to the active queue. It // lets scheduler find another place for them. 
- nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name) - return candidateNode, nodeToVictims[candidateNode].Pods, nominatedPods, nil + nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode) + return candidateNode, nodeNameToVictims[candidateNode].Pods, nominatedPods, nil } // processPreemptionWithExtenders processes preemption with extenders func (g *genericScheduler) processPreemptionWithExtenders( pod *v1.Pod, - nodeToVictims map[*v1.Node]*extenderv1.Victims, -) (map[*v1.Node]*extenderv1.Victims, error) { - if len(nodeToVictims) > 0 { + nodeNameToVictims map[string]*extenderv1.Victims, +) (map[string]*extenderv1.Victims, error) { + if len(nodeNameToVictims) > 0 { for _, extender := range g.extenders { if extender.SupportsPreemption() && extender.IsInterested(pod) { - newNodeToVictims, err := extender.ProcessPreemption( + newNodeNameToVictims, err := extender.ProcessPreemption( pod, - nodeToVictims, + nodeNameToVictims, g.nodeInfoSnapshot.NodeInfos(), ) if err != nil { @@ -330,19 +330,19 @@ func (g *genericScheduler) processPreemptionWithExtenders( return nil, err } - // Replace nodeToVictims with new result after preemption. So the + // Replace nodeNameToVictims with new result after preemption. So the // rest of extenders can continue use it as parameter. - nodeToVictims = newNodeToVictims + nodeNameToVictims = newNodeNameToVictims // If node list becomes empty, no preemption can happen regardless of other extenders. - if len(nodeToVictims) == 0 { + if len(nodeNameToVictims) == 0 { break } } } } - return nodeToVictims, nil + return nodeNameToVictims, nil } // getLowerPriorityNominatedPods returns pods whose priority is smaller than the @@ -719,12 +719,12 @@ func (g *genericScheduler) prioritizeNodes( // 6. If there are still ties, the first such node is picked (sort of randomly). // The 'minNodes1' and 'minNodes2' are being reused here to save the memory // allocation and garbage collection time. -func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*extenderv1.Victims) *v1.Node { +func pickOneNodeForPreemption(nodesToVictims map[string]*extenderv1.Victims) string { if len(nodesToVictims) == 0 { - return nil + return "" } minNumPDBViolatingPods := int64(math.MaxInt32) - var minNodes1 []*v1.Node + var minNodes1 []string lenNodes1 := 0 for node, victims := range nodesToVictims { if len(victims.Pods) == 0 { @@ -752,7 +752,7 @@ func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*extenderv1.Victims) * // There are more than one node with minimum number PDB violating pods. Find // the one with minimum highest priority victim. 
minHighestPriority := int32(math.MaxInt32) - var minNodes2 = make([]*v1.Node, lenNodes1) + var minNodes2 = make([]string, lenNodes1) lenNodes2 := 0 for i := 0; i < lenNodes1; i++ { node := minNodes1[i] @@ -855,8 +855,8 @@ func (g *genericScheduler) selectNodesForPreemption( pod *v1.Pod, potentialNodes []*framework.NodeInfo, pdbs []*policy.PodDisruptionBudget, -) (map[*v1.Node]*extenderv1.Victims, error) { - nodeToVictims := map[*v1.Node]*extenderv1.Victims{} +) (map[string]*extenderv1.Victims, error) { + nodeNameToVictims := map[string]*extenderv1.Victims{} var resultLock sync.Mutex checkNode := func(i int) { @@ -869,12 +869,12 @@ func (g *genericScheduler) selectNodesForPreemption( Pods: pods, NumPDBViolations: int64(numPDBViolations), } - nodeToVictims[potentialNodes[i].Node()] = &victims + nodeNameToVictims[potentialNodes[i].Node().Name] = &victims resultLock.Unlock() } } parallelize.Until(ctx, len(potentialNodes), checkNode) - return nodeToVictims, nil + return nodeNameToVictims, nil } // filterPodsWithPDBViolation groups the given "pods" into two groups of "violatingPods" diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index d33b04e4c30..ed04735610b 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -1161,10 +1161,10 @@ func TestZeroRequest(t *testing.T) { } } -func printNodeToVictims(nodeToVictims map[*v1.Node]*extenderv1.Victims) string { +func printNodeNameToVictims(nodeNameToVictims map[string]*extenderv1.Victims) string { var output string - for node, victims := range nodeToVictims { - output += node.Name + ": [" + for nodeName, victims := range nodeNameToVictims { + output += nodeName + ": [" for _, pod := range victims.Pods { output += pod.Name + ", " } @@ -1178,12 +1178,12 @@ type victims struct { numPDBViolations int64 } -func checkPreemptionVictims(expected map[string]victims, nodeToPods map[*v1.Node]*extenderv1.Victims) error { +func checkPreemptionVictims(expected map[string]victims, nodeToPods map[string]*extenderv1.Victims) error { if len(expected) == len(nodeToPods) { for k, victims := range nodeToPods { - if expVictims, ok := expected[k.Name]; ok { + if expVictims, ok := expected[k]; ok { if len(victims.Pods) != len(expVictims.pods) { - return fmt.Errorf("unexpected number of pods. expected: %v, got: %v", expected, printNodeToVictims(nodeToPods)) + return fmt.Errorf("unexpected number of pods. expected: %v, got: %v", expected, printNodeNameToVictims(nodeToPods)) } prevPriority := int32(math.MaxInt32) for _, p := range victims.Pods { @@ -1200,11 +1200,11 @@ func checkPreemptionVictims(expected map[string]victims, nodeToPods map[*v1.Node return fmt.Errorf("unexpected numPDBViolations. expected: %d, got: %d", expVictims.numPDBViolations, victims.NumPDBViolations) } } else { - return fmt.Errorf("unexpected machines. expected: %v, got: %v", expected, printNodeToVictims(nodeToPods)) + return fmt.Errorf("unexpected machines. expected: %v, got: %v", expected, printNodeNameToVictims(nodeToPods)) } } } else { - return fmt.Errorf("unexpected number of machines. expected: %v, got: %v", expected, printNodeToVictims(nodeToPods)) + return fmt.Errorf("unexpected number of machines. 
expected: %v, got: %v", expected, printNodeNameToVictims(nodeToPods)) } return nil } @@ -1916,7 +1916,7 @@ func TestPickOneNodeForPreemption(t *testing.T) { node := pickOneNodeForPreemption(candidateNodes) found := false for _, nodeName := range test.expected { - if node.Name == nodeName { + if node == nodeName { found = true break } @@ -2429,10 +2429,10 @@ func TestPreempt(t *testing.T) { if err != nil { t.Errorf("unexpected error in preemption: %v", err) } - if node != nil && node.Name != test.expectedNode { - t.Errorf("expected node: %v, got: %v", test.expectedNode, node.GetName()) + if len(node) != 0 && node != test.expectedNode { + t.Errorf("expected node: %v, got: %v", test.expectedNode, node) } - if node == nil && len(test.expectedNode) != 0 { + if len(node) == 0 && len(test.expectedNode) != 0 { t.Errorf("expected node: %v, got: nothing", test.expectedNode) } if len(victims) != len(test.expectedPods) { @@ -2452,14 +2452,14 @@ func TestPreempt(t *testing.T) { // Mark the victims for deletion and record the preemptor's nominated node name. now := metav1.Now() victim.DeletionTimestamp = &now - test.pod.Status.NominatedNodeName = node.Name + test.pod.Status.NominatedNodeName = node } // Call preempt again and make sure it doesn't preempt any more pods. node, victims, _, err = scheduler.Preempt(context.Background(), prof, state, test.pod, error(&FitError{Pod: test.pod, FilteredNodesStatuses: failedNodeToStatusMap})) if err != nil { t.Errorf("unexpected error in preemption: %v", err) } - if node != nil && len(victims) > 0 { + if len(node) != 0 && len(victims) > 0 { t.Errorf("didn't expect any more preemption. Node %v is selected for preemption.", node) } close(stop) diff --git a/pkg/scheduler/factory_test.go b/pkg/scheduler/factory_test.go index 39aa52a4d25..01567678dde 100644 --- a/pkg/scheduler/factory_test.go +++ b/pkg/scheduler/factory_test.go @@ -499,10 +499,10 @@ func (f *fakeExtender) IsIgnorable() bool { } func (f *fakeExtender) ProcessPreemption( - pod *v1.Pod, - nodeToVictims map[*v1.Node]*extenderv1.Victims, - nodeInfos framework.NodeInfoLister, -) (map[*v1.Node]*extenderv1.Victims, error) { + _ *v1.Pod, + _ map[string]*extenderv1.Victims, + _ framework.NodeInfoLister, +) (map[string]*extenderv1.Victims, error) { return nil, nil } @@ -515,8 +515,8 @@ func (f *fakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node) (filteredNodes []*v } func (f *fakeExtender) Prioritize( - pod *v1.Pod, - nodes []*v1.Node, + _ *v1.Pod, + _ []*v1.Node, ) (hostPriorities *extenderv1.HostPriorityList, weight int64, err error) { return nil, 0, nil } diff --git a/pkg/scheduler/framework/v1alpha1/extender.go b/pkg/scheduler/framework/v1alpha1/extender.go new file mode 100644 index 00000000000..4118394391a --- /dev/null +++ b/pkg/scheduler/framework/v1alpha1/extender.go @@ -0,0 +1,70 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package v1alpha1 + +import ( + v1 "k8s.io/api/core/v1" + extenderv1 "k8s.io/kube-scheduler/extender/v1" +) + +// Extender is an interface for external processes to influence scheduling +// decisions made by Kubernetes. This is typically needed for resources not directly +// managed by Kubernetes. +type Extender interface { + // Name returns a unique name that identifies the extender. + Name() string + + // Filter based on extender-implemented predicate functions. The filtered list is + // expected to be a subset of the supplied list. failedNodesMap optionally contains + // the list of failed nodes and failure reasons. + Filter(pod *v1.Pod, nodes []*v1.Node) (filteredNodes []*v1.Node, failedNodesMap extenderv1.FailedNodesMap, err error) + + // Prioritize based on extender-implemented priority functions. The returned scores & weight + // are used to compute the weighted score for an extender. The weighted scores are added to + // the scores computed by Kubernetes scheduler. The total scores are used to do the host selection. + Prioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *extenderv1.HostPriorityList, weight int64, err error) + + // Bind delegates the action of binding a pod to a node to the extender. + Bind(binding *v1.Binding) error + + // IsBinder returns whether this extender is configured for the Bind method. + IsBinder() bool + + // IsInterested returns true if at least one extended resource requested by + // this pod is managed by this extender. + IsInterested(pod *v1.Pod) bool + + // ProcessPreemption returns nodes with their victim pods processed by extender based on + // given: + // 1. Pod to schedule + // 2. Candidate nodes and victim pods (nodeNameToVictims) generated by previous scheduling process. + // The possible changes made by extender may include: + // 1. Subset of given candidate nodes after preemption phase of extender. + // 2. A different set of victim pod for every given candidate node after preemption phase of extender. + ProcessPreemption( + pod *v1.Pod, + nodeNameToVictims map[string]*extenderv1.Victims, + nodeInfos NodeInfoLister, + ) (map[string]*extenderv1.Victims, error) + + // SupportsPreemption returns if the scheduler extender support preemption or not. + SupportsPreemption() bool + + // IsIgnorable returns true indicates scheduling should not fail when this extender + // is unavailable. This gives scheduler ability to fail fast and tolerate non-critical extenders as well. + IsIgnorable() bool +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index efac50e1122..d061ca006fb 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -405,14 +405,12 @@ func (sched *Scheduler) preempt(ctx context.Context, prof *profile.Profile, stat return "", err } - node, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(ctx, prof, state, preemptor, scheduleErr) + nodeName, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(ctx, prof, state, preemptor, scheduleErr) if err != nil { klog.Errorf("Error preempting victims to make room for %v/%v: %v", preemptor.Namespace, preemptor.Name, err) return "", err } - var nodeName = "" - if node != nil { - nodeName = node.Name + if len(nodeName) != 0 { // Update the scheduling queue with the nominated pod information. Without // this, there would be a race condition between the next scheduling cycle // and the time the scheduler receives a Pod Update for the nominated pod. 
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 9413e6819db..03b2bc3f743 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -144,8 +144,8 @@ func (es mockScheduler) Schedule(ctx context.Context, profile *profile.Profile, func (es mockScheduler) Extenders() []core.SchedulerExtender { return nil } -func (es mockScheduler) Preempt(ctx context.Context, i *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) { - return nil, nil, nil, nil +func (es mockScheduler) Preempt(ctx context.Context, i *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (string, []*v1.Pod, []*v1.Pod, error) { + return "", nil, nil, nil } func TestSchedulerCreation(t *testing.T) { From eb17b7559cd9969188bb1eb7188f2269b4e5e3e0 Mon Sep 17 00:00:00 2001 From: Wei Huang Date: Tue, 12 May 2020 00:12:24 -0700 Subject: [PATCH 2/2] move SchedulerExtender interface to pkg/scheduler/framework/v1alpha1 --- pkg/scheduler/core/extender.go | 51 +------------------- pkg/scheduler/core/extender_test.go | 4 +- pkg/scheduler/core/generic_scheduler.go | 8 +-- pkg/scheduler/core/generic_scheduler_test.go | 8 +-- pkg/scheduler/factory.go | 4 +- pkg/scheduler/framework/v1alpha1/BUILD | 2 + pkg/scheduler/scheduler_test.go | 12 ++--- 7 files changed, 22 insertions(+), 67 deletions(-) diff --git a/pkg/scheduler/core/extender.go b/pkg/scheduler/core/extender.go index 76bb49db6f5..b0fca92189c 100644 --- a/pkg/scheduler/core/extender.go +++ b/pkg/scheduler/core/extender.go @@ -38,54 +38,7 @@ const ( DefaultExtenderTimeout = 5 * time.Second ) -// SchedulerExtender is an interface for external processes to influence scheduling -// decisions made by Kubernetes. This is typically needed for resources not directly -// managed by Kubernetes. -type SchedulerExtender interface { - // Name returns a unique name that identifies the extender. - Name() string - - // Filter based on extender-implemented predicate functions. The filtered list is - // expected to be a subset of the supplied list. failedNodesMap optionally contains - // the list of failed nodes and failure reasons. - Filter(pod *v1.Pod, nodes []*v1.Node) (filteredNodes []*v1.Node, failedNodesMap extenderv1.FailedNodesMap, err error) - - // Prioritize based on extender-implemented priority functions. The returned scores & weight - // are used to compute the weighted score for an extender. The weighted scores are added to - // the scores computed by Kubernetes scheduler. The total scores are used to do the host selection. - Prioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *extenderv1.HostPriorityList, weight int64, err error) - - // Bind delegates the action of binding a pod to a node to the extender. - Bind(binding *v1.Binding) error - - // IsBinder returns whether this extender is configured for the Bind method. - IsBinder() bool - - // IsInterested returns true if at least one extended resource requested by - // this pod is managed by this extender. - IsInterested(pod *v1.Pod) bool - - // ProcessPreemption returns nodes with their victim pods processed by extender based on - // given: - // 1. Pod to schedule - // 2. Candidate nodes and victim pods (nodeToVictims) generated by previous scheduling process. - // The possible changes made by extender may include: - // 1. Subset of given candidate nodes after preemption phase of extender. - // 2. 
A different set of victim pod for every given candidate node after preemption phase of extender. - ProcessPreemption( - pod *v1.Pod, - nodeToVictims map[string]*extenderv1.Victims, - nodeInfos framework.NodeInfoLister) (map[string]*extenderv1.Victims, error) - - // SupportsPreemption returns if the scheduler extender support preemption or not. - SupportsPreemption() bool - - // IsIgnorable returns true indicates scheduling should not fail when this extender - // is unavailable. This gives scheduler ability to fail fast and tolerate non-critical extenders as well. - IsIgnorable() bool -} - -// HTTPExtender implements the SchedulerExtender interface. +// HTTPExtender implements the Extender interface. type HTTPExtender struct { extenderURL string preemptVerb string @@ -130,7 +83,7 @@ func makeTransport(config *schedulerapi.Extender) (http.RoundTripper, error) { } // NewHTTPExtender creates an HTTPExtender object. -func NewHTTPExtender(config *schedulerapi.Extender) (SchedulerExtender, error) { +func NewHTTPExtender(config *schedulerapi.Extender) (framework.Extender, error) { if config.HTTPTimeout.Nanoseconds() == 0 { config.HTTPTimeout = time.Duration(DefaultExtenderTimeout) } diff --git a/pkg/scheduler/core/extender_test.go b/pkg/scheduler/core/extender_test.go index 0c43b37eb42..3df352f79f5 100644 --- a/pkg/scheduler/core/extender_test.go +++ b/pkg/scheduler/core/extender_test.go @@ -353,7 +353,7 @@ func (f *FakeExtender) IsInterested(pod *v1.Pod) bool { return !f.unInterested } -var _ SchedulerExtender = &FakeExtender{} +var _ framework.Extender = &FakeExtender{} func TestGenericSchedulerWithExtenders(t *testing.T) { tests := []struct { @@ -575,7 +575,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { client := clientsetfake.NewSimpleClientset() informerFactory := informers.NewSharedInformerFactory(client, 0) - extenders := []SchedulerExtender{} + extenders := []framework.Extender{} for ii := range test.extenders { extenders = append(extenders, &test.extenders[ii]) } diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index b5d848879e1..3f237383298 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -108,7 +108,7 @@ type ScheduleAlgorithm interface { Preempt(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod, error) (selectedNode string, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error) // Extenders returns a slice of extender config. This is exposed for // testing. - Extenders() []SchedulerExtender + Extenders() []framework.Extender } // ScheduleResult represents the result of one pod scheduled. 
It will contain @@ -125,7 +125,7 @@ type ScheduleResult struct { type genericScheduler struct { cache internalcache.Cache schedulingQueue internalqueue.SchedulingQueue - extenders []SchedulerExtender + extenders []framework.Extender nodeInfoSnapshot *internalcache.Snapshot pvcLister corelisters.PersistentVolumeClaimLister pdbLister policylisters.PodDisruptionBudgetLister @@ -210,7 +210,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, }, err } -func (g *genericScheduler) Extenders() []SchedulerExtender { +func (g *genericScheduler) Extenders() []framework.Extender { return g.extenders } @@ -1103,7 +1103,7 @@ func NewGenericScheduler( cache internalcache.Cache, podQueue internalqueue.SchedulingQueue, nodeInfoSnapshot *internalcache.Snapshot, - extenders []SchedulerExtender, + extenders []framework.Extender, pvcLister corelisters.PersistentVolumeClaimLister, pdbLister policylisters.PodDisruptionBudgetLister, disablePreemption bool, diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index ed04735610b..dd6457230b6 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -813,7 +813,7 @@ func TestGenericScheduler(t *testing.T) { cache, internalqueue.NewSchedulingQueue(nil), snapshot, - []SchedulerExtender{}, + []framework.Extender{}, pvcLister, informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(), false, @@ -1134,7 +1134,7 @@ func TestZeroRequest(t *testing.T) { nil, nil, emptySnapshot, - []SchedulerExtender{}, + []framework.Extender{}, nil, nil, false, @@ -1613,7 +1613,7 @@ func TestSelectNodesForPreemption(t *testing.T) { nil, internalqueue.NewSchedulingQueue(nil), snapshot, - []SchedulerExtender{}, + []framework.Extender{}, nil, informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(), false, @@ -2391,7 +2391,7 @@ func TestPreempt(t *testing.T) { cachedNodeInfo.SetNode(node) cachedNodeInfoMap[node.Name] = cachedNodeInfo } - var extenders []SchedulerExtender + var extenders []framework.Extender for _, extender := range test.extenders { // Set nodeInfoMap as extenders cached node information. extender.cachedNodeNameToInfo = cachedNodeInfoMap diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index a5fe324a8e1..8b50f23d384 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -126,10 +126,10 @@ func (c *Configurator) buildFramework(p schedulerapi.KubeSchedulerProfile) (fram // create a scheduler from a set of registered plugins. 
func (c *Configurator) create() (*Scheduler, error) { - var extenders []core.SchedulerExtender + var extenders []framework.Extender var ignoredExtendedResources []string if len(c.extenders) != 0 { - var ignorableExtenders []core.SchedulerExtender + var ignorableExtenders []framework.Extender for ii := range c.extenders { klog.V(2).Infof("Creating extender with config %+v", c.extenders[ii]) extender, err := core.NewHTTPExtender(&c.extenders[ii]) diff --git a/pkg/scheduler/framework/v1alpha1/BUILD b/pkg/scheduler/framework/v1alpha1/BUILD index 341222deb9d..3910c739490 100644 --- a/pkg/scheduler/framework/v1alpha1/BUILD +++ b/pkg/scheduler/framework/v1alpha1/BUILD @@ -4,6 +4,7 @@ go_library( name = "go_default_library", srcs = [ "cycle_state.go", + "extender.go", "framework.go", "interface.go", "listers.go", @@ -34,6 +35,7 @@ go_library( "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/component-base/metrics:go_default_library", "//staging/src/k8s.io/kube-scheduler/config/v1alpha2:go_default_library", + "//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library", "//vendor/k8s.io/klog:go_default_library", "//vendor/sigs.k8s.io/yaml:go_default_library", ], diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 03b2bc3f743..0a58adefe12 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -141,7 +141,7 @@ func (es mockScheduler) Schedule(ctx context.Context, profile *profile.Profile, return es.result, es.err } -func (es mockScheduler) Extenders() []core.SchedulerExtender { +func (es mockScheduler) Extenders() []framework.Extender { return nil } func (es mockScheduler) Preempt(ctx context.Context, i *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (string, []*v1.Pod, []*v1.Pod, error) { @@ -815,7 +815,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C scache, internalqueue.NewSchedulingQueue(nil), internalcache.NewEmptySnapshot(), - []core.SchedulerExtender{}, + []framework.Extender{}, informerFactory.Core().V1().PersistentVolumeClaims().Lister(), informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(), false, @@ -1104,14 +1104,14 @@ priorities: func TestSchedulerBinding(t *testing.T) { table := []struct { podName string - extenders []core.SchedulerExtender + extenders []framework.Extender wantBinderID int name string }{ { name: "the extender is not a binder", podName: "pod0", - extenders: []core.SchedulerExtender{ + extenders: []framework.Extender{ &fakeExtender{isBinder: false, interestedPodName: "pod0"}, }, wantBinderID: -1, // default binding. @@ -1119,7 +1119,7 @@ func TestSchedulerBinding(t *testing.T) { { name: "one of the extenders is a binder and interested in pod", podName: "pod0", - extenders: []core.SchedulerExtender{ + extenders: []framework.Extender{ &fakeExtender{isBinder: false, interestedPodName: "pod0"}, &fakeExtender{isBinder: true, interestedPodName: "pod0"}, }, @@ -1128,7 +1128,7 @@ func TestSchedulerBinding(t *testing.T) { { name: "one of the extenders is a binder, but not interested in pod", podName: "pod1", - extenders: []core.SchedulerExtender{ + extenders: []framework.Extender{ &fakeExtender{isBinder: false, interestedPodName: "pod1"}, &fakeExtender{isBinder: true, interestedPodName: "pod0"}, },
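
For readers following the refactor, a minimal standalone sketch of the new map shape follows. It is not part of the patch; Victims here is a hypothetical stand-in for extenderv1.Victims, trimmed to the two fields the preemption logic inspects, and the node and pod names are invented for illustration. The point is only that keying candidate nodes by name (map[string]*Victims) lets the preemption result travel between the scheduler and its extenders without carrying *v1.Node pointers; the full node object is looked up from the snapshot only where it is actually needed, as FakeExtender.ProcessPreemption now does via nodeInfos.Get(nodeName).

package main

import "fmt"

// Victims is a stand-in for extenderv1.Victims, reduced to the two fields
// the preemption logic cares about: the pods to evict on a candidate node
// and how many of those evictions would violate a PodDisruptionBudget.
type Victims struct {
	Pods             []string
	NumPDBViolations int64
}

func main() {
	// After the change, candidate nodes are keyed by name rather than by
	// *v1.Node, so the map can be handed to extenders and returned without
	// resolving node objects along the way.
	nodeNameToVictims := map[string]*Victims{
		"node-a": {Pods: []string{"low-prio-1"}, NumPDBViolations: 0},
		"node-b": {Pods: []string{"low-prio-2", "low-prio-3"}, NumPDBViolations: 1},
	}

	// An extender that cannot make room on a node simply drops that key,
	// mirroring delete(nodeNameToVictimsCopy, nodeName) in the fake extender.
	delete(nodeNameToVictims, "node-b")

	for nodeName, v := range nodeNameToVictims {
		fmt.Printf("%s: evict %v (%d PDB violations)\n", nodeName, v.Pods, v.NumPDBViolations)
	}
}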