From 63c1c68d0caf052e4e3babdc160dbc8e2647b764 Mon Sep 17 00:00:00 2001 From: Aldo Culquicondor Date: Thu, 9 Jan 2020 14:55:18 -0500 Subject: [PATCH] Remove direct use of Snapshot's data structures Signed-off-by: Aldo Culquicondor --- pkg/scheduler/core/generic_scheduler.go | 37 ++++++++++++--------- pkg/scheduler/nodeinfo/snapshot/snapshot.go | 12 ++----- 2 files changed, 25 insertions(+), 24 deletions(-) diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index b70341f4fd7..631e0288b33 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -178,7 +178,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleS } trace.Step("Snapshotting scheduler cache and node infos done") - if len(g.nodeInfoSnapshot.NodeInfoList) == 0 { + if g.nodeInfoSnapshot.NumNodes() == 0 { return result, ErrNoNodesAvailable } @@ -205,7 +205,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleS if len(filteredNodes) == 0 { return result, &FitError{ Pod: pod, - NumAllNodes: len(g.nodeInfoSnapshot.NodeInfoList), + NumAllNodes: g.nodeInfoSnapshot.NumNodes(), FilteredNodesStatuses: filteredNodesStatuses, } } @@ -299,19 +299,20 @@ func (g *genericScheduler) Preempt(ctx context.Context, state *framework.CycleSt klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name) return nil, nil, nil, nil } - if len(g.nodeInfoSnapshot.NodeInfoList) == 0 { + allNodes, err := g.nodeInfoSnapshot.NodeInfos().List() + if err != nil { + return nil, nil, nil, err + } + if len(allNodes) == 0 { return nil, nil, nil, ErrNoNodesAvailable } - potentialNodes := nodesWherePreemptionMightHelp(g.nodeInfoSnapshot.NodeInfoList, fitError) + potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError) if len(potentialNodes) == 0 { klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name) // In this case, we should clean-up any existing nominated node name of the pod. return nil, nil, []*v1.Pod{pod}, nil } - var ( - pdbs []*policy.PodDisruptionBudget - err error - ) + var pdbs []*policy.PodDisruptionBudget if g.pdbLister != nil { pdbs, err = g.pdbLister.List(labels.Everything()) if err != nil { @@ -435,11 +436,17 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor var filtered []*v1.Node filteredNodesStatuses := framework.NodeToStatusMap{} + allNodes, err := g.nodeInfoSnapshot.NodeInfos().List() + if err != nil { + return nil, nil, err + } if !g.framework.HasFilterPlugins() { - filtered = g.nodeInfoSnapshot.ListNodes() + filtered = make([]*v1.Node, len(allNodes)) + for i, ni := range allNodes { + filtered[i] = ni.Node() + } } else { - allNodes := len(g.nodeInfoSnapshot.NodeInfoList) - numNodesToFind := g.numFeasibleNodesToFind(int32(allNodes)) + numNodesToFind := g.numFeasibleNodesToFind(int32(len(allNodes))) // Create filtered list with enough space to avoid growing it // and allow assigning. @@ -454,7 +461,7 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor checkNode := func(i int) { // We check the nodes starting from where we left off in the previous scheduling cycle, // this is to make sure all nodes have the same chance of being examined across pods. - nodeInfo := g.nodeInfoSnapshot.NodeInfoList[(g.nextStartNodeIndex+i)%allNodes] + nodeInfo := allNodes[(g.nextStartNodeIndex+i)%len(allNodes)] fits, status, err := g.podFitsOnNode(ctx, state, pod, nodeInfo) if err != nil { errCh.SendErrorWithCancel(err, cancel) @@ -479,9 +486,9 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor // Stops searching for more nodes once the configured number of feasible nodes // are found. - workqueue.ParallelizeUntil(ctx, 16, allNodes, checkNode) + workqueue.ParallelizeUntil(ctx, 16, len(allNodes), checkNode) processedNodes := int(filteredLen) + len(filteredNodesStatuses) - g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % allNodes + g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(allNodes) filtered = filtered[:filteredLen] if err := errCh.ReceiveError(); err != nil { @@ -653,7 +660,7 @@ func (g *genericScheduler) prioritizeNodes( if len(g.extenders) != 0 && nodes != nil { var mu sync.Mutex var wg sync.WaitGroup - combinedScores := make(map[string]int64, len(g.nodeInfoSnapshot.NodeInfoList)) + combinedScores := make(map[string]int64, len(nodes)) for i := range g.extenders { if !g.extenders[i].IsInterested(pod) { continue diff --git a/pkg/scheduler/nodeinfo/snapshot/snapshot.go b/pkg/scheduler/nodeinfo/snapshot/snapshot.go index bb6eee6b676..94e2d50c897 100644 --- a/pkg/scheduler/nodeinfo/snapshot/snapshot.go +++ b/pkg/scheduler/nodeinfo/snapshot/snapshot.go @@ -132,15 +132,9 @@ func (s *Snapshot) NodeInfos() schedulerlisters.NodeInfoLister { return &nodeInfoLister{snapshot: s} } -// ListNodes returns the list of nodes in the snapshot. -func (s *Snapshot) ListNodes() []*v1.Node { - nodes := make([]*v1.Node, 0, len(s.NodeInfoList)) - for _, n := range s.NodeInfoList { - if n.Node() != nil { - nodes = append(nodes, n.Node()) - } - } - return nodes +// NumNodes returns the number of nodes in the snapshot. +func (s *Snapshot) NumNodes() int { + return len(s.NodeInfoList) } type podLister struct {