Merge pull request #95130 from alculquicondor/fix-snapshot

Fix UpdateSnapshot when Node is partially removed
This commit is contained in:
Kubernetes Prow Robot 2020-09-28 21:31:38 -07:00 committed by GitHub
commit 3a987d5b85
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 35 additions and 23 deletions

View File

@ -196,6 +196,8 @@ func (cache *schedulerCache) Dump() *Dump {
// UpdateSnapshot takes a snapshot of cached NodeInfo map. This is called at // UpdateSnapshot takes a snapshot of cached NodeInfo map. This is called at
// beginning of every scheduling cycle. // beginning of every scheduling cycle.
// The snapshot only includes Nodes that are not deleted at the time this function is called.
// nodeinfo.Node() is guaranteed to be not nil for all the nodes in the snapshot.
// This function tracks generation number of NodeInfo and updates only the // This function tracks generation number of NodeInfo and updates only the
// entries of an existing snapshot that have changed after the snapshot was taken. // entries of an existing snapshot that have changed after the snapshot was taken.
func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error { func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
@ -256,7 +258,10 @@ func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
nodeSnapshot.generation = cache.headNode.info.Generation nodeSnapshot.generation = cache.headNode.info.Generation
} }
if len(nodeSnapshot.nodeInfoMap) > len(cache.nodes) { // Comparing to pods in nodeTree.
// Deleted nodes get removed from the tree, but they might remain in the nodes map
// if they still have non-deleted Pods.
if len(nodeSnapshot.nodeInfoMap) > cache.nodeTree.numNodes {
cache.removeDeletedNodesFromSnapshot(nodeSnapshot) cache.removeDeletedNodesFromSnapshot(nodeSnapshot)
updateAllLists = true updateAllLists = true
} }
@ -318,12 +323,12 @@ func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, upda
// If certain nodes were deleted after the last snapshot was taken, we should remove them from the snapshot. // If certain nodes were deleted after the last snapshot was taken, we should remove them from the snapshot.
func (cache *schedulerCache) removeDeletedNodesFromSnapshot(snapshot *Snapshot) { func (cache *schedulerCache) removeDeletedNodesFromSnapshot(snapshot *Snapshot) {
toDelete := len(snapshot.nodeInfoMap) - len(cache.nodes) toDelete := len(snapshot.nodeInfoMap) - cache.nodeTree.numNodes
for name := range snapshot.nodeInfoMap { for name := range snapshot.nodeInfoMap {
if toDelete <= 0 { if toDelete <= 0 {
break break
} }
if _, ok := cache.nodes[name]; !ok { if n, ok := cache.nodes[name]; !ok || n.info.Node() == nil {
delete(snapshot.nodeInfoMap, name) delete(snapshot.nodeInfoMap, name)
toDelete-- toDelete--
} }

View File

@ -1257,66 +1257,66 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
var cache *schedulerCache var cache *schedulerCache
var snapshot *Snapshot var snapshot *Snapshot
type operation = func() type operation = func(t *testing.T)
addNode := func(i int) operation { addNode := func(i int) operation {
return func() { return func(t *testing.T) {
if err := cache.AddNode(nodes[i]); err != nil { if err := cache.AddNode(nodes[i]); err != nil {
t.Error(err) t.Error(err)
} }
} }
} }
removeNode := func(i int) operation { removeNode := func(i int) operation {
return func() { return func(t *testing.T) {
if err := cache.RemoveNode(nodes[i]); err != nil { if err := cache.RemoveNode(nodes[i]); err != nil {
t.Error(err) t.Error(err)
} }
} }
} }
updateNode := func(i int) operation { updateNode := func(i int) operation {
return func() { return func(t *testing.T) {
if err := cache.UpdateNode(nodes[i], updatedNodes[i]); err != nil { if err := cache.UpdateNode(nodes[i], updatedNodes[i]); err != nil {
t.Error(err) t.Error(err)
} }
} }
} }
addPod := func(i int) operation { addPod := func(i int) operation {
return func() { return func(t *testing.T) {
if err := cache.AddPod(pods[i]); err != nil { if err := cache.AddPod(pods[i]); err != nil {
t.Error(err) t.Error(err)
} }
} }
} }
addPodWithAffinity := func(i int) operation { addPodWithAffinity := func(i int) operation {
return func() { return func(t *testing.T) {
if err := cache.AddPod(podsWithAffinity[i]); err != nil { if err := cache.AddPod(podsWithAffinity[i]); err != nil {
t.Error(err) t.Error(err)
} }
} }
} }
removePod := func(i int) operation { removePod := func(i int) operation {
return func() { return func(t *testing.T) {
if err := cache.RemovePod(pods[i]); err != nil { if err := cache.RemovePod(pods[i]); err != nil {
t.Error(err) t.Error(err)
} }
} }
} }
removePodWithAffinity := func(i int) operation { removePodWithAffinity := func(i int) operation {
return func() { return func(t *testing.T) {
if err := cache.RemovePod(podsWithAffinity[i]); err != nil { if err := cache.RemovePod(podsWithAffinity[i]); err != nil {
t.Error(err) t.Error(err)
} }
} }
} }
updatePod := func(i int) operation { updatePod := func(i int) operation {
return func() { return func(t *testing.T) {
if err := cache.UpdatePod(pods[i], updatedPods[i]); err != nil { if err := cache.UpdatePod(pods[i], updatedPods[i]); err != nil {
t.Error(err) t.Error(err)
} }
} }
} }
updateSnapshot := func() operation { updateSnapshot := func() operation {
return func() { return func(t *testing.T) {
cache.UpdateSnapshot(snapshot) cache.UpdateSnapshot(snapshot)
if err := compareCacheWithNodeInfoSnapshot(t, cache, snapshot); err != nil { if err := compareCacheWithNodeInfoSnapshot(t, cache, snapshot); err != nil {
t.Error(err) t.Error(err)
@ -1434,8 +1434,9 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
{ {
name: "Remove node before its pods", name: "Remove node before its pods",
operations: []operation{ operations: []operation{
addNode(0), addNode(1), addPod(1), addPod(11), addNode(0), addNode(1), addPod(1), addPod(11), updateSnapshot(),
removeNode(1), updatePod(1), updatePod(11), removePod(1), removePod(11), removeNode(1), updateSnapshot(),
updatePod(1), updatePod(11), removePod(1), removePod(11),
}, },
expected: []*v1.Node{nodes[0]}, expected: []*v1.Node{nodes[0]},
}, },
@ -1471,7 +1472,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
snapshot = NewEmptySnapshot() snapshot = NewEmptySnapshot()
for _, op := range test.operations { for _, op := range test.operations {
op() op(t)
} }
if len(test.expected) != len(cache.nodes) { if len(test.expected) != len(cache.nodes) {
@ -1508,18 +1509,22 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
func compareCacheWithNodeInfoSnapshot(t *testing.T, cache *schedulerCache, snapshot *Snapshot) error { func compareCacheWithNodeInfoSnapshot(t *testing.T, cache *schedulerCache, snapshot *Snapshot) error {
// Compare the map. // Compare the map.
if len(snapshot.nodeInfoMap) != len(cache.nodes) { if len(snapshot.nodeInfoMap) != cache.nodeTree.numNodes {
return fmt.Errorf("unexpected number of nodes in the snapshot. Expected: %v, got: %v", len(cache.nodes), len(snapshot.nodeInfoMap)) return fmt.Errorf("unexpected number of nodes in the snapshot. Expected: %v, got: %v", cache.nodeTree.numNodes, len(snapshot.nodeInfoMap))
} }
for name, ni := range cache.nodes { for name, ni := range cache.nodes {
if !reflect.DeepEqual(snapshot.nodeInfoMap[name], ni.info) { want := ni.info
return fmt.Errorf("unexpected node info for node %q. Expected: %v, got: %v", name, ni.info, snapshot.nodeInfoMap[name]) if want.Node() == nil {
want = nil
}
if !reflect.DeepEqual(snapshot.nodeInfoMap[name], want) {
return fmt.Errorf("unexpected node info for node %q.Expected:\n%v, got:\n%v", name, ni.info, snapshot.nodeInfoMap[name])
} }
} }
// Compare the lists. // Compare the lists.
if len(snapshot.nodeInfoList) != len(cache.nodes) { if len(snapshot.nodeInfoList) != cache.nodeTree.numNodes {
return fmt.Errorf("unexpected number of nodes in NodeInfoList. Expected: %v, got: %v", len(cache.nodes), len(snapshot.nodeInfoList)) return fmt.Errorf("unexpected number of nodes in NodeInfoList. Expected: %v, got: %v", cache.nodeTree.numNodes, len(snapshot.nodeInfoList))
} }
expectedNodeInfoList := make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes) expectedNodeInfoList := make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)

View File

@ -17,7 +17,7 @@ limitations under the License.
package cache package cache
import ( import (
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
) )
@ -99,6 +99,8 @@ type Cache interface {
// UpdateSnapshot updates the passed infoSnapshot to the current contents of Cache. // UpdateSnapshot updates the passed infoSnapshot to the current contents of Cache.
// The node info contains aggregated information of pods scheduled (including assumed to be) // The node info contains aggregated information of pods scheduled (including assumed to be)
// on this node. // on this node.
// The snapshot only includes Nodes that are not deleted at the time this function is called.
// nodeinfo.Node() is guaranteed to be not nil for all the nodes in the snapshot.
UpdateSnapshot(nodeSnapshot *Snapshot) error UpdateSnapshot(nodeSnapshot *Snapshot) error
// Dump produces a dump of the current cache. // Dump produces a dump of the current cache.