Merge pull request #84014 from ahg-g/ahg-tree

Make node tree order part of the snapshot
This commit is contained in:
Kubernetes Prow Robot 2019-10-18 15:21:37 -07:00 committed by GitHub
commit 70f68062ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 74 additions and 94 deletions

View File

@ -187,16 +187,15 @@ func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleS
}
trace.Step("Running prefilter plugins done")
numNodes := g.cache.NodeTree().NumNodes()
if numNodes == 0 {
return result, ErrNoNodesAvailable
}
if err := g.snapshot(); err != nil {
return result, err
}
trace.Step("Snapshoting scheduler cache and node infos done")
if len(g.nodeInfoSnapshot.NodeInfoList) == 0 {
return result, ErrNoNodesAvailable
}
startPredicateEvalTime := time.Now()
filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(ctx, state, pod)
if err != nil {
@ -213,7 +212,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleS
if len(filteredNodes) == 0 {
return result, &FitError{
Pod: pod,
NumAllNodes: numNodes,
NumAllNodes: len(g.nodeInfoSnapshot.NodeInfoList),
FailedPredicates: failedPredicateMap,
FilteredNodesStatuses: filteredNodesStatuses,
}
@ -460,13 +459,13 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor
if len(g.predicates) == 0 && !g.framework.HasFilterPlugins() {
filtered = g.nodeInfoSnapshot.ListNodes()
} else {
allNodes := int32(g.cache.NodeTree().NumNodes())
allNodes := int32(len(g.nodeInfoSnapshot.NodeInfoList))
numNodesToFind := g.numFeasibleNodesToFind(allNodes)
// Create filtered list with enough space to avoid growing it
// and allow assigning.
filtered = make([]*v1.Node, numNodesToFind)
errs := errors.MessageCountMap{}
errCh := util.NewErrorChannel()
var (
predicateResultLock sync.Mutex
filteredLen int32
@ -479,20 +478,17 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor
state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta})
checkNode := func(i int) {
nodeName := g.cache.NodeTree().Next()
nodeInfo := g.nodeInfoSnapshot.NodeInfoList[i]
fits, failedPredicates, status, err := g.podFitsOnNode(
ctx,
state,
pod,
meta,
g.nodeInfoSnapshot.NodeInfoMap[nodeName],
nodeInfo,
g.alwaysCheckAllPredicates,
)
if err != nil {
predicateResultLock.Lock()
errs[err.Error()]++
predicateResultLock.Unlock()
errCh.SendErrorWithCancel(err, cancel)
return
}
if fits {
@ -501,15 +497,15 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor
cancel()
atomic.AddInt32(&filteredLen, -1)
} else {
filtered[length-1] = g.nodeInfoSnapshot.NodeInfoMap[nodeName].Node()
filtered[length-1] = nodeInfo.Node()
}
} else {
predicateResultLock.Lock()
if !status.IsSuccess() {
filteredNodesStatuses[nodeName] = status
filteredNodesStatuses[nodeInfo.Node().Name] = status
}
if len(failedPredicates) != 0 {
failedPredicateMap[nodeName] = failedPredicates
failedPredicateMap[nodeInfo.Node().Name] = failedPredicates
}
predicateResultLock.Unlock()
}
@ -520,8 +516,8 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor
workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)
filtered = filtered[:filteredLen]
if len(errs) > 0 {
return []*v1.Node{}, FailedPredicateMap{}, framework.NodeToStatusMap{}, errors.CreateAggregateFromMessageCountMap(errs)
if err := errCh.ReceiveError(); err != nil {
return []*v1.Node{}, FailedPredicateMap{}, framework.NodeToStatusMap{}, err
}
}

View File

@ -74,7 +74,7 @@ type schedulerCache struct {
// headNode points to the most recently updated NodeInfo in "nodes". It is the
// head of the linked list.
headNode *nodeInfoListItem
nodeTree *NodeTree
nodeTree *nodeTree
// A map from image name to its imageState.
imageStates map[string]*imageState
}
@ -238,6 +238,17 @@ func (cache *schedulerCache) UpdateNodeInfoSnapshot(nodeSnapshot *schedulernodei
}
}
}
// Take a snapshot of the nodes order in the tree
nodeSnapshot.NodeInfoList = make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes)
for i := 0; i < cache.nodeTree.numNodes; i++ {
nodeName := cache.nodeTree.next()
if n := nodeSnapshot.NodeInfoMap[nodeName]; n != nil {
nodeSnapshot.NodeInfoList = append(nodeSnapshot.NodeInfoList, n)
} else {
klog.Errorf("node %q exist in nodeTree but not in NodeInfoMap, this should not happen.", nodeName)
}
}
return nil
}
@ -516,7 +527,7 @@ func (cache *schedulerCache) AddNode(node *v1.Node) error {
}
cache.moveNodeInfoToHead(node.Name)
cache.nodeTree.AddNode(node)
cache.nodeTree.addNode(node)
cache.addNodeImageStates(node, n.info)
return n.info.SetNode(node)
}
@ -534,7 +545,7 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
}
cache.moveNodeInfoToHead(newNode.Name)
cache.nodeTree.UpdateNode(oldNode, newNode)
cache.nodeTree.updateNode(oldNode, newNode)
cache.addNodeImageStates(newNode, n.info)
return n.info.SetNode(newNode)
}
@ -560,7 +571,7 @@ func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
cache.moveNodeInfoToHead(node.Name)
}
if err := cache.nodeTree.RemoveNode(node); err != nil {
if err := cache.nodeTree.removeNode(node); err != nil {
return err
}
cache.removeNodeImageStates(node)
@ -688,10 +699,6 @@ func (cache *schedulerCache) expirePod(key string, ps *podState) error {
return nil
}
func (cache *schedulerCache) NodeTree() *NodeTree {
return cache.nodeTree
}
// GetNodeInfo returns cached data for the node name.
func (cache *schedulerCache) GetNodeInfo(nodeName string) (*v1.Node, error) {
cache.mu.RLock()

View File

@ -1066,7 +1066,7 @@ func TestNodeOperators(t *testing.T) {
if !found {
t.Errorf("Failed to find node %v in internalcache.", node.Name)
}
if cache.nodeTree.NumNodes() != 1 || cache.nodeTree.Next() != node.Name {
if cache.nodeTree.numNodes != 1 || cache.nodeTree.next() != node.Name {
t.Errorf("cache.nodeTree is not updated correctly after adding node: %v", node.Name)
}
@ -1109,7 +1109,7 @@ func TestNodeOperators(t *testing.T) {
t.Errorf("Failed to update node in schedulernodeinfo:\n got: %+v \nexpected: %+v", got, expected)
}
// Check nodeTree after update
if cache.nodeTree.NumNodes() != 1 || cache.nodeTree.Next() != node.Name {
if cache.nodeTree.numNodes != 1 || cache.nodeTree.next() != node.Name {
t.Errorf("unexpected cache.nodeTree after updating node: %v", node.Name)
}
@ -1120,7 +1120,7 @@ func TestNodeOperators(t *testing.T) {
}
// Check nodeTree after remove. The node should be removed from the nodeTree even if there are
// still pods on it.
if cache.nodeTree.NumNodes() != 0 || cache.nodeTree.Next() != "" {
if cache.nodeTree.numNodes != 0 || cache.nodeTree.next() != "" {
t.Errorf("unexpected cache.nodeTree after removing node: %v", node.Name)
}
}

View File

@ -103,9 +103,6 @@ func (c *Cache) Snapshot() *internalcache.Snapshot {
return &internalcache.Snapshot{}
}
// NodeTree is a fake method for testing.
func (c *Cache) NodeTree() *internalcache.NodeTree { return nil }
// GetNodeInfo is a fake method for testing.
func (c *Cache) GetNodeInfo(nodeName string) (*v1.Node, error) {
return nil, nil
@ -120,3 +117,8 @@ func (c *Cache) ListNodes() []*v1.Node {
func (c *Cache) GetCSINodeInfo(nodeName string) (*storagev1beta1.CSINode, error) {
return nil, nil
}
// NumNodes is a fake method for testing. It always reports zero nodes.
func (c *Cache) NumNodes() int {
return 0
}

View File

@ -119,9 +119,6 @@ type Cache interface {
// Snapshot takes a snapshot on current cache
Snapshot() *Snapshot
// NodeTree returns a node tree structure
NodeTree() *NodeTree
}
// Snapshot is a snapshot of cache state

View File

@ -18,22 +18,21 @@ package cache
import (
"fmt"
"sync"
"k8s.io/api/core/v1"
utilnode "k8s.io/kubernetes/pkg/util/node"
"k8s.io/klog"
utilnode "k8s.io/kubernetes/pkg/util/node"
)
// NodeTree is a tree-like data structure that holds node names in each zone. Zone names are
// nodeTree is a tree-like data structure that holds node names in each zone. Zone names are
// keys to "nodeTree.tree" and values of "nodeTree.tree" are arrays of node names.
type NodeTree struct {
// nodeTree is NOT thread-safe; any concurrent updates/reads from it must be synchronized by the caller.
// It is used only by schedulerCache, and should stay as such.
type nodeTree struct {
tree map[string]*nodeArray // a map from zone (region-zone) to an array of nodes in the zone.
zones []string // a list of all the zones in the tree (keys)
zoneIndex int
numNodes int
mu sync.RWMutex
}
// nodeArray is a struct that has nodes that are in a zone.
@ -58,8 +57,8 @@ func (na *nodeArray) next() (nodeName string, exhausted bool) {
}
// newNodeTree creates a NodeTree from nodes.
func newNodeTree(nodes []*v1.Node) *NodeTree {
nt := &NodeTree{
func newNodeTree(nodes []*v1.Node) *nodeTree {
nt := &nodeTree{
tree: make(map[string]*nodeArray),
}
for _, n := range nodes {
@ -68,15 +67,9 @@ func newNodeTree(nodes []*v1.Node) *NodeTree {
return nt
}
// AddNode adds a node and its corresponding zone to the tree. If the zone already exists, the node
// addNode adds a node and its corresponding zone to the tree. If the zone already exists, the node
// is added to the array of nodes in that zone.
func (nt *NodeTree) AddNode(n *v1.Node) {
nt.mu.Lock()
defer nt.mu.Unlock()
nt.addNode(n)
}
func (nt *NodeTree) addNode(n *v1.Node) {
func (nt *nodeTree) addNode(n *v1.Node) {
zone := utilnode.GetZoneKey(n)
if na, ok := nt.tree[zone]; ok {
for _, nodeName := range na.nodes {
@ -94,14 +87,8 @@ func (nt *NodeTree) addNode(n *v1.Node) {
nt.numNodes++
}
// RemoveNode removes a node from the NodeTree.
func (nt *NodeTree) RemoveNode(n *v1.Node) error {
nt.mu.Lock()
defer nt.mu.Unlock()
return nt.removeNode(n)
}
func (nt *NodeTree) removeNode(n *v1.Node) error {
// removeNode removes a node from the NodeTree.
func (nt *nodeTree) removeNode(n *v1.Node) error {
zone := utilnode.GetZoneKey(n)
if na, ok := nt.tree[zone]; ok {
for i, nodeName := range na.nodes {
@ -122,7 +109,7 @@ func (nt *NodeTree) removeNode(n *v1.Node) error {
// removeZone removes a zone from tree.
// This function must be called while writer locks are held.
func (nt *NodeTree) removeZone(zone string) {
func (nt *nodeTree) removeZone(zone string) {
delete(nt.tree, zone)
for i, z := range nt.zones {
if z == zone {
@ -132,8 +119,8 @@ func (nt *NodeTree) removeZone(zone string) {
}
}
// UpdateNode updates a node in the NodeTree.
func (nt *NodeTree) UpdateNode(old, new *v1.Node) {
// updateNode updates a node in the NodeTree.
func (nt *nodeTree) updateNode(old, new *v1.Node) {
var oldZone string
if old != nil {
oldZone = utilnode.GetZoneKey(old)
@ -144,24 +131,20 @@ func (nt *NodeTree) UpdateNode(old, new *v1.Node) {
if oldZone == newZone {
return
}
nt.mu.Lock()
defer nt.mu.Unlock()
nt.removeNode(old) // No error checking. We ignore whether the old node exists or not.
nt.addNode(new)
}
func (nt *NodeTree) resetExhausted() {
func (nt *nodeTree) resetExhausted() {
for _, na := range nt.tree {
na.lastIndex = 0
}
nt.zoneIndex = 0
}
// Next returns the name of the next node. NodeTree iterates over zones and in each zone iterates
// next returns the name of the next node. NodeTree iterates over zones and in each zone iterates
// over nodes in a round robin fashion.
func (nt *NodeTree) Next() string {
nt.mu.Lock()
defer nt.mu.Unlock()
func (nt *nodeTree) next() string {
if len(nt.zones) == 0 {
return ""
}
@ -185,10 +168,3 @@ func (nt *NodeTree) Next() string {
}
}
}
// NumNodes returns the number of nodes.
func (nt *NodeTree) NumNodes() int {
nt.mu.RLock()
defer nt.mu.RUnlock()
return nt.numNodes
}

View File

@ -110,23 +110,23 @@ var allNodes = []*v1.Node{
},
}}
func verifyNodeTree(t *testing.T, nt *NodeTree, expectedTree map[string]*nodeArray) {
func verifyNodeTree(t *testing.T, nt *nodeTree, expectedTree map[string]*nodeArray) {
expectedNumNodes := int(0)
for _, na := range expectedTree {
expectedNumNodes += len(na.nodes)
}
if numNodes := nt.NumNodes(); numNodes != expectedNumNodes {
t.Errorf("unexpected NodeTree.numNodes. Expected: %v, Got: %v", expectedNumNodes, numNodes)
if numNodes := nt.numNodes; numNodes != expectedNumNodes {
t.Errorf("unexpected nodeTree.numNodes. Expected: %v, Got: %v", expectedNumNodes, numNodes)
}
if !reflect.DeepEqual(nt.tree, expectedTree) {
t.Errorf("The node tree is not the same as expected. Expected: %v, Got: %v", expectedTree, nt.tree)
}
if len(nt.zones) != len(expectedTree) {
t.Errorf("Number of zones in NodeTree.zones is not expected. Expected: %v, Got: %v", len(expectedTree), len(nt.zones))
t.Errorf("Number of zones in nodeTree.zones is not expected. Expected: %v, Got: %v", len(expectedTree), len(nt.zones))
}
for _, z := range nt.zones {
if _, ok := expectedTree[z]; !ok {
t.Errorf("zone %v is not expected to exist in NodeTree.zones", z)
t.Errorf("zone %v is not expected to exist in nodeTree.zones", z)
}
}
}
@ -170,7 +170,7 @@ func TestNodeTree_AddNode(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
nt := newNodeTree(nil)
for _, n := range test.nodesToAdd {
nt.AddNode(n)
nt.addNode(n)
}
verifyNodeTree(t, nt, test.expectedTree)
})
@ -227,7 +227,7 @@ func TestNodeTree_RemoveNode(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
nt := newNodeTree(test.existingNodes)
for _, n := range test.nodesToRemove {
err := nt.RemoveNode(n)
err := nt.removeNode(n)
if test.expectError == (err == nil) {
t.Errorf("unexpected returned error value: %v", err)
}
@ -312,7 +312,7 @@ func TestNodeTree_UpdateNode(t *testing.T) {
if oldNode == nil {
oldNode = &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "nonexisting-node"}}
}
nt.UpdateNode(oldNode, test.nodeToUpdate)
nt.updateNode(oldNode, test.nodeToUpdate)
verifyNodeTree(t, nt, test.expectedTree)
})
}
@ -357,7 +357,7 @@ func TestNodeTree_Next(t *testing.T) {
var output []string
for i := 0; i < test.numRuns; i++ {
output = append(output, nt.Next())
output = append(output, nt.next())
}
if !reflect.DeepEqual(output, test.expectedOutput) {
t.Errorf("unexpected output. Expected: %v, Got: %v", test.expectedOutput, output)
@ -423,18 +423,18 @@ func TestNodeTreeMultiOperations(t *testing.T) {
if addIndex >= len(test.nodesToAdd) {
t.Error("more add operations than nodesToAdd")
} else {
nt.AddNode(test.nodesToAdd[addIndex])
nt.addNode(test.nodesToAdd[addIndex])
addIndex++
}
case "remove":
if removeIndex >= len(test.nodesToRemove) {
t.Error("more remove operations than nodesToRemove")
} else {
nt.RemoveNode(test.nodesToRemove[removeIndex])
nt.removeNode(test.nodesToRemove[removeIndex])
removeIndex++
}
case "next":
output = append(output, nt.Next())
output = append(output, nt.next())
default:
t.Errorf("unknow operation: %v", op)
}

View File

@ -20,12 +20,14 @@ import (
v1 "k8s.io/api/core/v1"
)
// Snapshot is a snapshot of cache NodeInfo. The scheduler takes a
// snapshot at the beginning of each scheduling cycle and uses it for its
// operations in that cycle.
// Snapshot is a snapshot of cache NodeInfo and NodeTree order. The scheduler takes a
// snapshot at the beginning of each scheduling cycle and uses it for its operations in that cycle.
type Snapshot struct {
// NodeInfoMap is a map of node name to a snapshot of its NodeInfo.
NodeInfoMap map[string]*NodeInfo
Generation int64
// NodeInfoList is the list of nodes as ordered in the cache's nodeTree.
NodeInfoList []*NodeInfo
Generation int64
}
// NewSnapshot initializes a Snapshot struct and returns it.
@ -38,7 +40,7 @@ func NewSnapshot() *Snapshot {
// ListNodes returns the list of nodes in the snapshot.
func (s *Snapshot) ListNodes() []*v1.Node {
nodes := make([]*v1.Node, 0, len(s.NodeInfoMap))
for _, n := range s.NodeInfoMap {
for _, n := range s.NodeInfoList {
if n != nil && n.node != nil {
nodes = append(nodes, n.node)
}