cleanup: use string instead of v1.Node as key of nodeToVictims

Wei Huang 2020-05-11 12:52:13 -07:00
parent 0024c837ba
commit e4f878ea3a
GPG Key ID: BE5E9752F8B6E005
8 changed files with 162 additions and 106 deletions
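The gist of the cleanup, as a minimal sketch (simplified stand-in types, not the actual scheduler structs): preemption candidates are now keyed by node name, so the victim map can be built, compared, and looked up with plain strings instead of *v1.Node pointers.

package main

import "fmt"

// Victims is a toy stand-in for extenderv1.Victims in this sketch.
type Victims struct {
    PodNames []string
}

func main() {
    // Before this commit the map was map[*v1.Node]*Victims; after it, the key
    // is the node name, so an entry can be found again from the name alone.
    nodeNameToVictims := map[string]*Victims{
        "node-a": {PodNames: []string{"low-priority-pod"}},
    }
    if v, ok := nodeNameToVictims["node-a"]; ok {
        fmt.Println("victims on node-a:", v.PodNames)
    }
}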

View File

@@ -69,14 +69,13 @@ type SchedulerExtender interface {
     // given:
     // 1. Pod to schedule
     // 2. Candidate nodes and victim pods (nodeToVictims) generated by previous scheduling process.
-    // 3. nodeNameToInfo to restore v1.Node from node name if extender cache is enabled.
     // The possible changes made by extender may include:
     // 1. Subset of given candidate nodes after preemption phase of extender.
     // 2. A different set of victim pod for every given candidate node after preemption phase of extender.
     ProcessPreemption(
         pod *v1.Pod,
-        nodeToVictims map[*v1.Node]*extenderv1.Victims,
+        nodeToVictims map[string]*extenderv1.Victims,
-        nodeInfos framework.NodeInfoLister) (map[*v1.Node]*extenderv1.Victims, error)
+        nodeInfos framework.NodeInfoLister) (map[string]*extenderv1.Victims, error)
     // SupportsPreemption returns if the scheduler extender support preemption or not.
     SupportsPreemption() bool
@@ -212,9 +211,9 @@ func (h *HTTPExtender) SupportsPreemption() bool {
 // ProcessPreemption returns filtered candidate nodes and victims after running preemption logic in extender.
 func (h *HTTPExtender) ProcessPreemption(
     pod *v1.Pod,
-    nodeToVictims map[*v1.Node]*extenderv1.Victims,
+    nodeNameToVictims map[string]*extenderv1.Victims,
     nodeInfos framework.NodeInfoLister,
-) (map[*v1.Node]*extenderv1.Victims, error) {
+) (map[string]*extenderv1.Victims, error) {
     var (
         result extenderv1.ExtenderPreemptionResult
         args   *extenderv1.ExtenderPreemptionArgs
@@ -226,13 +225,12 @@ func (h *HTTPExtender) ProcessPreemption(
     if h.nodeCacheCapable {
         // If extender has cached node info, pass NodeNameToMetaVictims in args.
-        nodeNameToMetaVictims := convertToNodeNameToMetaVictims(nodeToVictims)
+        nodeNameToMetaVictims := convertToNodeNameToMetaVictims(nodeNameToVictims)
         args = &extenderv1.ExtenderPreemptionArgs{
             Pod:                   pod,
             NodeNameToMetaVictims: nodeNameToMetaVictims,
         }
     } else {
-        nodeNameToVictims := convertToNodeNameToVictims(nodeToVictims)
         args = &extenderv1.ExtenderPreemptionArgs{
             Pod:               pod,
             NodeNameToVictims: nodeNameToVictims,
@@ -244,22 +242,22 @@ func (h *HTTPExtender) ProcessPreemption(
     }
     // Extender will always return NodeNameToMetaVictims.
-    // So let's convert it to NodeToVictims by using NodeNameToInfo.
+    // So let's convert it to NodeNameToVictims by using <nodeInfos>.
-    newNodeToVictims, err := h.convertToNodeToVictims(result.NodeNameToMetaVictims, nodeInfos)
+    newNodeNameToVictims, err := h.convertToNodeNameToVictims(result.NodeNameToMetaVictims, nodeInfos)
     if err != nil {
         return nil, err
     }
-    // Do not override nodeToVictims
+    // Do not override <nodeNameToVictims>.
-    return newNodeToVictims, nil
+    return newNodeNameToVictims, nil
 }
-// convertToNodeToVictims converts "nodeNameToMetaVictims" from object identifiers,
+// convertToNodeNameToVictims converts "nodeNameToMetaVictims" from object identifiers,
 // such as UIDs and names, to object pointers.
-func (h *HTTPExtender) convertToNodeToVictims(
+func (h *HTTPExtender) convertToNodeNameToVictims(
     nodeNameToMetaVictims map[string]*extenderv1.MetaVictims,
     nodeInfos framework.NodeInfoLister,
-) (map[*v1.Node]*extenderv1.Victims, error) {
+) (map[string]*extenderv1.Victims, error) {
-    nodeToVictims := map[*v1.Node]*extenderv1.Victims{}
+    nodeNameToVictims := map[string]*extenderv1.Victims{}
     for nodeName, metaVictims := range nodeNameToMetaVictims {
         nodeInfo, err := nodeInfos.Get(nodeName)
         if err != nil {
@@ -275,9 +273,9 @@ func (h *HTTPExtender) convertToNodeToVictims(
             }
             victims.Pods = append(victims.Pods, pod)
         }
-        nodeToVictims[nodeInfo.Node()] = victims
+        nodeNameToVictims[nodeName] = victims
     }
-    return nodeToVictims, nil
+    return nodeNameToVictims, nil
 }
 // convertPodUIDToPod returns v1.Pod object for given MetaPod and node info.
@@ -298,10 +296,10 @@ func (h *HTTPExtender) convertPodUIDToPod(
 // convertToNodeNameToMetaVictims converts from struct type to meta types.
 func convertToNodeNameToMetaVictims(
-    nodeToVictims map[*v1.Node]*extenderv1.Victims,
+    nodeNameToVictims map[string]*extenderv1.Victims,
 ) map[string]*extenderv1.MetaVictims {
-    nodeNameToVictims := map[string]*extenderv1.MetaVictims{}
+    nodeNameToMetaVictims := map[string]*extenderv1.MetaVictims{}
-    for node, victims := range nodeToVictims {
+    for node, victims := range nodeNameToVictims {
         metaVictims := &extenderv1.MetaVictims{
             Pods: []*extenderv1.MetaPod{},
         }
@@ -311,20 +309,9 @@ func convertToNodeNameToMetaVictims(
         }
         metaVictims.Pods = append(metaVictims.Pods, metaPod)
         }
-        nodeNameToVictims[node.GetName()] = metaVictims
+        nodeNameToMetaVictims[node] = metaVictims
     }
-    return nodeNameToVictims
+    return nodeNameToMetaVictims
-}
-// convertToNodeNameToVictims converts from node type to node name as key.
-func convertToNodeNameToVictims(
-    nodeToVictims map[*v1.Node]*extenderv1.Victims,
-) map[string]*extenderv1.Victims {
-    nodeNameToVictims := map[string]*extenderv1.Victims{}
-    for node, victims := range nodeToVictims {
-        nodeNameToVictims[node.GetName()] = victims
-    }
-    return nodeNameToVictims
 }
 // Filter based on extender implemented predicate functions. The filtered list is
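With the victim map already keyed by node name, the removed convertToNodeNameToVictims helper above has nothing left to do; the only conversion that remains is into the meta form sent to the extender. A rough sketch of that shape, using simplified stand-in types rather than the real extenderv1 structs:

package main

import "fmt"

// Victims and MetaVictims are simplified stand-ins for the extenderv1 types.
type Victims struct{ PodUIDs []string }
type MetaVictims struct{ PodUIDs []string }

// toMetaVictims mirrors the post-cleanup convertToNodeNameToMetaVictims shape:
// input and output share the node-name key, so no node.GetName() call is needed.
func toMetaVictims(nodeNameToVictims map[string]*Victims) map[string]*MetaVictims {
    out := make(map[string]*MetaVictims, len(nodeNameToVictims))
    for nodeName, v := range nodeNameToVictims {
        out[nodeName] = &MetaVictims{PodUIDs: append([]string(nil), v.PodUIDs...)}
    }
    return out
}

func main() {
    in := map[string]*Victims{"node-a": {PodUIDs: []string{"pod-uid-1"}}}
    fmt.Println(toMetaVictims(in)["node-a"].PodUIDs)
}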

View File

@@ -159,36 +159,37 @@ func (f *FakeExtender) SupportsPreemption() bool {
 func (f *FakeExtender) ProcessPreemption(
     pod *v1.Pod,
-    nodeToVictims map[*v1.Node]*extenderv1.Victims,
+    nodeNameToVictims map[string]*extenderv1.Victims,
     nodeInfos framework.NodeInfoLister,
-) (map[*v1.Node]*extenderv1.Victims, error) {
+) (map[string]*extenderv1.Victims, error) {
-    nodeToVictimsCopy := map[*v1.Node]*extenderv1.Victims{}
+    nodeNameToVictimsCopy := map[string]*extenderv1.Victims{}
-    // We don't want to change the original nodeToVictims
+    // We don't want to change the original nodeNameToVictims
-    for k, v := range nodeToVictims {
+    for k, v := range nodeNameToVictims {
         // In real world implementation, extender's user should have their own way to get node object
         // by name if needed (e.g. query kube-apiserver etc).
         //
         // For test purpose, we just use node from parameters directly.
-        nodeToVictimsCopy[k] = v
+        nodeNameToVictimsCopy[k] = v
     }
-    for node, victims := range nodeToVictimsCopy {
+    for nodeName, victims := range nodeNameToVictimsCopy {
         // Try to do preemption on extender side.
-        extenderVictimPods, extendernPDBViolations, fits, err := f.selectVictimsOnNodeByExtender(pod, node)
+        nodeInfo, _ := nodeInfos.Get(nodeName)
+        extenderVictimPods, extenderPDBViolations, fits, err := f.selectVictimsOnNodeByExtender(pod, nodeInfo.Node())
         if err != nil {
             return nil, err
         }
         // If it's unfit after extender's preemption, this node is unresolvable by preemption overall,
         // let's remove it from potential preemption nodes.
         if !fits {
-            delete(nodeToVictimsCopy, node)
+            delete(nodeNameToVictimsCopy, nodeName)
         } else {
             // Append new victims to original victims
-            nodeToVictimsCopy[node].Pods = append(victims.Pods, extenderVictimPods...)
+            nodeNameToVictimsCopy[nodeName].Pods = append(victims.Pods, extenderVictimPods...)
-            nodeToVictimsCopy[node].NumPDBViolations = victims.NumPDBViolations + int64(extendernPDBViolations)
+            nodeNameToVictimsCopy[nodeName].NumPDBViolations = victims.NumPDBViolations + int64(extenderPDBViolations)
         }
     }
-    return nodeToVictimsCopy, nil
+    return nodeNameToVictimsCopy, nil
 }
 // selectVictimsOnNodeByExtender checks the given nodes->pods map with predicates on extender's side.
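As the FakeExtender above shows, code that still needs the full node object now resolves it from the name through the NodeInfoLister. A hedged, self-contained sketch of that lookup pattern, with a toy lister standing in for framework.NodeInfoLister:

package main

import "fmt"

// Node and nodeLister are toy stand-ins for *v1.Node and framework.NodeInfoLister.
type Node struct{ Name string }

type nodeLister map[string]*Node

func (l nodeLister) Get(name string) (*Node, error) {
    if n, ok := l[name]; ok {
        return n, nil
    }
    return nil, fmt.Errorf("node %q not found", name)
}

func main() {
    lister := nodeLister{"node-a": {Name: "node-a"}}
    nodeNameToVictims := map[string][]string{"node-a": {"victim-pod"}}
    for nodeName, pods := range nodeNameToVictims {
        node, err := lister.Get(nodeName)
        if err != nil {
            continue // the name is no longer in the snapshot; skip it
        }
        fmt.Printf("would preempt %v on %s\n", pods, node.Name)
    }
}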

View File

@@ -105,7 +105,7 @@ type ScheduleAlgorithm interface {
     // the pod by preempting lower priority pods if possible.
     // It returns the node where preemption happened, a list of preempted pods, a
     // list of pods whose nominated node name should be removed, and error if any.
-    Preempt(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
+    Preempt(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod, error) (selectedNode string, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
     // Extenders returns a slice of extender config. This is exposed for
     // testing.
     Extenders() []SchedulerExtender
@@ -251,74 +251,74 @@ func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (st
 // other pods with the same priority. The nominated pod prevents other pods from
 // using the nominated resources and the nominated pod could take a long time
 // before it is retried after many other pending pods.
-func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
+func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (string, []*v1.Pod, []*v1.Pod, error) {
     // Scheduler may return various types of errors. Consider preemption only if
     // the error is of type FitError.
     fitError, ok := scheduleErr.(*FitError)
     if !ok || fitError == nil {
-        return nil, nil, nil, nil
+        return "", nil, nil, nil
     }
     if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfos()) {
         klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
-        return nil, nil, nil, nil
+        return "", nil, nil, nil
     }
     allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
     if err != nil {
-        return nil, nil, nil, err
+        return "", nil, nil, err
     }
     if len(allNodes) == 0 {
-        return nil, nil, nil, ErrNoNodesAvailable
+        return "", nil, nil, ErrNoNodesAvailable
     }
     potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError)
     if len(potentialNodes) == 0 {
         klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
         // In this case, we should clean-up any existing nominated node name of the pod.
-        return nil, nil, []*v1.Pod{pod}, nil
+        return "", nil, []*v1.Pod{pod}, nil
     }
     var pdbs []*policy.PodDisruptionBudget
     if g.pdbLister != nil {
         pdbs, err = g.pdbLister.List(labels.Everything())
         if err != nil {
-            return nil, nil, nil, err
+            return "", nil, nil, err
         }
     }
-    nodeToVictims, err := g.selectNodesForPreemption(ctx, prof, state, pod, potentialNodes, pdbs)
+    nodeNameToVictims, err := g.selectNodesForPreemption(ctx, prof, state, pod, potentialNodes, pdbs)
     if err != nil {
-        return nil, nil, nil, err
+        return "", nil, nil, err
     }
-    // We will only check nodeToVictims with extenders that support preemption.
+    // We will only check nodeNameToVictims with extenders that support preemption.
     // Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
     // node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
-    nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
+    nodeNameToVictims, err = g.processPreemptionWithExtenders(pod, nodeNameToVictims)
     if err != nil {
-        return nil, nil, nil, err
+        return "", nil, nil, err
     }
-    candidateNode := pickOneNodeForPreemption(nodeToVictims)
+    candidateNode := pickOneNodeForPreemption(nodeNameToVictims)
-    if candidateNode == nil {
+    if len(candidateNode) == 0 {
-        return nil, nil, nil, nil
+        return "", nil, nil, nil
     }
     // Lower priority pods nominated to run on this node, may no longer fit on
     // this node. So, we should remove their nomination. Removing their
     // nomination updates these pods and moves them to the active queue. It
     // lets scheduler find another place for them.
-    nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
+    nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode)
-    return candidateNode, nodeToVictims[candidateNode].Pods, nominatedPods, nil
+    return candidateNode, nodeNameToVictims[candidateNode].Pods, nominatedPods, nil
 }
 // processPreemptionWithExtenders processes preemption with extenders
 func (g *genericScheduler) processPreemptionWithExtenders(
     pod *v1.Pod,
-    nodeToVictims map[*v1.Node]*extenderv1.Victims,
+    nodeNameToVictims map[string]*extenderv1.Victims,
-) (map[*v1.Node]*extenderv1.Victims, error) {
+) (map[string]*extenderv1.Victims, error) {
-    if len(nodeToVictims) > 0 {
+    if len(nodeNameToVictims) > 0 {
         for _, extender := range g.extenders {
             if extender.SupportsPreemption() && extender.IsInterested(pod) {
-                newNodeToVictims, err := extender.ProcessPreemption(
+                newNodeNameToVictims, err := extender.ProcessPreemption(
                     pod,
-                    nodeToVictims,
+                    nodeNameToVictims,
                     g.nodeInfoSnapshot.NodeInfos(),
                 )
                 if err != nil {
@@ -330,19 +330,19 @@ func (g *genericScheduler) processPreemptionWithExtenders(
                     return nil, err
                 }
-                // Replace nodeToVictims with new result after preemption. So the
+                // Replace nodeNameToVictims with new result after preemption. So the
                 // rest of extenders can continue use it as parameter.
-                nodeToVictims = newNodeToVictims
+                nodeNameToVictims = newNodeNameToVictims
                 // If node list becomes empty, no preemption can happen regardless of other extenders.
-                if len(nodeToVictims) == 0 {
+                if len(nodeNameToVictims) == 0 {
                     break
                 }
             }
         }
     }
-    return nodeToVictims, nil
+    return nodeNameToVictims, nil
 }
 // getLowerPriorityNominatedPods returns pods whose priority is smaller than the
@@ -719,12 +719,12 @@ func (g *genericScheduler) prioritizeNodes(
 // 6. If there are still ties, the first such node is picked (sort of randomly).
 // The 'minNodes1' and 'minNodes2' are being reused here to save the memory
 // allocation and garbage collection time.
-func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*extenderv1.Victims) *v1.Node {
+func pickOneNodeForPreemption(nodesToVictims map[string]*extenderv1.Victims) string {
     if len(nodesToVictims) == 0 {
-        return nil
+        return ""
     }
     minNumPDBViolatingPods := int64(math.MaxInt32)
-    var minNodes1 []*v1.Node
+    var minNodes1 []string
     lenNodes1 := 0
     for node, victims := range nodesToVictims {
         if len(victims.Pods) == 0 {
@@ -752,7 +752,7 @@ func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*extenderv1.Victims) *
     // There are more than one node with minimum number PDB violating pods. Find
     // the one with minimum highest priority victim.
     minHighestPriority := int32(math.MaxInt32)
-    var minNodes2 = make([]*v1.Node, lenNodes1)
+    var minNodes2 = make([]string, lenNodes1)
     lenNodes2 := 0
     for i := 0; i < lenNodes1; i++ {
         node := minNodes1[i]
@@ -855,8 +855,8 @@ func (g *genericScheduler) selectNodesForPreemption(
     pod *v1.Pod,
     potentialNodes []*framework.NodeInfo,
     pdbs []*policy.PodDisruptionBudget,
-) (map[*v1.Node]*extenderv1.Victims, error) {
+) (map[string]*extenderv1.Victims, error) {
-    nodeToVictims := map[*v1.Node]*extenderv1.Victims{}
+    nodeNameToVictims := map[string]*extenderv1.Victims{}
     var resultLock sync.Mutex
     checkNode := func(i int) {
@@ -869,12 +869,12 @@ func (g *genericScheduler) selectNodesForPreemption(
                 Pods:             pods,
                 NumPDBViolations: int64(numPDBViolations),
             }
-            nodeToVictims[potentialNodes[i].Node()] = &victims
+            nodeNameToVictims[potentialNodes[i].Node().Name] = &victims
             resultLock.Unlock()
         }
     }
     parallelize.Until(ctx, len(potentialNodes), checkNode)
-    return nodeToVictims, nil
+    return nodeNameToVictims, nil
 }
 // filterPodsWithPDBViolation groups the given "pods" into two groups of "violatingPods"
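Because Preempt and pickOneNodeForPreemption now hand back a node name, "no candidate" is signalled with the empty string rather than a nil *v1.Node. A small sketch of how a caller distinguishes the two outcomes under that convention (pickOne is a hypothetical stand-in, not the scheduler function):

package main

import "fmt"

// pickOne stands in for pickOneNodeForPreemption: it returns some candidate's
// name, or "" when there is nothing to preempt.
func pickOne(nodeNameToVictims map[string][]string) string {
    for name := range nodeNameToVictims {
        return name
    }
    return ""
}

func main() {
    candidate := pickOne(map[string][]string{})
    if len(candidate) == 0 {
        // Mirrors the len(candidateNode) == 0 check in the diff above.
        fmt.Println("preemption cannot help")
        return
    }
    fmt.Println("preempting on node", candidate)
}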

View File

@@ -1161,10 +1161,10 @@ func TestZeroRequest(t *testing.T) {
     }
 }
-func printNodeToVictims(nodeToVictims map[*v1.Node]*extenderv1.Victims) string {
+func printNodeNameToVictims(nodeNameToVictims map[string]*extenderv1.Victims) string {
     var output string
-    for node, victims := range nodeToVictims {
+    for nodeName, victims := range nodeNameToVictims {
-        output += node.Name + ": ["
+        output += nodeName + ": ["
         for _, pod := range victims.Pods {
             output += pod.Name + ", "
         }
@@ -1178,12 +1178,12 @@ type victims struct {
     numPDBViolations int64
 }
-func checkPreemptionVictims(expected map[string]victims, nodeToPods map[*v1.Node]*extenderv1.Victims) error {
+func checkPreemptionVictims(expected map[string]victims, nodeToPods map[string]*extenderv1.Victims) error {
     if len(expected) == len(nodeToPods) {
         for k, victims := range nodeToPods {
-            if expVictims, ok := expected[k.Name]; ok {
+            if expVictims, ok := expected[k]; ok {
                 if len(victims.Pods) != len(expVictims.pods) {
-                    return fmt.Errorf("unexpected number of pods. expected: %v, got: %v", expected, printNodeToVictims(nodeToPods))
+                    return fmt.Errorf("unexpected number of pods. expected: %v, got: %v", expected, printNodeNameToVictims(nodeToPods))
                 }
                 prevPriority := int32(math.MaxInt32)
                 for _, p := range victims.Pods {
@@ -1200,11 +1200,11 @@ func checkPreemptionVictims(expected map[string]victims, nodeToPods map[*v1.Node
                     return fmt.Errorf("unexpected numPDBViolations. expected: %d, got: %d", expVictims.numPDBViolations, victims.NumPDBViolations)
                 }
             } else {
-                return fmt.Errorf("unexpected machines. expected: %v, got: %v", expected, printNodeToVictims(nodeToPods))
+                return fmt.Errorf("unexpected machines. expected: %v, got: %v", expected, printNodeNameToVictims(nodeToPods))
             }
         }
     } else {
-        return fmt.Errorf("unexpected number of machines. expected: %v, got: %v", expected, printNodeToVictims(nodeToPods))
+        return fmt.Errorf("unexpected number of machines. expected: %v, got: %v", expected, printNodeNameToVictims(nodeToPods))
     }
     return nil
 }
@@ -1916,7 +1916,7 @@ func TestPickOneNodeForPreemption(t *testing.T) {
         node := pickOneNodeForPreemption(candidateNodes)
         found := false
         for _, nodeName := range test.expected {
-            if node.Name == nodeName {
+            if node == nodeName {
                 found = true
                 break
             }
@@ -2429,10 +2429,10 @@ func TestPreempt(t *testing.T) {
         if err != nil {
             t.Errorf("unexpected error in preemption: %v", err)
         }
-        if node != nil && node.Name != test.expectedNode {
+        if len(node) != 0 && node != test.expectedNode {
-            t.Errorf("expected node: %v, got: %v", test.expectedNode, node.GetName())
+            t.Errorf("expected node: %v, got: %v", test.expectedNode, node)
         }
-        if node == nil && len(test.expectedNode) != 0 {
+        if len(node) == 0 && len(test.expectedNode) != 0 {
            t.Errorf("expected node: %v, got: nothing", test.expectedNode)
         }
         if len(victims) != len(test.expectedPods) {
@@ -2452,14 +2452,14 @@ func TestPreempt(t *testing.T) {
             // Mark the victims for deletion and record the preemptor's nominated node name.
             now := metav1.Now()
             victim.DeletionTimestamp = &now
-            test.pod.Status.NominatedNodeName = node.Name
+            test.pod.Status.NominatedNodeName = node
         }
         // Call preempt again and make sure it doesn't preempt any more pods.
         node, victims, _, err = scheduler.Preempt(context.Background(), prof, state, test.pod, error(&FitError{Pod: test.pod, FilteredNodesStatuses: failedNodeToStatusMap}))
         if err != nil {
             t.Errorf("unexpected error in preemption: %v", err)
         }
-        if node != nil && len(victims) > 0 {
+        if len(node) != 0 && len(victims) > 0 {
             t.Errorf("didn't expect any more preemption. Node %v is selected for preemption.", node)
         }
         close(stop)

View File

@@ -499,10 +499,10 @@ func (f *fakeExtender) IsIgnorable() bool {
 }
 func (f *fakeExtender) ProcessPreemption(
-    pod *v1.Pod,
+    _ *v1.Pod,
-    nodeToVictims map[*v1.Node]*extenderv1.Victims,
+    _ map[string]*extenderv1.Victims,
-    nodeInfos framework.NodeInfoLister,
+    _ framework.NodeInfoLister,
-) (map[*v1.Node]*extenderv1.Victims, error) {
+) (map[string]*extenderv1.Victims, error) {
     return nil, nil
 }
@@ -515,8 +515,8 @@ func (f *fakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node) (filteredNodes []*v
 }
 func (f *fakeExtender) Prioritize(
-    pod *v1.Pod,
+    _ *v1.Pod,
-    nodes []*v1.Node,
+    _ []*v1.Node,
 ) (hostPriorities *extenderv1.HostPriorityList, weight int64, err error) {
     return nil, 0, nil
 }

View File

@@ -0,0 +1,70 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
v1 "k8s.io/api/core/v1"
extenderv1 "k8s.io/kube-scheduler/extender/v1"
)
// Extender is an interface for external processes to influence scheduling
// decisions made by Kubernetes. This is typically needed for resources not directly
// managed by Kubernetes.
type Extender interface {
// Name returns a unique name that identifies the extender.
Name() string
// Filter based on extender-implemented predicate functions. The filtered list is
// expected to be a subset of the supplied list. failedNodesMap optionally contains
// the list of failed nodes and failure reasons.
Filter(pod *v1.Pod, nodes []*v1.Node) (filteredNodes []*v1.Node, failedNodesMap extenderv1.FailedNodesMap, err error)
// Prioritize based on extender-implemented priority functions. The returned scores & weight
// are used to compute the weighted score for an extender. The weighted scores are added to
// the scores computed by Kubernetes scheduler. The total scores are used to do the host selection.
Prioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *extenderv1.HostPriorityList, weight int64, err error)
// Bind delegates the action of binding a pod to a node to the extender.
Bind(binding *v1.Binding) error
// IsBinder returns whether this extender is configured for the Bind method.
IsBinder() bool
// IsInterested returns true if at least one extended resource requested by
// this pod is managed by this extender.
IsInterested(pod *v1.Pod) bool
// ProcessPreemption returns nodes with their victim pods processed by extender based on
// given:
// 1. Pod to schedule
// 2. Candidate nodes and victim pods (nodeNameToVictims) generated by previous scheduling process.
// The possible changes made by extender may include:
// 1. Subset of given candidate nodes after preemption phase of extender.
// 2. A different set of victim pod for every given candidate node after preemption phase of extender.
ProcessPreemption(
pod *v1.Pod,
nodeNameToVictims map[string]*extenderv1.Victims,
nodeInfos NodeInfoLister,
) (map[string]*extenderv1.Victims, error)
// SupportsPreemption returns if the scheduler extender support preemption or not.
SupportsPreemption() bool
// IsIgnorable returns true indicates scheduling should not fail when this extender
// is unavailable. This gives scheduler ability to fail fast and tolerate non-critical extenders as well.
IsIgnorable() bool
}
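For illustration, a minimal extender satisfying the ProcessPreemption part of the interface above: it accepts every candidate unchanged. This assumes the same package and imports as the new file shown; noopExtender is a hypothetical name, not part of this commit.

// noopExtender is a sketch only; a real extender could drop node names from
// the map or append extra victims per node before returning it.
type noopExtender struct{}

func (noopExtender) ProcessPreemption(
    pod *v1.Pod,
    nodeNameToVictims map[string]*extenderv1.Victims,
    nodeInfos NodeInfoLister,
) (map[string]*extenderv1.Victims, error) {
    return nodeNameToVictims, nil
}

func (noopExtender) SupportsPreemption() bool { return true }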

View File

@@ -405,14 +405,12 @@ func (sched *Scheduler) preempt(ctx context.Context, prof *profile.Profile, stat
         return "", err
     }
-    node, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(ctx, prof, state, preemptor, scheduleErr)
+    nodeName, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(ctx, prof, state, preemptor, scheduleErr)
     if err != nil {
         klog.Errorf("Error preempting victims to make room for %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
         return "", err
     }
-    var nodeName = ""
-    if node != nil {
-        nodeName = node.Name
+    if len(nodeName) != 0 {
         // Update the scheduling queue with the nominated pod information. Without
         // this, there would be a race condition between the next scheduling cycle
         // and the time the scheduler receives a Pod Update for the nominated pod.

View File

@@ -144,8 +144,8 @@ func (es mockScheduler) Schedule(ctx context.Context, profile *profile.Profile,
 func (es mockScheduler) Extenders() []core.SchedulerExtender {
     return nil
 }
-func (es mockScheduler) Preempt(ctx context.Context, i *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
+func (es mockScheduler) Preempt(ctx context.Context, i *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (string, []*v1.Pod, []*v1.Pod, error) {
-    return nil, nil, nil, nil
+    return "", nil, nil, nil
 }
 func TestSchedulerCreation(t *testing.T) {