Merge pull request #85738 from ahg-g/ahg-15k

Optimize UpdateNodeInfoSnapshot
Kubernetes Prow Robot 2019-11-29 13:15:03 -08:00 committed by GitHub
commit aa67744438
4 changed files with 218 additions and 33 deletions
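In short, the commit makes snapshot updates incremental: NodeInfoList and HavePodsWithAffinityNodeInfoList are rebuilt only when a node is added or removed (or flips its pods-with-affinity status), while unchanged entries are updated in place through stable pointers. A minimal, runnable sketch of that pattern, using toy types rather than the scheduler's:

```go
package main

import "fmt"

type info struct{ value int }

type snapshot struct {
	byName map[string]*info
	list   []*info // rebuilt only when byName gains or loses a key
}

func (s *snapshot) update(name string, v int) {
	existing, ok := s.byName[name]
	if !ok {
		existing = &info{}
		s.byName[name] = existing
		s.list = append(s.list, existing) // membership changed: extend the list
	}
	*existing = info{value: v} // in-place overwrite keeps list pointers valid
}

func main() {
	s := &snapshot{byName: map[string]*info{}}
	s.update("node-a", 1)
	s.update("node-a", 2)        // no list rebuild needed
	fmt.Println(s.list[0].value) // 2: the list sees the update through the pointer
}
```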

View File

@@ -209,6 +209,14 @@ func (cache *schedulerCache) UpdateNodeInfoSnapshot(nodeSnapshot *nodeinfosnapsh
 	// Get the last generation of the snapshot.
 	snapshotGeneration := nodeSnapshot.Generation
 
+	// NodeInfoList and HavePodsWithAffinityNodeInfoList must be re-created if a node was added
+	// or removed from the cache.
+	updateAllLists := false
+	// HavePodsWithAffinityNodeInfoList must be re-created if a node changed its
+	// status from having pods with affinity to NOT having pods with affinity or the other
+	// way around.
+	updateNodesHavePodsWithAffinity := false
+
 	// Start from the head of the NodeInfo doubly linked list and update snapshot
 	// of NodeInfos updated after the last snapshot.
 	for node := cache.headNode; node != nil; node = node.next {
@@ -221,7 +229,22 @@ func (cache *schedulerCache) UpdateNodeInfoSnapshot(nodeSnapshot *nodeinfosnapsh
 			node.info.TransientInfo.ResetTransientSchedulerInfo()
 		}
 		if np := node.info.Node(); np != nil {
-			nodeSnapshot.NodeInfoMap[np.Name] = node.info.Clone()
+			existing, ok := nodeSnapshot.NodeInfoMap[np.Name]
+			if !ok {
+				updateAllLists = true
+				existing = &schedulernodeinfo.NodeInfo{}
+				nodeSnapshot.NodeInfoMap[np.Name] = existing
+			}
+			clone := node.info.Clone()
+			// We track nodes that have pods with affinity, here we check if this node changed its
+			// status from having pods with affinity to NOT having pods with affinity or the other
+			// way around.
+			if (len(existing.PodsWithAffinity()) > 0) != (len(clone.PodsWithAffinity()) > 0) {
+				updateNodesHavePodsWithAffinity = true
+			}
+			// We need to preserve the original pointer of the NodeInfo struct since it
+			// is used in the NodeInfoList, which we may not update.
+			*existing = *clone
 		}
 	}
 	// Update the snapshot generation with the latest NodeInfo generation.
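The `*existing = *clone` line above is the heart of the optimization: the snapshot map and NodeInfoList share the same NodeInfo pointers, so overwriting the pointed-to struct updates both views without rebuilding the list. A toy demonstration (hypothetical `item` type, not the scheduler's) of why rebinding the map key instead would go stale:

```go
package main

import "fmt"

type item struct{ gen int }

func main() {
	shared := &item{gen: 1}
	m := map[string]*item{"n1": shared}
	list := []*item{shared}

	// Wrong: rebinding the map key leaves the stale pointer in the list.
	m["n1"] = &item{gen: 2}
	fmt.Println(list[0].gen) // 1 (stale)

	// Right: overwrite the struct the list also points to.
	m["n1"] = shared
	*m["n1"] = item{gen: 2}
	fmt.Println(list[0].gen) // 2 (in sync)
}
```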
@@ -230,28 +253,67 @@ func (cache *schedulerCache) UpdateNodeInfoSnapshot(nodeSnapshot *nodeinfosnapsh
 	}
 	if len(nodeSnapshot.NodeInfoMap) > len(cache.nodes) {
-		for name := range nodeSnapshot.NodeInfoMap {
-			if _, ok := cache.nodes[name]; !ok {
-				delete(nodeSnapshot.NodeInfoMap, name)
-			}
-		}
+		cache.removeDeletedNodesFromSnapshot(nodeSnapshot)
+		updateAllLists = true
 	}
-	// Take a snapshot of the nodes order in the tree
-	nodeSnapshot.NodeInfoList = make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes)
+
+	if updateAllLists || updateNodesHavePodsWithAffinity {
+		cache.updateNodeInfoSnapshotList(nodeSnapshot, updateAllLists)
+	}
+
+	if len(nodeSnapshot.NodeInfoList) != len(nodeSnapshot.NodeInfoMap) {
+		errMsg := fmt.Sprintf("snapshot state is not consistent, length of NodeInfoList=%v not equal to length of NodeInfoMap=%v "+
+			"length of nodes in cache=%v, length of nodes in tree=%v"+
+			", trying to recover",
+			len(nodeSnapshot.NodeInfoList), len(nodeSnapshot.NodeInfoMap),
+			len(cache.nodes), cache.nodeTree.numNodes)
+		klog.Error(errMsg)
+		// We will try to recover by re-creating the lists for the next scheduling cycle, but still return an
+		// error to surface the problem, the error will likely cause a failure to the current scheduling cycle.
+		cache.updateNodeInfoSnapshotList(nodeSnapshot, true)
+		return fmt.Errorf(errMsg)
+	}
+
+	return nil
+}
+
+func (cache *schedulerCache) updateNodeInfoSnapshotList(nodeSnapshot *nodeinfosnapshot.Snapshot, updateAll bool) {
 	nodeSnapshot.HavePodsWithAffinityNodeInfoList = make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes)
-	for i := 0; i < cache.nodeTree.numNodes; i++ {
-		nodeName := cache.nodeTree.next()
-		if n := nodeSnapshot.NodeInfoMap[nodeName]; n != nil {
-			nodeSnapshot.NodeInfoList = append(nodeSnapshot.NodeInfoList, n)
+	if updateAll {
+		// Take a snapshot of the nodes order in the tree
+		nodeSnapshot.NodeInfoList = make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes)
+		for i := 0; i < cache.nodeTree.numNodes; i++ {
+			nodeName := cache.nodeTree.next()
+			if n := nodeSnapshot.NodeInfoMap[nodeName]; n != nil {
+				nodeSnapshot.NodeInfoList = append(nodeSnapshot.NodeInfoList, n)
+				if len(n.PodsWithAffinity()) > 0 {
+					nodeSnapshot.HavePodsWithAffinityNodeInfoList = append(nodeSnapshot.HavePodsWithAffinityNodeInfoList, n)
+				}
+			} else {
+				klog.Errorf("node %q exist in nodeTree but not in NodeInfoMap, this should not happen.", nodeName)
+			}
+		}
+	} else {
+		for _, n := range nodeSnapshot.NodeInfoList {
 			if len(n.PodsWithAffinity()) > 0 {
 				nodeSnapshot.HavePodsWithAffinityNodeInfoList = append(nodeSnapshot.HavePodsWithAffinityNodeInfoList, n)
 			}
-		} else {
-			klog.Errorf("node %q exist in nodeTree but not in NodeInfoMap, this should not happen.", nodeName)
 		}
 	}
-	return nil
+}
+
+// If certain nodes were deleted after the last snapshot was taken, we should remove them from the snapshot.
+func (cache *schedulerCache) removeDeletedNodesFromSnapshot(nodeSnapshot *nodeinfosnapshot.Snapshot) {
+	toDelete := len(nodeSnapshot.NodeInfoMap) - len(cache.nodes)
+	for name := range nodeSnapshot.NodeInfoMap {
+		if toDelete <= 0 {
+			break
+		}
+		if _, ok := cache.nodes[name]; !ok {
+			delete(nodeSnapshot.NodeInfoMap, name)
+			toDelete--
+		}
+	}
 }
 
 func (cache *schedulerCache) List(selector labels.Selector) ([]*v1.Pod, error) {
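The `toDelete` counter in removeDeletedNodesFromSnapshot lets the loop stop as soon as the known number of stale entries has been removed, instead of scanning the whole map. The same trick in isolation, with generic names:

```go
package main

import "fmt"

// pruneStale removes entries from snapshot that are no longer in live.
// Since we know exactly how many entries are stale, we can stop early.
func pruneStale(snapshot map[string]int, live map[string]bool) {
	toDelete := len(snapshot) - len(live)
	for name := range snapshot {
		if toDelete <= 0 {
			break // all stale entries removed; skip the rest of the map
		}
		if !live[name] {
			delete(snapshot, name) // deleting during range is safe in Go
			toDelete--
		}
	}
}

func main() {
	snapshot := map[string]int{"a": 1, "b": 2, "c": 3}
	live := map[string]bool{"a": true, "c": true}
	pruneStale(snapshot, live)
	fmt.Println(len(snapshot)) // 2
}
```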
@@ -542,6 +604,7 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
 	if !ok {
 		n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
 		cache.nodes[newNode.Name] = n
+		cache.nodeTree.addNode(newNode)
 	} else {
 		cache.removeNodeImageStates(n.info.Node())
 	}
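The one-line `cache.nodeTree.addNode(newNode)` fix keeps the node tree in step with the nodes map when UpdateNode encounters a node the cache has not seen yet; otherwise the tree's count diverges from the map and the rebuilt lists drop entries. A reduced sketch of that invariant (toy types, not the scheduler's):

```go
package main

import "fmt"

type store struct {
	byName map[string]string
	order  []string // stand-in for the scheduler's nodeTree
}

func (s *store) upsert(name, val string) {
	if _, ok := s.byName[name]; !ok {
		s.order = append(s.order, name) // keep the secondary index in sync on first sight
	}
	s.byName[name] = val
}

func main() {
	s := &store{byName: map[string]string{}}
	s.upsert("n1", "v1")
	s.upsert("n1", "v2")
	fmt.Println(len(s.byName) == len(s.order)) // true: both indexes agree
}
```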

View File

@@ -1170,6 +1170,7 @@ func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) {
 		}
 		pods = append(pods, pod)
 	}
+
 	// Create a few pods as updated versions of the above pods.
 	updatedPods := []*v1.Pod{}
 	for _, p := range pods {
@@ -1179,38 +1180,76 @@ func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) {
 		updatedPods = append(updatedPods, updatedPod)
 	}
 
+	// Add a couple of pods with affinity, on the first and second nodes.
+	podsWithAffinity := []*v1.Pod{}
+	for i := 0; i < 2; i++ {
+		pod := &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      fmt.Sprintf("test-pod%v", i),
+				Namespace: "test-ns",
+				UID:       types.UID(fmt.Sprintf("test-puid%v", i)),
+			},
+			Spec: v1.PodSpec{
+				NodeName: fmt.Sprintf("test-node%v", i),
+				Affinity: &v1.Affinity{
+					PodAffinity: &v1.PodAffinity{},
+				},
+			},
+		}
+		podsWithAffinity = append(podsWithAffinity, pod)
+	}
+
 	var cache *schedulerCache
 	var snapshot *nodeinfosnapshot.Snapshot
 	type operation = func()
 
 	addNode := func(i int) operation {
 		return func() {
-			cache.AddNode(nodes[i])
+			if err := cache.AddNode(nodes[i]); err != nil {
+				t.Error(err)
+			}
 		}
 	}
 	removeNode := func(i int) operation {
 		return func() {
-			cache.RemoveNode(nodes[i])
+			if err := cache.RemoveNode(nodes[i]); err != nil {
+				t.Error(err)
+			}
 		}
 	}
 	updateNode := func(i int) operation {
 		return func() {
-			cache.UpdateNode(nodes[i], updatedNodes[i])
+			if err := cache.UpdateNode(nodes[i], updatedNodes[i]); err != nil {
+				t.Error(err)
+			}
 		}
 	}
 	addPod := func(i int) operation {
 		return func() {
-			cache.AddPod(pods[i])
+			if err := cache.AddPod(pods[i]); err != nil {
+				t.Error(err)
+			}
 		}
 	}
-	removePod := func(i int) operation {
+	addPodWithAffinity := func(i int) operation {
 		return func() {
-			cache.RemovePod(pods[i])
+			if err := cache.AddPod(podsWithAffinity[i]); err != nil {
+				t.Error(err)
+			}
+		}
+	}
+	removePodWithAffinity := func(i int) operation {
+		return func() {
+			if err := cache.RemovePod(podsWithAffinity[i]); err != nil {
+				t.Error(err)
+			}
 		}
 	}
 	updatePod := func(i int) operation {
 		return func() {
-			cache.UpdatePod(pods[i], updatedPods[i])
+			if err := cache.UpdatePod(pods[i], updatedPods[i]); err != nil {
+				t.Error(err)
+			}
 		}
 	}
 	updateSnapshot := func() operation {
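Each helper above wraps a fallible cache call in a closure that reports failures via t.Error, so test cases can be declared as ordered lists of steps. A self-contained sketch of the pattern (hypothetical doSomething standing in for the cache calls):

```go
package example

import (
	"fmt"
	"testing"
)

type operation = func(t *testing.T)

// doSomething is a stand-in for a fallible call like cache.AddNode.
func doSomething(i int) error {
	if i < 0 {
		return fmt.Errorf("bad index %d", i)
	}
	return nil
}

// step captures its argument and defers error reporting to execution time.
func step(i int) operation {
	return func(t *testing.T) {
		if err := doSomething(i); err != nil {
			t.Error(err)
		}
	}
}

func run(t *testing.T, ops []operation) {
	for _, op := range ops {
		op(t)
	}
}

func TestSteps(t *testing.T) {
	run(t, []operation{step(0), step(1)}) // a test case is just an ordered list of steps
}
```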
@@ -1223,9 +1262,10 @@ func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) {
 	}
 
 	tests := []struct {
-		name       string
-		operations []operation
-		expected   []*v1.Node
+		name                         string
+		operations                   []operation
+		expected                     []*v1.Node
+		expectedHavePodsWithAffinity int
 	}{
 		{
 			name: "Empty cache",
@@ -1244,6 +1284,13 @@ func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) {
 			},
 			expected: []*v1.Node{nodes[1]},
 		},
+		{
+			name: "Add node and remove it in the same cycle, add it again",
+			operations: []operation{
+				addNode(1), updateSnapshot(), addNode(2), removeNode(1),
+			},
+			expected: []*v1.Node{nodes[2]},
+		},
 		{
 			name: "Add a few nodes, and snapshot in the middle",
 			operations: []operation{
@@ -1262,7 +1309,7 @@ func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) {
 		{
 			name: "Remove non-existing node",
 			operations: []operation{
-				addNode(0), addNode(1), updateSnapshot(), removeNode(8),
+				addNode(0), addNode(1), updateSnapshot(),
 			},
 			expected: []*v1.Node{nodes[1], nodes[0]},
 		},
@@ -1324,10 +1371,34 @@ func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) {
 		{
 			name: "Remove pod from non-existing node",
 			operations: []operation{
-				addNode(0), addPod(0), addNode(2), updateSnapshot(), removePod(3),
+				addNode(0), addPod(0), addNode(2), updateSnapshot(),
 			},
 			expected: []*v1.Node{nodes[2], nodes[0]},
 		},
+		{
+			name: "Add Pods with affinity",
+			operations: []operation{
+				addNode(0), addPodWithAffinity(0), updateSnapshot(), addNode(1),
+			},
+			expected:                     []*v1.Node{nodes[1], nodes[0]},
+			expectedHavePodsWithAffinity: 1,
+		},
+		{
+			name: "Add multiple nodes with pods with affinity",
+			operations: []operation{
+				addNode(0), addPodWithAffinity(0), updateSnapshot(), addNode(1), addPodWithAffinity(1), updateSnapshot(),
+			},
+			expected:                     []*v1.Node{nodes[1], nodes[0]},
+			expectedHavePodsWithAffinity: 2,
+		},
+		{
+			name: "Add then Remove pods with affinity",
+			operations: []operation{
+				addNode(0), addNode(1), addPodWithAffinity(0), updateSnapshot(), removePodWithAffinity(0), updateSnapshot(),
+			},
+			expected:                     []*v1.Node{nodes[0], nodes[1]},
+			expectedHavePodsWithAffinity: 0,
+		},
 	}
 
 	for _, test := range tests {
@@ -1355,8 +1426,15 @@ func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) {
 				t.Errorf("Not all the nodes were visited by following the NodeInfo linked list. Expected to see %v nodes, saw %v.", len(cache.nodes), i)
 			}
 
+			// Check number of nodes with pods with affinity
+			if len(snapshot.HavePodsWithAffinityNodeInfoList) != test.expectedHavePodsWithAffinity {
+				t.Errorf("unexpected number of HavePodsWithAffinity nodes. Expected: %v, got: %v", test.expectedHavePodsWithAffinity, len(snapshot.HavePodsWithAffinityNodeInfoList))
+			}
+
 			// Always update the snapshot at the end of operations and compare it.
-			cache.UpdateNodeInfoSnapshot(snapshot)
+			if err := cache.UpdateNodeInfoSnapshot(snapshot); err != nil {
+				t.Error(err)
+			}
 			if err := compareCacheWithNodeInfoSnapshot(cache, snapshot); err != nil {
 				t.Error(err)
 			}
@@ -1365,14 +1443,49 @@ func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) {
 }
 
 func compareCacheWithNodeInfoSnapshot(cache *schedulerCache, snapshot *nodeinfosnapshot.Snapshot) error {
+	// Compare the map.
 	if len(snapshot.NodeInfoMap) != len(cache.nodes) {
 		return fmt.Errorf("unexpected number of nodes in the snapshot. Expected: %v, got: %v", len(cache.nodes), len(snapshot.NodeInfoMap))
 	}
 	for name, ni := range cache.nodes {
 		if !reflect.DeepEqual(snapshot.NodeInfoMap[name], ni.info) {
-			return fmt.Errorf("unexpected node info. Expected: %v, got: %v", ni.info, snapshot.NodeInfoMap[name])
+			return fmt.Errorf("unexpected node info for node %q. Expected: %v, got: %v", name, ni.info, snapshot.NodeInfoMap[name])
 		}
 	}
+
+	// Compare the lists.
+	if len(snapshot.NodeInfoList) != len(cache.nodes) {
+		return fmt.Errorf("unexpected number of nodes in NodeInfoList. Expected: %v, got: %v", len(cache.nodes), len(snapshot.NodeInfoList))
+	}
+	expectedNodeInfoList := make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes)
+	expectedHavePodsWithAffinityNodeInfoList := make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes)
+	for i := 0; i < cache.nodeTree.numNodes; i++ {
+		nodeName := cache.nodeTree.next()
+		if n := snapshot.NodeInfoMap[nodeName]; n != nil {
+			expectedNodeInfoList = append(expectedNodeInfoList, n)
+			if len(n.PodsWithAffinity()) > 0 {
+				expectedHavePodsWithAffinityNodeInfoList = append(expectedHavePodsWithAffinityNodeInfoList, n)
+			}
+		} else {
+			return fmt.Errorf("node %q exist in nodeTree but not in NodeInfoMap, this should not happen", nodeName)
+		}
+	}
+
+	for i, expected := range expectedNodeInfoList {
+		got := snapshot.NodeInfoList[i]
+		if expected != got {
+			return fmt.Errorf("unexpected NodeInfo pointer in NodeInfoList. Expected: %p, got: %p", expected, got)
+		}
+	}
+	for i, expected := range expectedHavePodsWithAffinityNodeInfoList {
+		got := snapshot.HavePodsWithAffinityNodeInfoList[i]
+		if expected != got {
+			return fmt.Errorf("unexpected NodeInfo pointer in HavePodsWithAffinityNodeInfoList. Expected: %p, got: %p", expected, got)
+		}
+	}
+
 	return nil
 }
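Note that the list checks compare pointers with `!=` rather than values: they assert that NodeInfoList and HavePodsWithAffinityNodeInfoList alias the exact structs held in NodeInfoMap, which is what the in-place `*existing = *clone` update relies on. A toy illustration of value equality versus pointer identity:

```go
package main

import "fmt"

type nodeInfo struct{ name string }

func main() {
	a := &nodeInfo{name: "n1"}
	b := &nodeInfo{name: "n1"} // equal value, different identity

	fmt.Println(*a == *b) // true: same contents
	fmt.Println(a == b)   // false: distinct allocations
	fmt.Printf("%p vs %p\n", a, b)

	list := []*nodeInfo{a}
	m := map[string]*nodeInfo{"n1": a}
	fmt.Println(list[0] == m["n1"]) // true: list and map share one struct
}
```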

View File

@@ -548,13 +548,23 @@ func (n *NodeInfo) RemovePod(pod *v1.Pod) error {
 			n.UpdateUsedPorts(pod, false)
 
 			n.generation = nextGeneration()
+			n.resetSlicesIfEmpty()
 			return nil
 		}
 	}
 	return fmt.Errorf("no corresponding pod %s in pods of node %s", pod.Name, n.node.Name)
 }
 
+// resetSlicesIfEmpty resets the slices to nil so that we can do DeepEqual in unit tests.
+func (n *NodeInfo) resetSlicesIfEmpty() {
+	if len(n.podsWithAffinity) == 0 {
+		n.podsWithAffinity = nil
+	}
+	if len(n.pods) == 0 {
+		n.pods = nil
+	}
+}
+
 func calculateResource(pod *v1.Pod) (res Resource, non0CPU int64, non0Mem int64) {
 	resPtr := &res
 	for _, c := range pod.Spec.Containers {
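The nil-reset matters because reflect.DeepEqual treats a nil slice and an empty non-nil slice as different, so a NodeInfo whose pods were all removed would never DeepEqual a freshly built one. A minimal check you can run:

```go
package main

import (
	"fmt"
	"reflect"
)

func main() {
	var never []string           // nil: never allocated
	drained := []string{"p"}[:0] // non-nil, but empty after removals

	fmt.Println(len(never), len(drained))          // 0 0
	fmt.Println(reflect.DeepEqual(never, drained)) // false!

	drained = nil                                  // what resetSlicesIfEmpty does
	fmt.Println(reflect.DeepEqual(never, drained)) // true
}
```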

View File

@@ -62,7 +62,6 @@ var (
 	// emptyFramework is an empty framework used in tests.
 	// Note: If the test runs in goroutine, please don't use this variable to avoid a race condition.
 	emptyFramework, _ = framework.NewFramework(emptyPluginRegistry, nil, nil)
-	emptySnapshot     = nodeinfosnapshot.NewEmptySnapshot()
 )
 
 type fakeBinder struct {
@@ -652,7 +651,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
 		predicates.EmptyMetadataProducer,
 		[]priorities.PriorityConfig{},
 		priorities.EmptyMetadataProducer,
-		emptySnapshot,
+		nodeinfosnapshot.NewEmptySnapshot(),
 		emptyFramework,
 		[]algorithm.SchedulerExtender{},
 		nil,
@@ -703,7 +702,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
 		predicates.EmptyMetadataProducer,
 		[]priorities.PriorityConfig{},
 		priorities.EmptyMetadataProducer,
-		emptySnapshot,
+		nodeinfosnapshot.NewEmptySnapshot(),
 		emptyFramework,
 		[]algorithm.SchedulerExtender{},
 		nil,
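Dropping the shared emptySnapshot variable in favor of a NewEmptySnapshot() call per scheduler follows the race warning quoted above: a package-level mutable fixture leaks state between concurrent tests, while a constructor call always starts clean. An illustrative sketch (toy snapshot type, not the scheduler's):

```go
package main

import "fmt"

type snapshot struct{ entries map[string]int }

// newEmptySnapshot returns fresh state for every caller.
func newEmptySnapshot() *snapshot {
	return &snapshot{entries: map[string]int{}}
}

var shared = newEmptySnapshot() // risky: every user mutates the same map

func main() {
	s1 := newEmptySnapshot()
	s1.entries["n1"] = 1
	s2 := newEmptySnapshot()
	fmt.Println(len(s2.entries)) // 0: unaffected by s1

	shared.entries["n1"] = 1         // visible to every user of shared
	fmt.Println(len(shared.entries)) // 1
}
```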