Merge pull request #87038 from alculquicondor/cleanup/mv_snapshot

Remove direct use of Snapshot's data structures
Kubernetes Prow Robot authored 2020-01-14 17:21:29 -08:00, committed by GitHub
commit b046f742bd
GPG Key ID: 4AEE18F83AFDEB23 (no known key found for this signature in database)
3 changed files with 141 additions and 128 deletions
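In short, callers stop reading Snapshot.NodeInfoList directly and go through the NumNodes() and NodeInfos().List() accessors instead. Below is a minimal, self-contained sketch of that accessor pattern; the types are hypothetical stand-ins, not the scheduler's real Snapshot (which keeps its exported fields), and are only meant to illustrate the boundary the PR establishes.

```go
package main

import "fmt"

// NodeInfo stands in for the scheduler's per-node bookkeeping type.
type NodeInfo struct {
	Name string
}

// Node returns the node wrapped by this NodeInfo (here, just its name).
func (n *NodeInfo) Node() string { return n.Name }

// NodeInfoLister mirrors the listing interface that callers now depend on
// instead of the snapshot's internal slice.
type NodeInfoLister interface {
	List() ([]*NodeInfo, error)
}

// Snapshot holds the node data; in this sketch the slice is unexported so the
// accessor boundary is obvious (the real Snapshot still exports NodeInfoList).
type Snapshot struct {
	nodeInfoList []*NodeInfo
}

// NumNodes returns the number of nodes in the snapshot.
func (s *Snapshot) NumNodes() int { return len(s.nodeInfoList) }

// NodeInfos returns a lister over the snapshot's nodes.
func (s *Snapshot) NodeInfos() NodeInfoLister { return &nodeInfoLister{snapshot: s} }

type nodeInfoLister struct{ snapshot *Snapshot }

func (l *nodeInfoLister) List() ([]*NodeInfo, error) { return l.snapshot.nodeInfoList, nil }

func main() {
	snap := &Snapshot{nodeInfoList: []*NodeInfo{{Name: "node-a"}, {Name: "node-b"}}}

	// Old style read len(snap.NodeInfoList) directly; new style uses accessors.
	fmt.Println("nodes:", snap.NumNodes())
	if all, err := snap.NodeInfos().List(); err == nil {
		for _, ni := range all {
			fmt.Println("node:", ni.Node())
		}
	}
}
```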


@@ -178,7 +178,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleS
 	}
 	trace.Step("Snapshotting scheduler cache and node infos done")
 
-	if len(g.nodeInfoSnapshot.NodeInfoList) == 0 {
+	if g.nodeInfoSnapshot.NumNodes() == 0 {
 		return result, ErrNoNodesAvailable
 	}
@@ -190,7 +190,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleS
 	trace.Step("Running prefilter plugins done")
 
 	startPredicateEvalTime := time.Now()
-	filteredNodes, filteredNodesStatuses, err := g.findNodesThatFit(ctx, state, pod)
+	filteredNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, state, pod)
 	if err != nil {
 		return result, err
 	}
@@ -205,7 +205,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleS
 	if len(filteredNodes) == 0 {
 		return result, &FitError{
 			Pod:                   pod,
-			NumAllNodes:           len(g.nodeInfoSnapshot.NodeInfoList),
+			NumAllNodes:           g.nodeInfoSnapshot.NumNodes(),
 			FilteredNodesStatuses: filteredNodesStatuses,
 		}
 	}
@@ -294,19 +294,20 @@ func (g *genericScheduler) Preempt(ctx context.Context, state *framework.CycleSt
 		klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
 		return nil, nil, nil, nil
 	}
-	if len(g.nodeInfoSnapshot.NodeInfoList) == 0 {
+	allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
+	if err != nil {
+		return nil, nil, nil, err
+	}
+	if len(allNodes) == 0 {
 		return nil, nil, nil, ErrNoNodesAvailable
 	}
-	potentialNodes := nodesWherePreemptionMightHelp(g.nodeInfoSnapshot.NodeInfoList, fitError)
+	potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError)
 	if len(potentialNodes) == 0 {
 		klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
 		// In this case, we should clean-up any existing nominated node name of the pod.
 		return nil, nil, []*v1.Pod{pod}, nil
 	}
-	var (
-		pdbs []*policy.PodDisruptionBudget
-		err  error
-	)
+	var pdbs []*policy.PodDisruptionBudget
 	if g.pdbLister != nil {
 		pdbs, err = g.pdbLister.List(labels.Everything())
 		if err != nil {
@@ -424,33 +425,52 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i
 	return numNodes
 }
 
-// Filters the nodes to find the ones that fit based on the given predicate functions
-// Each node is passed through the predicate functions to determine if it is a fit
-func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) {
-	var filtered []*v1.Node
-	filteredNodesStatuses := framework.NodeToStatusMap{}
-
-	if !g.framework.HasFilterPlugins() {
-		filtered = g.nodeInfoSnapshot.ListNodes()
-	} else {
-		allNodes := len(g.nodeInfoSnapshot.NodeInfoList)
-		numNodesToFind := g.numFeasibleNodesToFind(int32(allNodes))
+// Filters the nodes to find the ones that fit the pod based on the framework
+// filter plugins and filter extenders.
+func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) {
+	filteredNodesStatuses := make(framework.NodeToStatusMap)
+	filtered, err := g.findNodesThatPassFilters(ctx, state, pod, filteredNodesStatuses)
+	if err != nil {
+		return nil, nil, err
+	}
+	filtered, err = g.findNodesThatPassExtenders(pod, filtered, filteredNodesStatuses)
+	if err != nil {
+		return nil, nil, err
+	}
+	return filtered, filteredNodesStatuses, nil
+}
+
+// findNodesThatPassFilters finds the nodes that fit the filter plugins.
+func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, state *framework.CycleState, pod *v1.Pod, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
+	allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
+	if err != nil {
+		return nil, err
+	}
+	numNodesToFind := g.numFeasibleNodesToFind(int32(len(allNodes)))
 
 	// Create filtered list with enough space to avoid growing it
 	// and allow assigning.
-	filtered = make([]*v1.Node, numNodesToFind)
-	errCh := util.NewErrorChannel()
-	var (
-		predicateResultLock sync.Mutex
-		filteredLen         int32
-	)
+	filtered := make([]*v1.Node, numNodesToFind)
+
+	if !g.framework.HasFilterPlugins() {
+		for i := range filtered {
+			filtered[i] = allNodes[i].Node()
+		}
+		g.nextStartNodeIndex = (g.nextStartNodeIndex + len(filtered)) % len(allNodes)
+		return filtered, nil
+	}
+
+	errCh := util.NewErrorChannel()
+	var statusesLock sync.Mutex
+	var filteredLen int32
 	ctx, cancel := context.WithCancel(ctx)
 	checkNode := func(i int) {
 		// We check the nodes starting from where we left off in the previous scheduling cycle,
 		// this is to make sure all nodes have the same chance of being examined across pods.
-		nodeInfo := g.nodeInfoSnapshot.NodeInfoList[(g.nextStartNodeIndex+i)%allNodes]
-		fits, status, err := g.podFitsOnNode(ctx, state, pod, nodeInfo)
+		nodeInfo := allNodes[(g.nextStartNodeIndex+i)%len(allNodes)]
+		fits, status, err := g.podPassesFiltersOnNode(ctx, state, pod, nodeInfo)
 		if err != nil {
 			errCh.SendErrorWithCancel(err, cancel)
 			return
@@ -464,28 +484,32 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor
 				filtered[length-1] = nodeInfo.Node()
 			}
 		} else {
-			predicateResultLock.Lock()
+			statusesLock.Lock()
 			if !status.IsSuccess() {
-				filteredNodesStatuses[nodeInfo.Node().Name] = status
+				statuses[nodeInfo.Node().Name] = status
 			}
-			predicateResultLock.Unlock()
+			statusesLock.Unlock()
 		}
 	}
 
 	// Stops searching for more nodes once the configured number of feasible nodes
 	// are found.
-	workqueue.ParallelizeUntil(ctx, 16, allNodes, checkNode)
-	processedNodes := int(filteredLen) + len(filteredNodesStatuses)
-	g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % allNodes
+	workqueue.ParallelizeUntil(ctx, 16, len(allNodes), checkNode)
+	processedNodes := int(filteredLen) + len(statuses)
+	g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(allNodes)
 
 	filtered = filtered[:filteredLen]
 	if err := errCh.ReceiveError(); err != nil {
-		return []*v1.Node{}, framework.NodeToStatusMap{}, err
+		return nil, err
 	}
+	return filtered, nil
+}
 
-	if len(filtered) > 0 && len(g.extenders) != 0 {
+func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, filtered []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
 	for _, extender := range g.extenders {
+		if len(filtered) == 0 {
+			break
+		}
 		if !extender.IsInterested(pod) {
 			continue
 		}
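The "start from where we left off" comment above refers to the nextStartNodeIndex round-robin that findNodesThatPassFilters now advances itself. Here is a hedged, sequential sketch of that fairness idea in isolation; the names are hypothetical, and it omits the parallel workqueue and filter plugins entirely.

```go
package main

import "fmt"

// scheduler keeps the offset where the previous scheduling cycle stopped, so
// each cycle starts examining nodes at a different position and no node is
// systematically skipped when the search stops early.
type scheduler struct {
	nextStartNodeIndex int
}

// findFirstN walks the node list round-robin from the saved offset, returns up
// to n nodes, and advances the offset by how many nodes it examined.
func (s *scheduler) findFirstN(nodes []string, n int) []string {
	if len(nodes) == 0 {
		return nil
	}
	picked := make([]string, 0, n)
	processed := 0
	for i := 0; i < len(nodes) && len(picked) < n; i++ {
		picked = append(picked, nodes[(s.nextStartNodeIndex+i)%len(nodes)])
		processed++
	}
	s.nextStartNodeIndex = (s.nextStartNodeIndex + processed) % len(nodes)
	return picked
}

func main() {
	s := &scheduler{}
	nodes := []string{"a", "b", "c", "d", "e"}
	// Two cycles that each stop after three nodes begin at different offsets.
	fmt.Println(s.findFirstN(nodes, 3)) // [a b c]
	fmt.Println(s.findFirstN(nodes, 3)) // [d e a]
}
```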
@@ -496,24 +520,19 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor
 					extender, err)
 				continue
 			}
-			return []*v1.Node{}, framework.NodeToStatusMap{}, err
+			return nil, err
 		}
 
 		for failedNodeName, failedMsg := range failedMap {
-			if _, found := filteredNodesStatuses[failedNodeName]; !found {
-				filteredNodesStatuses[failedNodeName] = framework.NewStatus(framework.Unschedulable, failedMsg)
+			if _, found := statuses[failedNodeName]; !found {
+				statuses[failedNodeName] = framework.NewStatus(framework.Unschedulable, failedMsg)
 			} else {
-				filteredNodesStatuses[failedNodeName].AppendReason(failedMsg)
+				statuses[failedNodeName].AppendReason(failedMsg)
 			}
 		}
 
 		filtered = filteredList
-		if len(filtered) == 0 {
-			break
-		}
 	}
-	}
-	return filtered, filteredNodesStatuses, nil
+	return filtered, nil
 }
@@ -545,17 +564,17 @@ func (g *genericScheduler) addNominatedPods(ctx context.Context, pod *v1.Pod, st
 	return podsAdded, stateOut, nodeInfoOut, nil
 }
 
-// podFitsOnNode checks whether a node given by NodeInfo satisfies the given predicate functions.
-// For given pod, podFitsOnNode will check if any equivalent pod exists and try to reuse its cached
-// predicate results as possible.
+// podPassesFiltersOnNode checks whether a node given by NodeInfo satisfies the
+// filter plugins.
 // This function is called from two different places: Schedule and Preempt.
-// When it is called from Schedule, we want to test whether the pod is schedulable
-// on the node with all the existing pods on the node plus higher and equal priority
-// pods nominated to run on the node.
-// When it is called from Preempt, we should remove the victims of preemption and
-// add the nominated pods. Removal of the victims is done by SelectVictimsOnNode().
-// It removes victims from PreFilter state and NodeInfo before calling this function.
-func (g *genericScheduler) podFitsOnNode(
+// When it is called from Schedule, we want to test whether the pod is
+// schedulable on the node with all the existing pods on the node plus higher
+// and equal priority pods nominated to run on the node.
+// When it is called from Preempt, we should remove the victims of preemption
+// and add the nominated pods. Removal of the victims is done by
+// SelectVictimsOnNode(). Preempt removes victims from PreFilter state and
+// NodeInfo before calling this function.
+func (g *genericScheduler) podPassesFiltersOnNode(
 	ctx context.Context,
 	state *framework.CycleState,
 	pod *v1.Pod,
@@ -564,21 +583,21 @@ func (g *genericScheduler) podFitsOnNode(
 	var status *framework.Status
 
 	podsAdded := false
-	// We run predicates twice in some cases. If the node has greater or equal priority
+	// We run filters twice in some cases. If the node has greater or equal priority
 	// nominated pods, we run them when those pods are added to PreFilter state and nodeInfo.
-	// If all predicates succeed in this pass, we run them again when these
+	// If all filters succeed in this pass, we run them again when these
 	// nominated pods are not added. This second pass is necessary because some
-	// predicates such as inter-pod affinity may not pass without the nominated pods.
+	// filters such as inter-pod affinity may not pass without the nominated pods.
 	// If there are no nominated pods for the node or if the first run of the
-	// predicates fail, we don't run the second pass.
+	// filters fail, we don't run the second pass.
 	// We consider only equal or higher priority pods in the first pass, because
 	// those are the current "pod" must yield to them and not take a space opened
 	// for running them. It is ok if the current "pod" take resources freed for
 	// lower priority pods.
 	// Requiring that the new pod is schedulable in both circumstances ensures that
-	// we are making a conservative decision: predicates like resources and inter-pod
+	// we are making a conservative decision: filters like resources and inter-pod
 	// anti-affinity are more likely to fail when the nominated pods are treated
-	// as running, while predicates like pod affinity are more likely to fail when
+	// as running, while filters like pod affinity are more likely to fail when
 	// the nominated pods are treated as not running. We can't just assume the
 	// nominated pods are running because they are not running right now and in fact,
 	// they may end up getting scheduled to a different node.
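As a rough illustration of the two-pass logic described in this comment, here is a hedged, stripped-down sketch. The types and the toy passesFilters function are hypothetical; the real code runs the framework's filter plugins against CycleState and NodeInfo rather than counting pods.

```go
package main

import "fmt"

// pod and node stand in for v1.Pod and the scheduler's NodeInfo.
type pod struct {
	name     string
	priority int
}

type node struct {
	name      string
	nominated []pod // pods nominated to run here but not yet bound
}

// passesFilters is a toy placeholder for running the filter plugins; it only
// "fails" when nominated pods are assumed running and the node looks full.
func passesFilters(p pod, n node, assumeNominated bool) bool {
	load := 0
	if assumeNominated {
		load = len(n.nominated)
	}
	return load < 2
}

// podPassesOnNode mirrors the two-pass idea: when the node has nominated pods
// of equal or higher priority, the filters must pass both with those pods
// assumed running (first pass) and without them (second pass).
func podPassesOnNode(p pod, n node) bool {
	hasHigherNominated := false
	for _, np := range n.nominated {
		if np.priority >= p.priority {
			hasHigherNominated = true
			break
		}
	}
	if hasHigherNominated && !passesFilters(p, n, true) {
		return false
	}
	return passesFilters(p, n, false)
}

func main() {
	one := node{name: "node-1", nominated: []pod{{name: "waiting", priority: 10}}}
	fmt.Println(podPassesOnNode(pod{name: "new", priority: 5}, one)) // true: both passes succeed

	full := node{name: "node-2", nominated: []pod{{priority: 10}, {priority: 9}}}
	fmt.Println(podPassesOnNode(pod{name: "new", priority: 5}, full)) // false: first pass fails
}
```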
@@ -648,7 +667,7 @@ func (g *genericScheduler) prioritizeNodes(
 	if len(g.extenders) != 0 && nodes != nil {
 		var mu sync.Mutex
 		var wg sync.WaitGroup
-		combinedScores := make(map[string]int64, len(g.nodeInfoSnapshot.NodeInfoList))
+		combinedScores := make(map[string]int64, len(nodes))
 		for i := range g.extenders {
 			if !g.extenders[i].IsInterested(pod) {
 				continue
@@ -959,7 +978,7 @@ func (g *genericScheduler) selectVictimsOnNode(
 	// inter-pod affinity to one or more victims, but we have decided not to
 	// support this case for performance reasons. Having affinity to lower
 	// priority pods is not a recommended configuration anyway.
-	if fits, _, err := g.podFitsOnNode(ctx, state, pod, nodeInfo); !fits {
+	if fits, _, err := g.podPassesFiltersOnNode(ctx, state, pod, nodeInfo); !fits {
 		if err != nil {
 			klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
 		}
@@ -977,7 +996,7 @@ func (g *genericScheduler) selectVictimsOnNode(
 		if err := addPod(p); err != nil {
 			return false, err
 		}
-		fits, _, _ := g.podFitsOnNode(ctx, state, pod, nodeInfo)
+		fits, _, _ := g.podPassesFiltersOnNode(ctx, state, pod, nodeInfo)
 		if !fits {
 			if err := removePod(p); err != nil {
 				return false, err


@@ -845,7 +845,7 @@ func TestFindFitAllError(t *testing.T) {
 		st.RegisterFilterPlugin("MatchFilter", NewMatchFilterPlugin),
 	)
 
-	_, nodeToStatusMap, err := scheduler.findNodesThatFit(context.Background(), framework.NewCycleState(), &v1.Pod{})
+	_, nodeToStatusMap, err := scheduler.findNodesThatFitPod(context.Background(), framework.NewCycleState(), &v1.Pod{})
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
@@ -878,7 +878,7 @@ func TestFindFitSomeError(t *testing.T) {
 	)
 
 	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}}
-	_, nodeToStatusMap, err := scheduler.findNodesThatFit(context.Background(), framework.NewCycleState(), pod)
+	_, nodeToStatusMap, err := scheduler.findNodesThatFitPod(context.Background(), framework.NewCycleState(), pod)
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
@@ -957,7 +957,7 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
 			cache.UpdateNodeInfoSnapshot(scheduler.nodeInfoSnapshot)
 			queue.UpdateNominatedPodForNode(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: types.UID("nominated")}, Spec: v1.PodSpec{Priority: &midPriority}}, "1")
 
-			_, _, err := scheduler.findNodesThatFit(context.Background(), framework.NewCycleState(), test.pod)
+			_, _, err := scheduler.findNodesThatFitPod(context.Background(), framework.NewCycleState(), test.pod)
 			if err != nil {
 				t.Errorf("unexpected error: %v", err)
@@ -1131,7 +1131,7 @@ func TestZeroRequest(t *testing.T) {
 			ctx := context.Background()
 			state := framework.NewCycleState()
 
-			_, filteredNodesStatuses, err := scheduler.findNodesThatFit(ctx, state, test.pod)
+			_, filteredNodesStatuses, err := scheduler.findNodesThatFitPod(ctx, state, test.pod)
 			if err != nil {
 				t.Fatalf("error filtering nodes: %+v", err)
 			}
@@ -2427,7 +2427,7 @@ func TestFairEvaluationForNodes(t *testing.T) {
 	// Iterating over all nodes more than twice
 	for i := 0; i < 2*(numAllNodes/nodesToFind+1); i++ {
-		nodesThatFit, _, err := g.findNodesThatFit(context.Background(), framework.NewCycleState(), &v1.Pod{})
+		nodesThatFit, _, err := g.findNodesThatFitPod(context.Background(), framework.NewCycleState(), &v1.Pod{})
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}


@@ -132,15 +132,9 @@ func (s *Snapshot) NodeInfos() schedulerlisters.NodeInfoLister {
 	return &nodeInfoLister{snapshot: s}
 }
 
-// ListNodes returns the list of nodes in the snapshot.
-func (s *Snapshot) ListNodes() []*v1.Node {
-	nodes := make([]*v1.Node, 0, len(s.NodeInfoList))
-	for _, n := range s.NodeInfoList {
-		if n.Node() != nil {
-			nodes = append(nodes, n.Node())
-		}
-	}
-	return nodes
+// NumNodes returns the number of nodes in the snapshot.
+func (s *Snapshot) NumNodes() int {
+	return len(s.NodeInfoList)
 }
 
 type podLister struct {