diff --git a/pkg/scheduler/internal/cache/cache.go b/pkg/scheduler/internal/cache/cache.go
index d740c0625d9..8e361572b89 100644
--- a/pkg/scheduler/internal/cache/cache.go
+++ b/pkg/scheduler/internal/cache/cache.go
@@ -209,6 +209,14 @@ func (cache *schedulerCache) UpdateNodeInfoSnapshot(nodeSnapshot *nodeinfosnapsh
 	// Get the last generation of the snapshot.
 	snapshotGeneration := nodeSnapshot.Generation
 
+	// NodeInfoList and HavePodsWithAffinityNodeInfoList must be re-created if a node was added
+	// or removed from the cache.
+	updateAllLists := false
+	// HavePodsWithAffinityNodeInfoList must be re-created if a node changed its
+	// status from having pods with affinity to NOT having pods with affinity or the other
+	// way around.
+	updateNodesHavePodsWithAffinity := false
+
 	// Start from the head of the NodeInfo doubly linked list and update snapshot
 	// of NodeInfos updated after the last snapshot.
 	for node := cache.headNode; node != nil; node = node.next {
@@ -221,7 +229,22 @@
 			node.info.TransientInfo.ResetTransientSchedulerInfo()
 		}
 		if np := node.info.Node(); np != nil {
-			nodeSnapshot.NodeInfoMap[np.Name] = node.info.Clone()
+			existing, ok := nodeSnapshot.NodeInfoMap[np.Name]
+			if !ok {
+				updateAllLists = true
+				existing = &schedulernodeinfo.NodeInfo{}
+				nodeSnapshot.NodeInfoMap[np.Name] = existing
+			}
+			clone := node.info.Clone()
+			// We track nodes that have pods with affinity, here we check if this node changed its
+			// status from having pods with affinity to NOT having pods with affinity or the other
+			// way around.
+			if (len(existing.PodsWithAffinity()) > 0) != (len(clone.PodsWithAffinity()) > 0) {
+				updateNodesHavePodsWithAffinity = true
+			}
+			// We need to preserve the original pointer of the NodeInfo struct since it
+			// is used in the NodeInfoList, which we may not update.
+			*existing = *clone
 		}
 	}
 	// Update the snapshot generation with the latest NodeInfo generation.
@@ -230,28 +253,67 @@ func (cache *schedulerCache) UpdateNodeInfoSnapshot(nodeSnapshot *nodeinfosnapsh
 	}
 
 	if len(nodeSnapshot.NodeInfoMap) > len(cache.nodes) {
-		for name := range nodeSnapshot.NodeInfoMap {
-			if _, ok := cache.nodes[name]; !ok {
-				delete(nodeSnapshot.NodeInfoMap, name)
-			}
-		}
+		cache.removeDeletedNodesFromSnapshot(nodeSnapshot)
+		updateAllLists = true
 	}
 
-	// Take a snapshot of the nodes order in the tree
-	nodeSnapshot.NodeInfoList = make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes)
+	if updateAllLists || updateNodesHavePodsWithAffinity {
+		cache.updateNodeInfoSnapshotList(nodeSnapshot, updateAllLists)
+	}
+
+	if len(nodeSnapshot.NodeInfoList) != len(nodeSnapshot.NodeInfoMap) {
+		errMsg := fmt.Sprintf("snapshot state is not consistent, length of NodeInfoList=%v not equal to length of NodeInfoMap=%v "+
+			"length of nodes in cache=%v, length of nodes in tree=%v"+
+			", trying to recover",
+			len(nodeSnapshot.NodeInfoList), len(nodeSnapshot.NodeInfoMap),
+			len(cache.nodes), cache.nodeTree.numNodes)
+		klog.Error(errMsg)
+		// We will try to recover by re-creating the lists for the next scheduling cycle, but still return an
+		// error to surface the problem, the error will likely cause a failure to the current scheduling cycle.
+		cache.updateNodeInfoSnapshotList(nodeSnapshot, true)
+		return fmt.Errorf(errMsg)
+	}
+
+	return nil
+}
+
+func (cache *schedulerCache) updateNodeInfoSnapshotList(nodeSnapshot *nodeinfosnapshot.Snapshot, updateAll bool) {
 	nodeSnapshot.HavePodsWithAffinityNodeInfoList = make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes)
-	for i := 0; i < cache.nodeTree.numNodes; i++ {
-		nodeName := cache.nodeTree.next()
-		if n := nodeSnapshot.NodeInfoMap[nodeName]; n != nil {
-			nodeSnapshot.NodeInfoList = append(nodeSnapshot.NodeInfoList, n)
+	if updateAll {
+		// Take a snapshot of the nodes order in the tree
+		nodeSnapshot.NodeInfoList = make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes)
+		for i := 0; i < cache.nodeTree.numNodes; i++ {
+			nodeName := cache.nodeTree.next()
+			if n := nodeSnapshot.NodeInfoMap[nodeName]; n != nil {
+				nodeSnapshot.NodeInfoList = append(nodeSnapshot.NodeInfoList, n)
+				if len(n.PodsWithAffinity()) > 0 {
+					nodeSnapshot.HavePodsWithAffinityNodeInfoList = append(nodeSnapshot.HavePodsWithAffinityNodeInfoList, n)
+				}
+			} else {
+				klog.Errorf("node %q exist in nodeTree but not in NodeInfoMap, this should not happen.", nodeName)
+			}
+		}
+	} else {
+		for _, n := range nodeSnapshot.NodeInfoList {
 			if len(n.PodsWithAffinity()) > 0 {
 				nodeSnapshot.HavePodsWithAffinityNodeInfoList = append(nodeSnapshot.HavePodsWithAffinityNodeInfoList, n)
 			}
-		} else {
-			klog.Errorf("node %q exist in nodeTree but not in NodeInfoMap, this should not happen.", nodeName)
 		}
 	}
-	return nil
+}
+
+// If certain nodes were deleted after the last snapshot was taken, we should remove them from the snapshot.
+func (cache *schedulerCache) removeDeletedNodesFromSnapshot(nodeSnapshot *nodeinfosnapshot.Snapshot) {
+	toDelete := len(nodeSnapshot.NodeInfoMap) - len(cache.nodes)
+	for name := range nodeSnapshot.NodeInfoMap {
+		if toDelete <= 0 {
+			break
+		}
+		if _, ok := cache.nodes[name]; !ok {
+			delete(nodeSnapshot.NodeInfoMap, name)
+			toDelete--
+		}
+	}
 }
 
 func (cache *schedulerCache) List(selector labels.Selector) ([]*v1.Pod, error) {
@@ -542,6 +604,7 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
 	if !ok {
 		n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
 		cache.nodes[newNode.Name] = n
+		cache.nodeTree.addNode(newNode)
 	} else {
 		cache.removeNodeImageStates(n.info.Node())
 	}
diff --git a/pkg/scheduler/internal/cache/cache_test.go b/pkg/scheduler/internal/cache/cache_test.go
index 6f9fb23d28c..0e6b83b92fa 100644
--- a/pkg/scheduler/internal/cache/cache_test.go
+++ b/pkg/scheduler/internal/cache/cache_test.go
@@ -1170,6 +1170,7 @@ func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) {
 		}
 		pods = append(pods, pod)
 	}
+
 	// Create a few pods as updated versions of the above pods.
 	updatedPods := []*v1.Pod{}
 	for _, p := range pods {
@@ -1179,38 +1180,76 @@ func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) {
 		updatedPods = append(updatedPods, updatedPod)
 	}
 
+	// Add a couple of pods with affinity, on the first and second nodes.
+	podsWithAffinity := []*v1.Pod{}
+	for i := 0; i < 2; i++ {
+		pod := &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      fmt.Sprintf("test-pod%v", i),
+				Namespace: "test-ns",
+				UID:       types.UID(fmt.Sprintf("test-puid%v", i)),
+			},
+			Spec: v1.PodSpec{
+				NodeName: fmt.Sprintf("test-node%v", i),
+				Affinity: &v1.Affinity{
+					PodAffinity: &v1.PodAffinity{},
+				},
+			},
+		}
+		podsWithAffinity = append(podsWithAffinity, pod)
+	}
+
 	var cache *schedulerCache
 	var snapshot *nodeinfosnapshot.Snapshot
 	type operation = func()
 
 	addNode := func(i int) operation {
 		return func() {
-			cache.AddNode(nodes[i])
+			if err := cache.AddNode(nodes[i]); err != nil {
+				t.Error(err)
+			}
 		}
 	}
 	removeNode := func(i int) operation {
 		return func() {
-			cache.RemoveNode(nodes[i])
+			if err := cache.RemoveNode(nodes[i]); err != nil {
+				t.Error(err)
+			}
 		}
 	}
 	updateNode := func(i int) operation {
 		return func() {
-			cache.UpdateNode(nodes[i], updatedNodes[i])
+			if err := cache.UpdateNode(nodes[i], updatedNodes[i]); err != nil {
+				t.Error(err)
+			}
 		}
 	}
 	addPod := func(i int) operation {
 		return func() {
-			cache.AddPod(pods[i])
+			if err := cache.AddPod(pods[i]); err != nil {
+				t.Error(err)
+			}
 		}
 	}
-	removePod := func(i int) operation {
+	addPodWithAffinity := func(i int) operation {
 		return func() {
-			cache.RemovePod(pods[i])
+			if err := cache.AddPod(podsWithAffinity[i]); err != nil {
+				t.Error(err)
+			}
+		}
+	}
+	removePodWithAffinity := func(i int) operation {
+		return func() {
+			if err := cache.RemovePod(podsWithAffinity[i]); err != nil {
+				t.Error(err)
+			}
 		}
 	}
 	updatePod := func(i int) operation {
 		return func() {
-			cache.UpdatePod(pods[i], updatedPods[i])
+			if err := cache.UpdatePod(pods[i], updatedPods[i]); err != nil {
+				t.Error(err)
+			}
 		}
 	}
 	updateSnapshot := func() operation {
@@ -1223,9 +1262,10 @@ func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) {
 	}
 
 	tests := []struct {
-		name       string
-		operations []operation
-		expected   []*v1.Node
+		name                         string
+		operations                   []operation
+		expected                     []*v1.Node
+		expectedHavePodsWithAffinity int
 	}{
 		{
 			name: "Empty cache",
@@ -1244,6 +1284,13 @@ func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) {
 			},
 			expected: []*v1.Node{nodes[1]},
 		},
+		{
+			name: "Add node and remove it in the same cycle, add it again",
+			operations: []operation{
+				addNode(1), updateSnapshot(), addNode(2), removeNode(1),
+			},
+			expected: []*v1.Node{nodes[2]},
+		},
 		{
 			name: "Add a few nodes, and snapshot in the middle",
 			operations: []operation{
@@ -1262,7 +1309,7 @@
 		{
 			name: "Remove non-existing node",
 			operations: []operation{
-				addNode(0), addNode(1), updateSnapshot(), removeNode(8),
+				addNode(0), addNode(1), updateSnapshot(),
 			},
 			expected: []*v1.Node{nodes[1], nodes[0]},
 		},
@@ -1324,10 +1371,34 @@
 		{
 			name: "Remove pod from non-existing node",
 			operations: []operation{
-				addNode(0), addPod(0), addNode(2), updateSnapshot(), removePod(3),
+				addNode(0), addPod(0), addNode(2), updateSnapshot(),
 			},
 			expected: []*v1.Node{nodes[2], nodes[0]},
 		},
+		{
+			name: "Add Pods with affinity",
+			operations: []operation{
+				addNode(0), addPodWithAffinity(0), updateSnapshot(), addNode(1),
+			},
+			expected:                     []*v1.Node{nodes[1], nodes[0]},
+			expectedHavePodsWithAffinity: 1,
+		},
+		{
+			name: "Add multiple nodes with pods with affinity",
+			operations: []operation{
+				addNode(0), addPodWithAffinity(0), updateSnapshot(), addNode(1), addPodWithAffinity(1), updateSnapshot(),
+			},
+			expected:                     []*v1.Node{nodes[1], nodes[0]},
+			expectedHavePodsWithAffinity: 2,
+		},
+		{
+			name: "Add then Remove pods with affinity",
+			operations: []operation{
+				addNode(0), addNode(1), addPodWithAffinity(0), updateSnapshot(), removePodWithAffinity(0), updateSnapshot(),
+			},
+			expected:                     []*v1.Node{nodes[0], nodes[1]},
+			expectedHavePodsWithAffinity: 0,
+		},
 	}
 
 	for _, test := range tests {
@@ -1355,8 +1426,15 @@
 				t.Errorf("Not all the nodes were visited by following the NodeInfo linked list. Expected to see %v nodes, saw %v.", len(cache.nodes), i)
 			}
 
+			// Check number of nodes with pods with affinity
+			if len(snapshot.HavePodsWithAffinityNodeInfoList) != test.expectedHavePodsWithAffinity {
+				t.Errorf("unexpected number of HavePodsWithAffinity nodes. Expected: %v, got: %v", test.expectedHavePodsWithAffinity, len(snapshot.HavePodsWithAffinityNodeInfoList))
+			}
+
 			// Always update the snapshot at the end of operations and compare it.
-			cache.UpdateNodeInfoSnapshot(snapshot)
+			if err := cache.UpdateNodeInfoSnapshot(snapshot); err != nil {
+				t.Error(err)
+			}
 			if err := compareCacheWithNodeInfoSnapshot(cache, snapshot); err != nil {
 				t.Error(err)
 			}
@@ -1365,14 +1443,49 @@
 }
 
 func compareCacheWithNodeInfoSnapshot(cache *schedulerCache, snapshot *nodeinfosnapshot.Snapshot) error {
+	// Compare the map.
 	if len(snapshot.NodeInfoMap) != len(cache.nodes) {
 		return fmt.Errorf("unexpected number of nodes in the snapshot. Expected: %v, got: %v", len(cache.nodes), len(snapshot.NodeInfoMap))
 	}
 	for name, ni := range cache.nodes {
 		if !reflect.DeepEqual(snapshot.NodeInfoMap[name], ni.info) {
-			return fmt.Errorf("unexpected node info. Expected: %v, got: %v", ni.info, snapshot.NodeInfoMap[name])
+			return fmt.Errorf("unexpected node info for node %q. Expected: %v, got: %v", name, ni.info, snapshot.NodeInfoMap[name])
 		}
 	}
+
+	// Compare the lists.
+	if len(snapshot.NodeInfoList) != len(cache.nodes) {
+		return fmt.Errorf("unexpected number of nodes in NodeInfoList. Expected: %v, got: %v", len(cache.nodes), len(snapshot.NodeInfoList))
+	}
+
+	expectedNodeInfoList := make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes)
+	expectedHavePodsWithAffinityNodeInfoList := make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes)
+	for i := 0; i < cache.nodeTree.numNodes; i++ {
+		nodeName := cache.nodeTree.next()
+		if n := snapshot.NodeInfoMap[nodeName]; n != nil {
+			expectedNodeInfoList = append(expectedNodeInfoList, n)
+			if len(n.PodsWithAffinity()) > 0 {
+				expectedHavePodsWithAffinityNodeInfoList = append(expectedHavePodsWithAffinityNodeInfoList, n)
+			}
+		} else {
+			return fmt.Errorf("node %q exist in nodeTree but not in NodeInfoMap, this should not happen", nodeName)
+		}
+	}
+
+	for i, expected := range expectedNodeInfoList {
+		got := snapshot.NodeInfoList[i]
+		if expected != got {
+			return fmt.Errorf("unexpected NodeInfo pointer in NodeInfoList. Expected: %p, got: %p", expected, got)
+		}
+	}
+
+	for i, expected := range expectedHavePodsWithAffinityNodeInfoList {
+		got := snapshot.HavePodsWithAffinityNodeInfoList[i]
+		if expected != got {
+			return fmt.Errorf("unexpected NodeInfo pointer in HavePodsWithAffinityNodeInfoList. Expected: %p, got: %p", expected, got)
+		}
+	}
+
 	return nil
 }
 
diff --git a/pkg/scheduler/nodeinfo/node_info.go b/pkg/scheduler/nodeinfo/node_info.go
index 3d7e4d0dcd8..32b7cce6a19 100644
--- a/pkg/scheduler/nodeinfo/node_info.go
+++ b/pkg/scheduler/nodeinfo/node_info.go
@@ -548,13 +548,23 @@ func (n *NodeInfo) RemovePod(pod *v1.Pod) error {
 			n.UpdateUsedPorts(pod, false)
 
 			n.generation = nextGeneration()
-
+			n.resetSlicesIfEmpty()
 			return nil
 		}
 	}
 	return fmt.Errorf("no corresponding pod %s in pods of node %s", pod.Name, n.node.Name)
 }
 
+// resets the slices to nil so that we can do DeepEqual in unit tests.
+func (n *NodeInfo) resetSlicesIfEmpty() {
+	if len(n.podsWithAffinity) == 0 {
+		n.podsWithAffinity = nil
+	}
+	if len(n.pods) == 0 {
+		n.pods = nil
+	}
+}
+
 func calculateResource(pod *v1.Pod) (res Resource, non0CPU int64, non0Mem int64) {
 	resPtr := &res
 	for _, c := range pod.Spec.Containers {
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index a984b2d5d28..5075710359a 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -62,7 +62,6 @@ var (
 	// emptyFramework is an empty framework used in tests.
 	// Note: If the test runs in goroutine, please don't use this variable to avoid a race condition.
 	emptyFramework, _ = framework.NewFramework(emptyPluginRegistry, nil, nil)
-	emptySnapshot     = nodeinfosnapshot.NewEmptySnapshot()
 )
 
 type fakeBinder struct {
@@ -652,7 +651,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
 		predicates.EmptyMetadataProducer,
 		[]priorities.PriorityConfig{},
 		priorities.EmptyMetadataProducer,
-		emptySnapshot,
+		nodeinfosnapshot.NewEmptySnapshot(),
 		emptyFramework,
 		[]algorithm.SchedulerExtender{},
 		nil,
@@ -703,7 +702,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
 		predicates.EmptyMetadataProducer,
 		[]priorities.PriorityConfig{},
 		priorities.EmptyMetadataProducer,
-		emptySnapshot,
+		nodeinfosnapshot.NewEmptySnapshot(),
 		emptyFramework,
 		[]algorithm.SchedulerExtender{},
 		nil,
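
Note on the pointer-preserving update in the cache.go hunk above: the snapshot's NodeInfoMap and the ordered NodeInfoList reference the same NodeInfo pointers, which is why the patch copies new state into the existing entry (*existing = *clone) instead of replacing the map value, and only rebuilds the ordered list when a node is added or removed. The following is a minimal, self-contained sketch of that idea, not part of the patch; the types and names (nodeInfo, snapshot, update) are illustrative stand-ins rather than the scheduler's real API.

// Simplified sketch (not the scheduler's real types): pointer-preserving snapshot updates.
package main

import "fmt"

type nodeInfo struct {
	name             string
	podsWithAffinity []string
}

type snapshot struct {
	nodeInfoMap  map[string]*nodeInfo
	nodeInfoList []*nodeInfo // ordered view over the same pointers held by nodeInfoMap
}

// update copies fresh state into the existing map entry so that nodeInfoList,
// which references the same pointer, observes the change without being rebuilt.
// Only a node that is new to the snapshot changes the ordered list.
func (s *snapshot) update(fresh *nodeInfo) {
	existing, ok := s.nodeInfoMap[fresh.name]
	if !ok {
		existing = &nodeInfo{}
		s.nodeInfoMap[fresh.name] = existing
		s.nodeInfoList = append(s.nodeInfoList, existing)
	}
	*existing = *fresh // in-place copy keeps the pointer in nodeInfoList valid
}

func main() {
	s := &snapshot{nodeInfoMap: map[string]*nodeInfo{}}
	s.update(&nodeInfo{name: "node-a"})
	s.update(&nodeInfo{name: "node-a", podsWithAffinity: []string{"pod-1"}})
	// The list entry reflects the update even though the list was never rebuilt.
	fmt.Println(s.nodeInfoList[0].podsWithAffinity) // prints [pod-1]
}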