From d6f09f7dfb51cbed389e5bacbe4c83dfad3e9b97 Mon Sep 17 00:00:00 2001 From: Aldo Culquicondor Date: Mon, 28 Sep 2020 17:01:12 -0400 Subject: [PATCH] Fix UpdateSnapshot when Node is partially removed Change-Id: I5b459e9ea67020183c87d1ce0a2380efb8cc3e05 --- pkg/scheduler/internal/cache/cache.go | 11 ++++-- pkg/scheduler/internal/cache/cache_test.go | 43 ++++++++++++---------- pkg/scheduler/internal/cache/interface.go | 4 +- 3 files changed, 35 insertions(+), 23 deletions(-) diff --git a/pkg/scheduler/internal/cache/cache.go b/pkg/scheduler/internal/cache/cache.go index 35c89985054..fac755c8a70 100644 --- a/pkg/scheduler/internal/cache/cache.go +++ b/pkg/scheduler/internal/cache/cache.go @@ -196,6 +196,8 @@ func (cache *schedulerCache) Dump() *Dump { // UpdateSnapshot takes a snapshot of cached NodeInfo map. This is called at // beginning of every scheduling cycle. +// The snapshot only includes Nodes that are not deleted at the time this function is called. +// nodeinfo.Node() is guaranteed to be not nil for all the nodes in the snapshot. // This function tracks generation number of NodeInfo and updates only the // entries of an existing snapshot that have changed after the snapshot was taken. func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error { @@ -256,7 +258,10 @@ func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error { nodeSnapshot.generation = cache.headNode.info.Generation } - if len(nodeSnapshot.nodeInfoMap) > len(cache.nodes) { + // Comparing to nodes in nodeTree. + // Deleted nodes get removed from the tree, but they might remain in the nodes map + // if they still have non-deleted Pods. 
+ if len(nodeSnapshot.nodeInfoMap) > cache.nodeTree.numNodes { cache.removeDeletedNodesFromSnapshot(nodeSnapshot) updateAllLists = true } @@ -318,12 +323,12 @@ func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, upda // If certain nodes were deleted after the last snapshot was taken, we should remove them from the snapshot. func (cache *schedulerCache) removeDeletedNodesFromSnapshot(snapshot *Snapshot) { - toDelete := len(snapshot.nodeInfoMap) - len(cache.nodes) + toDelete := len(snapshot.nodeInfoMap) - cache.nodeTree.numNodes for name := range snapshot.nodeInfoMap { if toDelete <= 0 { break } - if _, ok := cache.nodes[name]; !ok { + if n, ok := cache.nodes[name]; !ok || n.info.Node() == nil { delete(snapshot.nodeInfoMap, name) toDelete-- } diff --git a/pkg/scheduler/internal/cache/cache_test.go b/pkg/scheduler/internal/cache/cache_test.go index 853eebe89bf..9801c2497ab 100644 --- a/pkg/scheduler/internal/cache/cache_test.go +++ b/pkg/scheduler/internal/cache/cache_test.go @@ -1257,66 +1257,66 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { var cache *schedulerCache var snapshot *Snapshot - type operation = func() + type operation = func(t *testing.T) addNode := func(i int) operation { - return func() { + return func(t *testing.T) { if err := cache.AddNode(nodes[i]); err != nil { t.Error(err) } } } removeNode := func(i int) operation { - return func() { + return func(t *testing.T) { if err := cache.RemoveNode(nodes[i]); err != nil { t.Error(err) } } } updateNode := func(i int) operation { - return func() { + return func(t *testing.T) { if err := cache.UpdateNode(nodes[i], updatedNodes[i]); err != nil { t.Error(err) } } } addPod := func(i int) operation { - return func() { + return func(t *testing.T) { if err := cache.AddPod(pods[i]); err != nil { t.Error(err) } } } addPodWithAffinity := func(i int) operation { - return func() { + return func(t *testing.T) { if err := cache.AddPod(podsWithAffinity[i]); err != nil { t.Error(err) } } 
} removePod := func(i int) operation { - return func() { + return func(t *testing.T) { if err := cache.RemovePod(pods[i]); err != nil { t.Error(err) } } } removePodWithAffinity := func(i int) operation { - return func() { + return func(t *testing.T) { if err := cache.RemovePod(podsWithAffinity[i]); err != nil { t.Error(err) } } } updatePod := func(i int) operation { - return func() { + return func(t *testing.T) { if err := cache.UpdatePod(pods[i], updatedPods[i]); err != nil { t.Error(err) } } } updateSnapshot := func() operation { - return func() { + return func(t *testing.T) { cache.UpdateSnapshot(snapshot) if err := compareCacheWithNodeInfoSnapshot(t, cache, snapshot); err != nil { t.Error(err) @@ -1434,8 +1434,9 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { { name: "Remove node before its pods", operations: []operation{ - addNode(0), addNode(1), addPod(1), addPod(11), - removeNode(1), updatePod(1), updatePod(11), removePod(1), removePod(11), + addNode(0), addNode(1), addPod(1), addPod(11), updateSnapshot(), + removeNode(1), updateSnapshot(), + updatePod(1), updatePod(11), removePod(1), removePod(11), }, expected: []*v1.Node{nodes[0]}, }, @@ -1471,7 +1472,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { snapshot = NewEmptySnapshot() for _, op := range test.operations { - op() + op(t) } if len(test.expected) != len(cache.nodes) { @@ -1508,18 +1509,22 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { func compareCacheWithNodeInfoSnapshot(t *testing.T, cache *schedulerCache, snapshot *Snapshot) error { // Compare the map. - if len(snapshot.nodeInfoMap) != len(cache.nodes) { - return fmt.Errorf("unexpected number of nodes in the snapshot. Expected: %v, got: %v", len(cache.nodes), len(snapshot.nodeInfoMap)) + if len(snapshot.nodeInfoMap) != cache.nodeTree.numNodes { + return fmt.Errorf("unexpected number of nodes in the snapshot. 
Expected: %v, got: %v", cache.nodeTree.numNodes, len(snapshot.nodeInfoMap)) } for name, ni := range cache.nodes { - if !reflect.DeepEqual(snapshot.nodeInfoMap[name], ni.info) { - return fmt.Errorf("unexpected node info for node %q. Expected: %v, got: %v", name, ni.info, snapshot.nodeInfoMap[name]) + want := ni.info + if want.Node() == nil { + want = nil + } + if !reflect.DeepEqual(snapshot.nodeInfoMap[name], want) { + return fmt.Errorf("unexpected node info for node %q.Expected:\n%v, got:\n%v", name, ni.info, snapshot.nodeInfoMap[name]) } } // Compare the lists. - if len(snapshot.nodeInfoList) != len(cache.nodes) { - return fmt.Errorf("unexpected number of nodes in NodeInfoList. Expected: %v, got: %v", len(cache.nodes), len(snapshot.nodeInfoList)) + if len(snapshot.nodeInfoList) != cache.nodeTree.numNodes { + return fmt.Errorf("unexpected number of nodes in NodeInfoList. Expected: %v, got: %v", cache.nodeTree.numNodes, len(snapshot.nodeInfoList)) } expectedNodeInfoList := make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes) diff --git a/pkg/scheduler/internal/cache/interface.go b/pkg/scheduler/internal/cache/interface.go index e9ab9db8fac..fc60f429355 100644 --- a/pkg/scheduler/internal/cache/interface.go +++ b/pkg/scheduler/internal/cache/interface.go @@ -17,7 +17,7 @@ limitations under the License. package cache import ( - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" ) @@ -99,6 +99,8 @@ type Cache interface { // UpdateSnapshot updates the passed infoSnapshot to the current contents of Cache. // The node info contains aggregated information of pods scheduled (including assumed to be) // on this node. + // The snapshot only includes Nodes that are not deleted at the time this function is called. + // nodeinfo.Node() is guaranteed to be not nil for all the nodes in the snapshot. UpdateSnapshot(nodeSnapshot *Snapshot) error // Dump produces a dump of the current cache.