From 63d7733e988776355306e21947c43cb2f4fb8896 Mon Sep 17 00:00:00 2001
From: Abdullah Gharaibeh
Date: Fri, 18 Oct 2019 12:51:46 -0400
Subject: [PATCH] create an ordered list of nodes instead of iterating over the tree

---
 pkg/scheduler/core/generic_scheduler.go    | 34 +++++------
 pkg/scheduler/internal/cache/cache.go      | 23 +++++---
 pkg/scheduler/internal/cache/cache_test.go |  6 +-
 .../internal/cache/fake/fake_cache.go      |  8 ++-
 pkg/scheduler/internal/cache/interface.go  |  3 -
 pkg/scheduler/internal/cache/node_tree.go  | 58 ++++++-------
 .../internal/cache/node_tree_test.go       | 24 ++++----
 pkg/scheduler/nodeinfo/snapshot.go         | 12 ++--
 8 files changed, 74 insertions(+), 94 deletions(-)

diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go
index c769a461352..888342aec0f 100644
--- a/pkg/scheduler/core/generic_scheduler.go
+++ b/pkg/scheduler/core/generic_scheduler.go
@@ -187,16 +187,15 @@ func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleS
 	}
 	trace.Step("Running prefilter plugins done")
 
-	numNodes := g.cache.NodeTree().NumNodes()
-	if numNodes == 0 {
-		return result, ErrNoNodesAvailable
-	}
-
 	if err := g.snapshot(); err != nil {
 		return result, err
 	}
 	trace.Step("Snapshoting scheduler cache and node infos done")
 
+	if len(g.nodeInfoSnapshot.NodeInfoList) == 0 {
+		return result, ErrNoNodesAvailable
+	}
+
 	startPredicateEvalTime := time.Now()
 	filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(ctx, state, pod)
 	if err != nil {
@@ -213,7 +212,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleS
 	if len(filteredNodes) == 0 {
 		return result, &FitError{
 			Pod:                   pod,
-			NumAllNodes:           numNodes,
+			NumAllNodes:           len(g.nodeInfoSnapshot.NodeInfoList),
 			FailedPredicates:      failedPredicateMap,
 			FilteredNodesStatuses: filteredNodesStatuses,
 		}
@@ -460,13 +459,13 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor
 	if len(g.predicates) == 0 && !g.framework.HasFilterPlugins() {
 		filtered = g.nodeInfoSnapshot.ListNodes()
 	} else {
-		allNodes := int32(g.cache.NodeTree().NumNodes())
+		allNodes := int32(len(g.nodeInfoSnapshot.NodeInfoList))
 		numNodesToFind := g.numFeasibleNodesToFind(allNodes)
 
 		// Create filtered list with enough space to avoid growing it
 		// and allow assigning.
 		filtered = make([]*v1.Node, numNodesToFind)
-		errs := errors.MessageCountMap{}
+		errCh := util.NewErrorChannel()
 		var (
 			predicateResultLock sync.Mutex
 			filteredLen         int32
@@ -479,20 +478,17 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor
 		state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta})
 
 		checkNode := func(i int) {
-			nodeName := g.cache.NodeTree().Next()
-
+			nodeInfo := g.nodeInfoSnapshot.NodeInfoList[i]
 			fits, failedPredicates, status, err := g.podFitsOnNode(
 				ctx,
 				state,
 				pod,
 				meta,
-				g.nodeInfoSnapshot.NodeInfoMap[nodeName],
+				nodeInfo,
 				g.alwaysCheckAllPredicates,
 			)
 			if err != nil {
-				predicateResultLock.Lock()
-				errs[err.Error()]++
-				predicateResultLock.Unlock()
+				errCh.SendErrorWithCancel(err, cancel)
 				return
 			}
 			if fits {
@@ -501,15 +497,15 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor
 					cancel()
 					atomic.AddInt32(&filteredLen, -1)
 				} else {
-					filtered[length-1] = g.nodeInfoSnapshot.NodeInfoMap[nodeName].Node()
+					filtered[length-1] = nodeInfo.Node()
 				}
 			} else {
 				predicateResultLock.Lock()
 				if !status.IsSuccess() {
-					filteredNodesStatuses[nodeName] = status
+					filteredNodesStatuses[nodeInfo.Node().Name] = status
 				}
 				if len(failedPredicates) != 0 {
-					failedPredicateMap[nodeName] = failedPredicates
+					failedPredicateMap[nodeInfo.Node().Name] = failedPredicates
 				}
 				predicateResultLock.Unlock()
 			}
@@ -520,8 +516,8 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor
 		workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)
 
 		filtered = filtered[:filteredLen]
-		if len(errs) > 0 {
-			return []*v1.Node{}, FailedPredicateMap{}, framework.NodeToStatusMap{}, errors.CreateAggregateFromMessageCountMap(errs)
+		if err := errCh.ReceiveError(); err != nil {
+			return []*v1.Node{}, FailedPredicateMap{}, framework.NodeToStatusMap{}, err
 		}
 	}
 
diff --git a/pkg/scheduler/internal/cache/cache.go b/pkg/scheduler/internal/cache/cache.go
index 08b4df3b59f..d27da108672 100644
--- a/pkg/scheduler/internal/cache/cache.go
+++ b/pkg/scheduler/internal/cache/cache.go
@@ -74,7 +74,7 @@ type schedulerCache struct {
 	// headNode points to the most recently updated NodeInfo in "nodes". It is the
 	// head of the linked list.
 	headNode *nodeInfoListItem
-	nodeTree *NodeTree
+	nodeTree *nodeTree
 	// A map from image name to its imageState.
 	imageStates map[string]*imageState
 }
@@ -238,6 +238,17 @@ func (cache *schedulerCache) UpdateNodeInfoSnapshot(nodeSnapshot *schedulernodei
 			}
 		}
 	}
+
+	// Take a snapshot of the order of nodes in the tree.
+	nodeSnapshot.NodeInfoList = make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes)
+	for i := 0; i < cache.nodeTree.numNodes; i++ {
+		nodeName := cache.nodeTree.next()
+		if n := nodeSnapshot.NodeInfoMap[nodeName]; n != nil {
+			nodeSnapshot.NodeInfoList = append(nodeSnapshot.NodeInfoList, n)
+		} else {
+			klog.Errorf("node %q exists in nodeTree but not in NodeInfoMap, this should not happen.", nodeName)
+		}
+	}
 	return nil
 }
 
@@ -516,7 +527,7 @@ func (cache *schedulerCache) AddNode(node *v1.Node) error {
 	}
 	cache.moveNodeInfoToHead(node.Name)
 
-	cache.nodeTree.AddNode(node)
+	cache.nodeTree.addNode(node)
 	cache.addNodeImageStates(node, n.info)
 	return n.info.SetNode(node)
 }
@@ -534,7 +545,7 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
 	}
 	cache.moveNodeInfoToHead(newNode.Name)
 
-	cache.nodeTree.UpdateNode(oldNode, newNode)
+	cache.nodeTree.updateNode(oldNode, newNode)
 	cache.addNodeImageStates(newNode, n.info)
 	return n.info.SetNode(newNode)
 }
@@ -560,7 +571,7 @@ func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
 		cache.moveNodeInfoToHead(node.Name)
 	}
 
-	if err := cache.nodeTree.RemoveNode(node); err != nil {
+	if err := cache.nodeTree.removeNode(node); err != nil {
 		return err
 	}
 	cache.removeNodeImageStates(node)
@@ -688,10 +699,6 @@ func (cache *schedulerCache) expirePod(key string, ps *podState) error {
 	return nil
 }
 
-func (cache *schedulerCache) NodeTree() *NodeTree {
-	return cache.nodeTree
-}
-
 // GetNodeInfo returns cached data for the node name.
 func (cache *schedulerCache) GetNodeInfo(nodeName string) (*v1.Node, error) {
 	cache.mu.RLock()
diff --git a/pkg/scheduler/internal/cache/cache_test.go b/pkg/scheduler/internal/cache/cache_test.go
index f243efb3cfd..538fe223541 100644
--- a/pkg/scheduler/internal/cache/cache_test.go
+++ b/pkg/scheduler/internal/cache/cache_test.go
@@ -1066,7 +1066,7 @@ func TestNodeOperators(t *testing.T) {
 		if !found {
 			t.Errorf("Failed to find node %v in internalcache.", node.Name)
 		}
-		if cache.nodeTree.NumNodes() != 1 || cache.nodeTree.Next() != node.Name {
+		if cache.nodeTree.numNodes != 1 || cache.nodeTree.next() != node.Name {
 			t.Errorf("cache.nodeTree is not updated correctly after adding node: %v", node.Name)
 		}
 
@@ -1109,7 +1109,7 @@ func TestNodeOperators(t *testing.T) {
 			t.Errorf("Failed to update node in schedulernodeinfo:\n got: %+v \nexpected: %+v", got, expected)
 		}
 		// Check nodeTree after update
-		if cache.nodeTree.NumNodes() != 1 || cache.nodeTree.Next() != node.Name {
+		if cache.nodeTree.numNodes != 1 || cache.nodeTree.next() != node.Name {
 			t.Errorf("unexpected cache.nodeTree after updating node: %v", node.Name)
 		}
 
@@ -1120,7 +1120,7 @@ func TestNodeOperators(t *testing.T) {
 		}
 		// Check nodeTree after remove. The node should be removed from the nodeTree even if there are
 		// still pods on it.
-		if cache.nodeTree.NumNodes() != 0 || cache.nodeTree.Next() != "" {
+		if cache.nodeTree.numNodes != 0 || cache.nodeTree.next() != "" {
 			t.Errorf("unexpected cache.nodeTree after removing node: %v", node.Name)
 		}
 	}
diff --git a/pkg/scheduler/internal/cache/fake/fake_cache.go b/pkg/scheduler/internal/cache/fake/fake_cache.go
index 5350a2ec776..94feced6d13 100644
--- a/pkg/scheduler/internal/cache/fake/fake_cache.go
+++ b/pkg/scheduler/internal/cache/fake/fake_cache.go
@@ -103,9 +103,6 @@ func (c *Cache) Snapshot() *internalcache.Snapshot {
 	return &internalcache.Snapshot{}
 }
 
-// NodeTree is a fake method for testing.
-func (c *Cache) NodeTree() *internalcache.NodeTree { return nil }
-
 // GetNodeInfo is a fake method for testing.
 func (c *Cache) GetNodeInfo(nodeName string) (*v1.Node, error) {
 	return nil, nil
@@ -120,3 +117,8 @@ func (c *Cache) ListNodes() []*v1.Node {
 func (c *Cache) GetCSINodeInfo(nodeName string) (*storagev1beta1.CSINode, error) {
 	return nil, nil
 }
+
+// NumNodes is a fake method for testing.
+func (c *Cache) NumNodes() int {
+	return 0
+}
diff --git a/pkg/scheduler/internal/cache/interface.go b/pkg/scheduler/internal/cache/interface.go
index f5c973fba48..bc84de148f7 100644
--- a/pkg/scheduler/internal/cache/interface.go
+++ b/pkg/scheduler/internal/cache/interface.go
@@ -119,9 +119,6 @@ type Cache interface {
 
 	// Snapshot takes a snapshot on current cache
 	Snapshot() *Snapshot
-
-	// NodeTree returns a node tree structure
-	NodeTree() *NodeTree
 }
 
 // Snapshot is a snapshot of cache state
diff --git a/pkg/scheduler/internal/cache/node_tree.go b/pkg/scheduler/internal/cache/node_tree.go
index de53feef487..4a81182f7eb 100644
--- a/pkg/scheduler/internal/cache/node_tree.go
+++ b/pkg/scheduler/internal/cache/node_tree.go
@@ -18,22 +18,21 @@ package cache
 
 import (
 	"fmt"
-	"sync"
 
 	"k8s.io/api/core/v1"
-	utilnode "k8s.io/kubernetes/pkg/util/node"
-
 	"k8s.io/klog"
+	utilnode "k8s.io/kubernetes/pkg/util/node"
 )
 
-// NodeTree is a tree-like data structure that holds node names in each zone. Zone names are
+// nodeTree is a tree-like data structure that holds node names in each zone. Zone names are
 // keys to "NodeTree.tree" and values of "NodeTree.tree" are arrays of node names.
-type NodeTree struct {
+// nodeTree is NOT thread-safe; any concurrent updates/reads from it must be synchronized by the caller.
+// It is used only by schedulerCache, and should stay as such.
+type nodeTree struct {
 	tree      map[string]*nodeArray // a map from zone (region-zone) to an array of nodes in the zone.
 	zones     []string              // a list of all the zones in the tree (keys)
 	zoneIndex int
 	numNodes  int
-	mu        sync.RWMutex
 }
 
 // nodeArray is a struct that has nodes that are in a zone.
@@ -58,8 +57,8 @@ func (na *nodeArray) next() (nodeName string, exhausted bool) {
 }
 
 // newNodeTree creates a NodeTree from nodes.
-func newNodeTree(nodes []*v1.Node) *NodeTree {
-	nt := &NodeTree{
+func newNodeTree(nodes []*v1.Node) *nodeTree {
+	nt := &nodeTree{
 		tree: make(map[string]*nodeArray),
 	}
 	for _, n := range nodes {
@@ -68,15 +67,9 @@ func newNodeTree(nodes []*v1.Node) *nodeTree {
 	return nt
 }
 
-// AddNode adds a node and its corresponding zone to the tree. If the zone already exists, the node
+// addNode adds a node and its corresponding zone to the tree. If the zone already exists, the node
 // is added to the array of nodes in that zone.
-func (nt *NodeTree) AddNode(n *v1.Node) {
-	nt.mu.Lock()
-	defer nt.mu.Unlock()
-	nt.addNode(n)
-}
-
-func (nt *NodeTree) addNode(n *v1.Node) {
+func (nt *nodeTree) addNode(n *v1.Node) {
 	zone := utilnode.GetZoneKey(n)
 	if na, ok := nt.tree[zone]; ok {
 		for _, nodeName := range na.nodes {
@@ -94,14 +87,8 @@ func (nt *nodeTree) addNode(n *v1.Node) {
 	nt.numNodes++
 }
 
-// RemoveNode removes a node from the NodeTree.
-func (nt *NodeTree) RemoveNode(n *v1.Node) error {
-	nt.mu.Lock()
-	defer nt.mu.Unlock()
-	return nt.removeNode(n)
-}
-
-func (nt *NodeTree) removeNode(n *v1.Node) error {
+// removeNode removes a node from the nodeTree.
+func (nt *nodeTree) removeNode(n *v1.Node) error {
 	zone := utilnode.GetZoneKey(n)
 	if na, ok := nt.tree[zone]; ok {
 		for i, nodeName := range na.nodes {
@@ -122,7 +109,7 @@ func (nt *nodeTree) removeNode(n *v1.Node) error {
 
 // removeZone removes a zone from tree.
 // This function must be called while writer locks are hold.
-func (nt *NodeTree) removeZone(zone string) {
+func (nt *nodeTree) removeZone(zone string) {
 	delete(nt.tree, zone)
 	for i, z := range nt.zones {
 		if z == zone {
@@ -132,8 +119,8 @@ func (nt *nodeTree) removeZone(zone string) {
 	}
 }
 
-// UpdateNode updates a node in the NodeTree.
-func (nt *NodeTree) UpdateNode(old, new *v1.Node) {
+// updateNode updates a node in the nodeTree.
+func (nt *nodeTree) updateNode(old, new *v1.Node) {
 	var oldZone string
 	if old != nil {
 		oldZone = utilnode.GetZoneKey(old)
@@ -144,24 +131,20 @@ func (nt *nodeTree) updateNode(old, new *v1.Node) {
 	if oldZone == newZone {
 		return
 	}
-	nt.mu.Lock()
-	defer nt.mu.Unlock()
 	nt.removeNode(old) // No error checking. We ignore whether the old node exists or not.
 	nt.addNode(new)
 }
 
-func (nt *NodeTree) resetExhausted() {
+func (nt *nodeTree) resetExhausted() {
 	for _, na := range nt.tree {
 		na.lastIndex = 0
 	}
 	nt.zoneIndex = 0
}
 
-// Next returns the name of the next node. NodeTree iterates over zones and in each zone iterates
+// next returns the name of the next node. nodeTree iterates over zones and in each zone iterates
 // over nodes in a round robin fashion.
-func (nt *NodeTree) Next() string {
-	nt.mu.Lock()
-	defer nt.mu.Unlock()
+func (nt *nodeTree) next() string {
 	if len(nt.zones) == 0 {
 		return ""
 	}
@@ -185,10 +168,3 @@ func (nt *nodeTree) next() string {
 		}
 	}
 }
-
-// NumNodes returns the number of nodes.
-func (nt *NodeTree) NumNodes() int {
-	nt.mu.RLock()
-	defer nt.mu.RUnlock()
-	return nt.numNodes
-}
diff --git a/pkg/scheduler/internal/cache/node_tree_test.go b/pkg/scheduler/internal/cache/node_tree_test.go
index e8cb35ba78a..726e3158a24 100644
--- a/pkg/scheduler/internal/cache/node_tree_test.go
+++ b/pkg/scheduler/internal/cache/node_tree_test.go
@@ -110,23 +110,23 @@ var allNodes = []*v1.Node{
 	},
 }}
 
-func verifyNodeTree(t *testing.T, nt *NodeTree, expectedTree map[string]*nodeArray) {
+func verifyNodeTree(t *testing.T, nt *nodeTree, expectedTree map[string]*nodeArray) {
 	expectedNumNodes := int(0)
 	for _, na := range expectedTree {
 		expectedNumNodes += len(na.nodes)
 	}
-	if numNodes := nt.NumNodes(); numNodes != expectedNumNodes {
-		t.Errorf("unexpected NodeTree.numNodes. Expected: %v, Got: %v", expectedNumNodes, numNodes)
+	if numNodes := nt.numNodes; numNodes != expectedNumNodes {
+		t.Errorf("unexpected nodeTree.numNodes. Expected: %v, Got: %v", expectedNumNodes, numNodes)
 	}
 	if !reflect.DeepEqual(nt.tree, expectedTree) {
 		t.Errorf("The node tree is not the same as expected. Expected: %v, Got: %v", expectedTree, nt.tree)
 	}
 	if len(nt.zones) != len(expectedTree) {
-		t.Errorf("Number of zones in NodeTree.zones is not expected. Expected: %v, Got: %v", len(expectedTree), len(nt.zones))
+		t.Errorf("Number of zones in nodeTree.zones is not expected. Expected: %v, Got: %v", len(expectedTree), len(nt.zones))
 	}
 	for _, z := range nt.zones {
 		if _, ok := expectedTree[z]; !ok {
-			t.Errorf("zone %v is not expected to exist in NodeTree.zones", z)
+			t.Errorf("zone %v is not expected to exist in nodeTree.zones", z)
 		}
 	}
 }
@@ -170,7 +170,7 @@ func TestNodeTree_AddNode(t *testing.T) {
 		t.Run(test.name, func(t *testing.T) {
 			nt := newNodeTree(nil)
 			for _, n := range test.nodesToAdd {
-				nt.AddNode(n)
+				nt.addNode(n)
 			}
 			verifyNodeTree(t, nt, test.expectedTree)
 		})
@@ -227,7 +227,7 @@ func TestNodeTree_RemoveNode(t *testing.T) {
 		t.Run(test.name, func(t *testing.T) {
 			nt := newNodeTree(test.existingNodes)
 			for _, n := range test.nodesToRemove {
-				err := nt.RemoveNode(n)
+				err := nt.removeNode(n)
 				if test.expectError == (err == nil) {
 					t.Errorf("unexpected returned error value: %v", err)
 				}
@@ -312,7 +312,7 @@ func TestNodeTree_UpdateNode(t *testing.T) {
 			if oldNode == nil {
 				oldNode = &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "nonexisting-node"}}
 			}
-			nt.UpdateNode(oldNode, test.nodeToUpdate)
+			nt.updateNode(oldNode, test.nodeToUpdate)
 			verifyNodeTree(t, nt, test.expectedTree)
 		})
 	}
@@ -357,7 +357,7 @@ func TestNodeTree_Next(t *testing.T) {
 
 			var output []string
 			for i := 0; i < test.numRuns; i++ {
-				output = append(output, nt.Next())
+				output = append(output, nt.next())
 			}
 			if !reflect.DeepEqual(output, test.expectedOutput) {
 				t.Errorf("unexpected output. Expected: %v, Got: %v", test.expectedOutput, output)
@@ -423,18 +423,18 @@ func TestNodeTreeMultiOperations(t *testing.T) {
 				if addIndex >= len(test.nodesToAdd) {
 					t.Error("more add operations than nodesToAdd")
 				} else {
-					nt.AddNode(test.nodesToAdd[addIndex])
+					nt.addNode(test.nodesToAdd[addIndex])
 					addIndex++
 				}
 			case "remove":
 				if removeIndex >= len(test.nodesToRemove) {
 					t.Error("more remove operations than nodesToRemove")
 				} else {
-					nt.RemoveNode(test.nodesToRemove[removeIndex])
+					nt.removeNode(test.nodesToRemove[removeIndex])
 					removeIndex++
 				}
 			case "next":
-				output = append(output, nt.Next())
+				output = append(output, nt.next())
 			default:
 				t.Errorf("unknow operation: %v", op)
 			}
diff --git a/pkg/scheduler/nodeinfo/snapshot.go b/pkg/scheduler/nodeinfo/snapshot.go
index c74d1fc5e38..cebac7f20c5 100644
--- a/pkg/scheduler/nodeinfo/snapshot.go
+++ b/pkg/scheduler/nodeinfo/snapshot.go
@@ -20,12 +20,14 @@ import (
 	v1 "k8s.io/api/core/v1"
 )
 
-// Snapshot is a snapshot of cache NodeInfo. The scheduler takes a
-// snapshot at the beginning of each scheduling cycle and uses it for its
-// operations in that cycle.
+// Snapshot is a snapshot of cache NodeInfo and NodeTree order. The scheduler takes a
+// snapshot at the beginning of each scheduling cycle and uses it for its operations in that cycle.
 type Snapshot struct {
+	// NodeInfoMap is a map of node name to a snapshot of its NodeInfo.
 	NodeInfoMap map[string]*NodeInfo
-	Generation  int64
+	// NodeInfoList is the list of nodes as ordered in the cache's nodeTree.
+	NodeInfoList []*NodeInfo
+	Generation   int64
 }
 
 // NewSnapshot initializes a Snapshot struct and returns it.
@@ -38,7 +40,7 @@ func NewSnapshot() *Snapshot {
 // ListNodes returns the list of nodes in the snapshot.
 func (s *Snapshot) ListNodes() []*v1.Node {
 	nodes := make([]*v1.Node, 0, len(s.NodeInfoMap))
-	for _, n := range s.NodeInfoMap {
+	for _, n := range s.NodeInfoList {
 		if n != nil && n.node != nil {
 			nodes = append(nodes, n.node)
 		}
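
Note, not part of the diff above: with this change, UpdateNodeInfoSnapshot
drains nodeTree.next() exactly numNodes times per scheduling cycle and stores
the result in NodeInfoList, so the parallel checkNode workers index the
snapshot directly instead of taking the tree's lock on every Next() call.
The standalone Go sketch below models the zone round-robin order this
captures; flattenRoundRobin and the sample zone/node names are illustrative,
not scheduler code.

package main

import "fmt"

// flattenRoundRobin emulates the ordering produced by repeated
// nodeTree.next() calls: pass i takes the i-th node of every zone that
// still has one, so the resulting list spreads nodes across zones.
// It assumes every key in tree also appears in zones.
func flattenRoundRobin(zones []string, tree map[string][]string) []string {
	total := 0
	for _, nodes := range tree {
		total += len(nodes)
	}
	ordered := make([]string, 0, total)
	for i := 0; len(ordered) < total; i++ {
		for _, z := range zones {
			if nodes := tree[z]; i < len(nodes) {
				ordered = append(ordered, nodes[i])
			}
		}
	}
	return ordered
}

func main() {
	zones := []string{"zone-a", "zone-b"}
	tree := map[string][]string{
		"zone-a": {"node-1", "node-2", "node-3"},
		"zone-b": {"node-4"},
	}
	// Prints [node-1 node-4 node-2 node-3]: the zones are interleaved
	// first, then the larger zone's remaining nodes follow.
	fmt.Println(flattenRoundRobin(zones, tree))
}

Freezing this order once per cycle also makes iteration deterministic for the
duration of the cycle, which per-call locking could not guarantee while other
goroutines were mutating the tree.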
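A second change rides along in findNodesThatFit: predicate errors were
previously counted in an errors.MessageCountMap under a mutex and aggregated
at the end, whereas errCh.SendErrorWithCancel records only the first error
and cancels the remaining workers. A rough self-contained approximation of
that pattern follows; the errorChannel type here is illustrative, not the
real API of the scheduler util package that provides NewErrorChannel.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// errorChannel keeps at most one error in a buffered channel.
type errorChannel struct{ ch chan error }

func newErrorChannel() *errorChannel {
	return &errorChannel{ch: make(chan error, 1)}
}

// sendErrorWithCancel stores err if the slot is free and cancels the
// context either way, so in-flight workers can stop early.
func (e *errorChannel) sendErrorWithCancel(err error, cancel context.CancelFunc) {
	select {
	case e.ch <- err: // first error wins
	default: // an error is already recorded; drop this one
	}
	cancel()
}

// receiveError returns the recorded error, or nil if none was sent.
func (e *errorChannel) receiveError() error {
	select {
	case err := <-e.ch:
		return err
	default:
		return nil
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	errCh := newErrorChannel()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if ctx.Err() != nil {
				return // canceled by an earlier failure
			}
			if i == 2 {
				errCh.sendErrorWithCancel(errors.New("predicate failed on node-2"), cancel)
			}
		}(i)
	}
	wg.Wait()
	fmt.Println(errCh.receiveError()) // predicate failed on node-2
}

The tradeoff is visible in the hunk above: the old code reported every
distinct failure message, while the new code fails fast with a single error.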