diff --git a/pkg/scheduler/internal/cache/cache.go b/pkg/scheduler/internal/cache/cache.go
index bc17d178bf9..1d3fbb420ad 100644
--- a/pkg/scheduler/internal/cache/cache.go
+++ b/pkg/scheduler/internal/cache/cache.go
@@ -437,19 +437,18 @@ func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
 }
 
 // Assumes that lock is already acquired.
+// Removes a pod from the cached node info. When a node is removed, some pod
+// deletion events might arrive later. This is not a problem, as the pods on
+// the node are assumed to be removed already.
 func (cache *schedulerCache) removePod(pod *v1.Pod) error {
 	n, ok := cache.nodes[pod.Spec.NodeName]
 	if !ok {
-		return fmt.Errorf("node %v is not found", pod.Spec.NodeName)
+		return nil
 	}
 	if err := n.info.RemovePod(pod); err != nil {
 		return err
 	}
-	if len(n.info.Pods()) == 0 && n.info.Node() == nil {
-		cache.removeNodeInfoFromList(pod.Spec.NodeName)
-	} else {
-		cache.moveNodeInfoToHead(pod.Spec.NodeName)
-	}
+	cache.moveNodeInfoToHead(pod.Spec.NodeName)
 	return nil
 }
 
@@ -563,6 +562,8 @@ func (cache *schedulerCache) IsAssumedPod(pod *v1.Pod) (bool, error) {
 	return b, nil
 }
 
+// GetPod might return a pod whose node has already been deleted from
+// the main cache. This is useful to properly process pod update events.
 func (cache *schedulerCache) GetPod(pod *v1.Pod) (*v1.Pod, error) {
 	key, err := schedulernodeinfo.GetPodKey(pod)
 	if err != nil {
@@ -617,27 +618,21 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
 	return n.info.SetNode(newNode)
 }
 
+// RemoveNode removes a node from the cache.
+// Some nodes might still have pods because their deletion events didn't arrive
+// yet. For most intents and purposes, those pods are removed from the cache,
+// since the cached node info is their source of truth.
+// However, some pod information (assumedPods, podStates) persists. These
+// caches will be eventually consistent as pod deletion events arrive.
 func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
 	cache.mu.Lock()
 	defer cache.mu.Unlock()
-	n, ok := cache.nodes[node.Name]
+	_, ok := cache.nodes[node.Name]
 	if !ok {
 		return fmt.Errorf("node %v is not found", node.Name)
 	}
-	if err := n.info.RemoveNode(node); err != nil {
-		return err
-	}
-	// We remove NodeInfo for this node only if there aren't any pods on this node.
-	// We can't do it unconditionally, because notifications about pods are delivered
-	// in a different watch, and thus can potentially be observed later, even though
-	// they happened before node removal.
-	if len(n.info.Pods()) == 0 && n.info.Node() == nil {
-		cache.removeNodeInfoFromList(node.Name)
-	} else {
-		cache.moveNodeInfoToHead(node.Name)
-	}
-
+	cache.removeNodeInfoFromList(node.Name)
 	if err := cache.nodeTree.removeNode(node); err != nil {
 		return err
 	}
 
@@ -715,7 +710,7 @@ func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
 	for key := range cache.assumedPods {
 		ps, ok := cache.podStates[key]
 		if !ok {
-			panic("Key found in assumed set but not in podStates. Potentially a logical error.")
+			klog.Fatal("Key found in assumed set but not in podStates. Potentially a logical error.")
 		}
 		if !ps.bindingFinished {
 			klog.V(3).Infof("Couldn't expire cache for pod %v/%v. Binding is still in progress.",
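The contract established in cache.go above is easiest to see as a sequence of cache calls. The sketch below is illustrative only and is not part of the patch: it reuses identifiers that appear in this diff (newSchedulerCache, AddNode, AddPod, RemoveNode, RemovePod, GetPod) as if written inside the same package, and the node and pod literals are hypothetical.

	// Sketch: the out-of-order delivery that the cache now tolerates. Node
	// and pod deletions arrive on separate watches, so the node deletion can
	// be observed before the deletions of the pods that ran on it.
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "p", Namespace: "ns", UID: "uid-1"},
		Spec:       v1.PodSpec{NodeName: "node-1"},
	}

	cache := newSchedulerCache(time.Second, time.Second, nil)
	_ = cache.AddNode(node)
	_ = cache.AddPod(pod)

	// The node disappears first: its nodeInfo is unlinked immediately, but
	// the pod stays in podStates until its own deletion event arrives.
	_ = cache.RemoveNode(node)

	// A late pod update event can still be processed from the cached copy...
	if _, err := cache.GetPod(pod); err != nil {
		// not expected after this change: the pod is still known to the cache
	}

	// ...and the eventual pod deletion event succeeds rather than failing
	// with "node ... is not found", since removePod treats a missing node as
	// already cleaned up.
	if err := cache.RemovePod(pod); err != nil {
		// not expected after this change
	}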
diff --git a/pkg/scheduler/internal/cache/cache_test.go b/pkg/scheduler/internal/cache/cache_test.go
index 0e6b83b92fa..25e922f68de 100644
--- a/pkg/scheduler/internal/cache/cache_test.go
+++ b/pkg/scheduler/internal/cache/cache_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package cache
 
 import (
+	"errors"
 	"fmt"
 	"reflect"
 	"strings"
@@ -35,9 +36,9 @@ import (
 	nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
 )
 
-func deepEqualWithoutGeneration(t *testing.T, testcase int, actual *nodeInfoListItem, expected *schedulernodeinfo.NodeInfo) {
+func deepEqualWithoutGeneration(actual *nodeInfoListItem, expected *schedulernodeinfo.NodeInfo) error {
 	if (actual == nil) != (expected == nil) {
-		t.Error("One of the actual or expected is nil and the other is not!")
+		return errors.New("one of the actual or expected is nil and the other is not")
 	}
 	// Ignore generation field.
 	if actual != nil {
@@ -47,8 +48,9 @@ func deepEqualWithoutGeneration(t *testing.T, testcase int, actual *nodeInfoList
 		expected.SetGeneration(0)
 	}
 	if actual != nil && !reflect.DeepEqual(actual.info, expected) {
-		t.Errorf("#%d: node info get=%s, want=%s", testcase, actual.info, expected)
+		return fmt.Errorf("got node info %s, want %s", actual.info, expected)
 	}
+	return nil
 }
 
 type hostPortInfoParam struct {
@@ -208,23 +210,27 @@ func TestAssumePodScheduled(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		cache := newSchedulerCache(time.Second, time.Second, nil)
-		for _, pod := range tt.pods {
-			if err := cache.AssumePod(pod); err != nil {
-				t.Fatalf("AssumePod failed: %v", err)
+		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
+			cache := newSchedulerCache(time.Second, time.Second, nil)
+			for _, pod := range tt.pods {
+				if err := cache.AssumePod(pod); err != nil {
+					t.Fatalf("AssumePod failed: %v", err)
+				}
+			}
+			n := cache.nodes[nodeName]
+			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
+				t.Error(err)
 			}
-		}
-		n := cache.nodes[nodeName]
-		deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo)
-		for _, pod := range tt.pods {
-			if err := cache.ForgetPod(pod); err != nil {
-				t.Fatalf("ForgetPod failed: %v", err)
+			for _, pod := range tt.pods {
+				if err := cache.ForgetPod(pod); err != nil {
+					t.Fatalf("ForgetPod failed: %v", err)
+				}
+				if err := isForgottenFromCache(pod, cache); err != nil {
+					t.Errorf("pod %s: %v", pod.Name, err)
+				}
 			}
-		}
-		if cache.nodes[nodeName] != nil {
-			t.Errorf("NodeInfo should be cleaned for %s", nodeName)
-		}
+		})
 	}
 }
 
@@ -262,7 +268,7 @@ func TestExpirePod(t *testing.T) {
 			{pod: testPods[0], assumedTime: now},
 		},
 		cleanupTime: now.Add(2 * ttl),
-		wNodeInfo:   nil,
+		wNodeInfo:   schedulernodeinfo.NewNodeInfo(),
 	}, { // first one would expire, second one would not.
 		pods: []*testExpirePodStruct{
 			{pod: testPods[0], assumedTime: now},
@@ -285,17 +291,21 @@
 	}}
 
 	for i, tt := range tests {
-		cache := newSchedulerCache(ttl, time.Second, nil)
+		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
+			cache := newSchedulerCache(ttl, time.Second, nil)
 
-		for _, pod := range tt.pods {
-			if err := assumeAndFinishBinding(cache, pod.pod, pod.assumedTime); err != nil {
-				t.Fatalf("assumePod failed: %v", err)
+			for _, pod := range tt.pods {
+				if err := assumeAndFinishBinding(cache, pod.pod, pod.assumedTime); err != nil {
+					t.Fatalf("assumePod failed: %v", err)
+				}
 			}
-		}
-		// pods that have assumedTime + ttl < cleanupTime will get expired and removed
-		cache.cleanupAssumedPods(tt.cleanupTime)
-		n := cache.nodes[nodeName]
-		deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo)
+			// pods that have assumedTime + ttl < cleanupTime will get expired and removed
+			cache.cleanupAssumedPods(tt.cleanupTime)
+			n := cache.nodes[nodeName]
+			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
+				t.Error(err)
+			}
+		})
 	}
 }
 
@@ -336,21 +346,25 @@ func TestAddPodWillConfirm(t *testing.T) {
 	}}
 
 	for i, tt := range tests {
-		cache := newSchedulerCache(ttl, time.Second, nil)
-		for _, podToAssume := range tt.podsToAssume {
-			if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
-				t.Fatalf("assumePod failed: %v", err)
+		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
+			cache := newSchedulerCache(ttl, time.Second, nil)
+			for _, podToAssume := range tt.podsToAssume {
+				if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
+					t.Fatalf("assumePod failed: %v", err)
+				}
 			}
-		}
-		for _, podToAdd := range tt.podsToAdd {
-			if err := cache.AddPod(podToAdd); err != nil {
-				t.Fatalf("AddPod failed: %v", err)
+			for _, podToAdd := range tt.podsToAdd {
+				if err := cache.AddPod(podToAdd); err != nil {
+					t.Fatalf("AddPod failed: %v", err)
+				}
 			}
-		}
-		cache.cleanupAssumedPods(now.Add(2 * ttl))
-		// check after expiration. confirmed pods shouldn't be expired.
-		n := cache.nodes[nodeName]
-		deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo)
+			cache.cleanupAssumedPods(now.Add(2 * ttl))
+			// check after expiration. confirmed pods shouldn't be expired.
+			n := cache.nodes[nodeName]
+			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
+				t.Error(err)
+			}
+		})
 	}
 }
 
@@ -438,27 +452,30 @@ func TestAddPodWillReplaceAssumed(t *testing.T) {
 	}}
 
 	for i, tt := range tests {
-		cache := newSchedulerCache(ttl, time.Second, nil)
-		for _, podToAssume := range tt.podsToAssume {
-			if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
-				t.Fatalf("assumePod failed: %v", err)
+		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
+			cache := newSchedulerCache(ttl, time.Second, nil)
+			for _, podToAssume := range tt.podsToAssume {
+				if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
+					t.Fatalf("assumePod failed: %v", err)
+				}
 			}
-		}
-		for _, podToAdd := range tt.podsToAdd {
-			if err := cache.AddPod(podToAdd); err != nil {
-				t.Fatalf("AddPod failed: %v", err)
+			for _, podToAdd := range tt.podsToAdd {
+				if err := cache.AddPod(podToAdd); err != nil {
+					t.Fatalf("AddPod failed: %v", err)
+				}
 			}
-		}
-		for _, podToUpdate := range tt.podsToUpdate {
-			if err := cache.UpdatePod(podToUpdate[0], podToUpdate[1]); err != nil {
-				t.Fatalf("UpdatePod failed: %v", err)
+			for _, podToUpdate := range tt.podsToUpdate {
+				if err := cache.UpdatePod(podToUpdate[0], podToUpdate[1]); err != nil {
+					t.Fatalf("UpdatePod failed: %v", err)
+				}
 			}
-		}
-		for nodeName, expected := range tt.wNodeInfo {
-			t.Log(nodeName)
-			n := cache.nodes[nodeName]
-			deepEqualWithoutGeneration(t, i, n, expected)
-		}
+			for nodeName, expected := range tt.wNodeInfo {
+				n := cache.nodes[nodeName]
+				if err := deepEqualWithoutGeneration(n, expected); err != nil {
+					t.Errorf("node %q: %v", nodeName, err)
+				}
+			}
+		})
 	}
 }
 
@@ -490,24 +507,27 @@ func TestAddPodAfterExpiration(t *testing.T) {
 		),
 	}}
 
-	now := time.Now()
 	for i, tt := range tests {
-		cache := newSchedulerCache(ttl, time.Second, nil)
-		if err := assumeAndFinishBinding(cache, tt.pod, now); err != nil {
-			t.Fatalf("assumePod failed: %v", err)
-		}
-		cache.cleanupAssumedPods(now.Add(2 * ttl))
-		// It should be expired and removed.
-		n := cache.nodes[nodeName]
-		if n != nil {
-			t.Errorf("#%d: expecting nil node info, but get=%v", i, n)
-		}
-		if err := cache.AddPod(tt.pod); err != nil {
-			t.Fatalf("AddPod failed: %v", err)
-		}
-		// check after expiration. confirmed pods shouldn't be expired.
-		n = cache.nodes[nodeName]
-		deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo)
+		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
+			now := time.Now()
+			cache := newSchedulerCache(ttl, time.Second, nil)
+			if err := assumeAndFinishBinding(cache, tt.pod, now); err != nil {
+				t.Fatalf("assumePod failed: %v", err)
+			}
+			cache.cleanupAssumedPods(now.Add(2 * ttl))
+			// It should be expired and removed.
+			if err := isForgottenFromCache(tt.pod, cache); err != nil {
+				t.Error(err)
+			}
+			if err := cache.AddPod(tt.pod); err != nil {
+				t.Fatalf("AddPod failed: %v", err)
+			}
+			// check after expiration. confirmed pods shouldn't be expired.
+			n := cache.nodes[nodeName]
+			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
+				t.Error(err)
+			}
+		})
 	}
 }
 
@@ -556,25 +576,29 @@ func TestUpdatePod(t *testing.T) {
 		)},
 	}}
 
-	for _, tt := range tests {
-		cache := newSchedulerCache(ttl, time.Second, nil)
-		for _, podToAdd := range tt.podsToAdd {
-			if err := cache.AddPod(podToAdd); err != nil {
-				t.Fatalf("AddPod failed: %v", err)
+	for i, tt := range tests {
+		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
+			cache := newSchedulerCache(ttl, time.Second, nil)
+			for _, podToAdd := range tt.podsToAdd {
+				if err := cache.AddPod(podToAdd); err != nil {
+					t.Fatalf("AddPod failed: %v", err)
+				}
 			}
-		}
-		for i := range tt.podsToUpdate {
-			if i == 0 {
-				continue
+			for j := range tt.podsToUpdate {
+				if j == 0 {
+					continue
+				}
+				if err := cache.UpdatePod(tt.podsToUpdate[j-1], tt.podsToUpdate[j]); err != nil {
+					t.Fatalf("UpdatePod failed: %v", err)
+				}
+				// check after expiration. confirmed pods shouldn't be expired.
+				n := cache.nodes[nodeName]
+				if err := deepEqualWithoutGeneration(n, tt.wNodeInfo[j-1]); err != nil {
+					t.Errorf("update %d: %v", j, err)
+				}
 			}
-			if err := cache.UpdatePod(tt.podsToUpdate[i-1], tt.podsToUpdate[i]); err != nil {
-				t.Fatalf("UpdatePod failed: %v", err)
-			}
-			// check after expiration. confirmed pods shouldn't be expired.
-			n := cache.nodes[nodeName]
-			deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo[i-1])
-		}
+		})
 	}
 }
 
@@ -684,33 +708,37 @@ func TestExpireAddUpdatePod(t *testing.T) {
 		)},
 	}}
 
-	now := time.Now()
-	for _, tt := range tests {
-		cache := newSchedulerCache(ttl, time.Second, nil)
-		for _, podToAssume := range tt.podsToAssume {
-			if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
-				t.Fatalf("assumePod failed: %v", err)
+	for i, tt := range tests {
+		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
+			now := time.Now()
+			cache := newSchedulerCache(ttl, time.Second, nil)
+			for _, podToAssume := range tt.podsToAssume {
+				if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
+					t.Fatalf("assumePod failed: %v", err)
+				}
 			}
-		}
-		cache.cleanupAssumedPods(now.Add(2 * ttl))
+			cache.cleanupAssumedPods(now.Add(2 * ttl))
 
-		for _, podToAdd := range tt.podsToAdd {
-			if err := cache.AddPod(podToAdd); err != nil {
-				t.Fatalf("AddPod failed: %v", err)
+			for _, podToAdd := range tt.podsToAdd {
+				if err := cache.AddPod(podToAdd); err != nil {
+					t.Fatalf("AddPod failed: %v", err)
+				}
 			}
-		}
-		for i := range tt.podsToUpdate {
-			if i == 0 {
-				continue
+			for j := range tt.podsToUpdate {
+				if j == 0 {
+					continue
+				}
+				if err := cache.UpdatePod(tt.podsToUpdate[j-1], tt.podsToUpdate[j]); err != nil {
+					t.Fatalf("UpdatePod failed: %v", err)
+				}
+				// check after expiration. confirmed pods shouldn't be expired.
+				n := cache.nodes[nodeName]
+				if err := deepEqualWithoutGeneration(n, tt.wNodeInfo[j-1]); err != nil {
+					t.Errorf("update %d: %v", j, err)
+				}
 			}
-			if err := cache.UpdatePod(tt.podsToUpdate[i-1], tt.podsToUpdate[i]); err != nil {
-				t.Fatalf("UpdatePod failed: %v", err)
-			}
-			// check after expiration. confirmed pods shouldn't be expired.
-			n := cache.nodes[nodeName]
-			deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo[i-1])
-		}
+		})
 	}
 }
 
@@ -761,21 +789,23 @@ func TestEphemeralStorageResource(t *testing.T) {
 		},
 	}
 	for i, tt := range tests {
-		cache := newSchedulerCache(time.Second, time.Second, nil)
-		if err := cache.AddPod(tt.pod); err != nil {
-			t.Fatalf("AddPod failed: %v", err)
-		}
-		n := cache.nodes[nodeName]
-		deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo)
+		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
+			cache := newSchedulerCache(time.Second, time.Second, nil)
+			if err := cache.AddPod(tt.pod); err != nil {
+				t.Fatalf("AddPod failed: %v", err)
+			}
+			n := cache.nodes[nodeName]
+			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
+				t.Error(err)
+			}
 
-		if err := cache.RemovePod(tt.pod); err != nil {
-			t.Fatalf("RemovePod failed: %v", err)
-		}
-
-		n = cache.nodes[nodeName]
-		if n != nil {
-			t.Errorf("#%d: expecting pod deleted and nil node info, get=%s", i, n.info)
-		}
+			if err := cache.RemovePod(tt.pod); err != nil {
+				t.Fatalf("RemovePod failed: %v", err)
+			}
+			if _, err := cache.GetPod(tt.pod); err == nil {
+				t.Errorf("pod was not deleted")
+			}
+		})
 	}
 }
 
@@ -783,12 +813,20 @@ func TestEphemeralStorageResource(t *testing.T) {
 func TestRemovePod(t *testing.T) {
 	// Enable volumesOnNodeForBalancing to do balanced resource allocation
 	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
-	nodeName := "node"
-	basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
+	basePod := makeBasePod(t, "node-1", "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
 	tests := []struct {
+		nodes     []*v1.Node
 		pod       *v1.Pod
 		wNodeInfo *schedulernodeinfo.NodeInfo
 	}{{
+		nodes: []*v1.Node{
+			{
+				ObjectMeta: metav1.ObjectMeta{Name: "node-1"},
+			},
+			{
+				ObjectMeta: metav1.ObjectMeta{Name: "node-2"},
+			},
+		},
 		pod: basePod,
 		wNodeInfo: newNodeInfo(
 			&schedulernodeinfo.Resource{
@@ -806,74 +844,75 @@
 	}}
 
 	for i, tt := range tests {
-		cache := newSchedulerCache(time.Second, time.Second, nil)
-		if err := cache.AddPod(tt.pod); err != nil {
-			t.Fatalf("AddPod failed: %v", err)
-		}
-		n := cache.nodes[nodeName]
-		deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo)
+		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
+			nodeName := tt.pod.Spec.NodeName
+			cache := newSchedulerCache(time.Second, time.Second, nil)
+			// Add pod succeeds even before adding the nodes.
+			if err := cache.AddPod(tt.pod); err != nil {
+				t.Fatalf("AddPod failed: %v", err)
+			}
+			n := cache.nodes[nodeName]
+			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
+				t.Error(err)
+			}
+			for _, n := range tt.nodes {
+				if err := cache.AddNode(n); err != nil {
+					t.Error(err)
+				}
+			}
 
-		if err := cache.RemovePod(tt.pod); err != nil {
-			t.Fatalf("RemovePod failed: %v", err)
-		}
+			if err := cache.RemovePod(tt.pod); err != nil {
+				t.Fatalf("RemovePod failed: %v", err)
+			}
 
-		n = cache.nodes[nodeName]
-		if n != nil {
-			t.Errorf("#%d: expecting pod deleted and nil node info, get=%s", i, n.info)
-		}
+			if _, err := cache.GetPod(tt.pod); err == nil {
+				t.Errorf("pod was not deleted")
+			}
+
+			// Node that owned the Pod should be at the head of the list.
+			if cache.headNode.info.Node().Name != nodeName {
+				t.Errorf("node %q is not at the head of the list", nodeName)
+			}
+		})
 	}
 }
 
 func TestForgetPod(t *testing.T) {
 	nodeName := "node"
 	basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
-	tests := []struct {
-		pods []*v1.Pod
-	}{{
-		pods: []*v1.Pod{basePod},
-	}}
+	pods := []*v1.Pod{basePod}
 	now := time.Now()
 	ttl := 10 * time.Second
 
-	for i, tt := range tests {
-		cache := newSchedulerCache(ttl, time.Second, nil)
-		for _, pod := range tt.pods {
-			if err := assumeAndFinishBinding(cache, pod, now); err != nil {
-				t.Fatalf("assumePod failed: %v", err)
-			}
-			isAssumed, err := cache.IsAssumedPod(pod)
-			if err != nil {
-				t.Fatalf("IsAssumedPod failed: %v.", err)
-			}
-			if !isAssumed {
-				t.Fatalf("Pod is expected to be assumed.")
-			}
-			assumedPod, err := cache.GetPod(pod)
-			if err != nil {
-				t.Fatalf("GetPod failed: %v.", err)
-			}
-			if assumedPod.Namespace != pod.Namespace {
-				t.Errorf("assumedPod.Namespace != pod.Namespace (%s != %s)", assumedPod.Namespace, pod.Namespace)
-			}
-			if assumedPod.Name != pod.Name {
-				t.Errorf("assumedPod.Name != pod.Name (%s != %s)", assumedPod.Name, pod.Name)
-			}
+	cache := newSchedulerCache(ttl, time.Second, nil)
+	for _, pod := range pods {
+		if err := assumeAndFinishBinding(cache, pod, now); err != nil {
+			t.Fatalf("assumePod failed: %v", err)
 		}
-		for _, pod := range tt.pods {
-			if err := cache.ForgetPod(pod); err != nil {
-				t.Fatalf("ForgetPod failed: %v", err)
-			}
-			isAssumed, err := cache.IsAssumedPod(pod)
-			if err != nil {
-				t.Fatalf("IsAssumedPod failed: %v.", err)
-			}
-			if isAssumed {
-				t.Fatalf("Pod is expected to be unassumed.")
-			}
+		isAssumed, err := cache.IsAssumedPod(pod)
+		if err != nil {
+			t.Fatalf("IsAssumedPod failed: %v.", err)
 		}
-		cache.cleanupAssumedPods(now.Add(2 * ttl))
-		if n := cache.nodes[nodeName]; n != nil {
-			t.Errorf("#%d: expecting pod deleted and nil node info, get=%s", i, n.info)
+		if !isAssumed {
+			t.Fatalf("Pod is expected to be assumed.")
+		}
+		assumedPod, err := cache.GetPod(pod)
+		if err != nil {
+			t.Fatalf("GetPod failed: %v.", err)
+		}
+		if assumedPod.Namespace != pod.Namespace {
+			t.Errorf("assumedPod.Namespace != pod.Namespace (%s != %s)", assumedPod.Namespace, pod.Namespace)
+		}
+		if assumedPod.Name != pod.Name {
+			t.Errorf("assumedPod.Name != pod.Name (%s != %s)", assumedPod.Name, pod.Name)
+		}
+	}
+	for _, pod := range pods {
+		if err := cache.ForgetPod(pod); err != nil {
+			t.Fatalf("ForgetPod failed: %v", err)
+		}
+		if err := isForgottenFromCache(pod, cache); err != nil {
+			t.Errorf("pod %q: %v", pod.Name, err)
 		}
 	}
 }
 
@@ -1051,78 +1090,105 @@ func TestNodeOperators(t *testing.T) {
 		},
 	}
 
-	for _, test := range tests {
-		expected := buildNodeInfo(test.node, test.pods)
-		node := test.node
+	for i, test := range tests {
+		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
+			expected := buildNodeInfo(test.node, test.pods)
+			node := test.node
 
-		cache := newSchedulerCache(time.Second, time.Second, nil)
-		cache.AddNode(node)
-		for _, pod := range test.pods {
-			cache.AddPod(pod)
-		}
+			cache := newSchedulerCache(time.Second, time.Second, nil)
+			if err := cache.AddNode(node); err != nil {
+				t.Fatal(err)
+			}
+			for _, pod := range test.pods {
+				if err := cache.AddPod(pod); err != nil {
+					t.Fatal(err)
+				}
+			}
 
-		// Case 1: the node was added into cache successfully.
-		got, found := cache.nodes[node.Name]
-		if !found {
-			t.Errorf("Failed to find node %v in internalcache.", node.Name)
-		}
-		if cache.nodeTree.numNodes != 1 || cache.nodeTree.next() != node.Name {
-			t.Errorf("cache.nodeTree is not updated correctly after adding node: %v", node.Name)
-		}
+			// Step 1: the node was added into cache successfully.
+			got, found := cache.nodes[node.Name]
+			if !found {
+				t.Errorf("Failed to find node %v in internalcache.", node.Name)
+			}
+			if cache.nodeTree.numNodes != 1 || cache.nodeTree.next() != node.Name {
+				t.Errorf("cache.nodeTree is not updated correctly after adding node: %v", node.Name)
+			}
 
-		// Generations are globally unique. We check in our unit tests that they are incremented correctly.
-		expected.SetGeneration(got.info.GetGeneration())
-		if !reflect.DeepEqual(got.info, expected) {
-			t.Errorf("Failed to add node into schedulercache:\n got: %+v \nexpected: %+v", got, expected)
-		}
+			// Generations are globally unique. We check in our unit tests that they are incremented correctly.
+			expected.SetGeneration(got.info.GetGeneration())
+			if !reflect.DeepEqual(got.info, expected) {
+				t.Errorf("Failed to add node into schedulercache:\n got: %+v \nexpected: %+v", got, expected)
+			}
 
-		// Case 2: dump cached nodes successfully.
-		cachedNodes := nodeinfosnapshot.NewEmptySnapshot()
-		cache.UpdateNodeInfoSnapshot(cachedNodes)
-		newNode, found := cachedNodes.NodeInfoMap[node.Name]
-		if !found || len(cachedNodes.NodeInfoMap) != 1 {
-			t.Errorf("failed to dump cached nodes:\n got: %v \nexpected: %v", cachedNodes, cache.nodes)
-		}
-		expected.SetGeneration(newNode.GetGeneration())
-		if !reflect.DeepEqual(newNode, expected) {
-			t.Errorf("Failed to clone node:\n got: %+v, \n expected: %+v", newNode, expected)
-		}
+			// Step 2: dump cached nodes successfully.
+			cachedNodes := nodeinfosnapshot.NewEmptySnapshot()
+			if err := cache.UpdateNodeInfoSnapshot(cachedNodes); err != nil {
+				t.Error(err)
+			}
+			newNode, found := cachedNodes.NodeInfoMap[node.Name]
+			if !found || len(cachedNodes.NodeInfoMap) != 1 {
+				t.Errorf("failed to dump cached nodes:\n got: %v \nexpected: %v", cachedNodes, cache.nodes)
+			}
+			expected.SetGeneration(newNode.GetGeneration())
+			if !reflect.DeepEqual(newNode, expected) {
+				t.Errorf("Failed to clone node:\n got: %+v, \n expected: %+v", newNode, expected)
+			}
 
-		// Case 3: update node attribute successfully.
-		node.Status.Allocatable[v1.ResourceMemory] = mem50m
-		allocatableResource := expected.AllocatableResource()
-		newAllocatableResource := &allocatableResource
-		newAllocatableResource.Memory = mem50m.Value()
-		expected.SetAllocatableResource(newAllocatableResource)
+			// Step 3: update node attribute successfully.
+			node.Status.Allocatable[v1.ResourceMemory] = mem50m
+			allocatableResource := expected.AllocatableResource()
+			newAllocatableResource := &allocatableResource
+			newAllocatableResource.Memory = mem50m.Value()
+			expected.SetAllocatableResource(newAllocatableResource)
 
-		cache.UpdateNode(nil, node)
-		got, found = cache.nodes[node.Name]
-		if !found {
-			t.Errorf("Failed to find node %v in schedulernodeinfo after UpdateNode.", node.Name)
-		}
-		if got.info.GetGeneration() <= expected.GetGeneration() {
-			t.Errorf("Generation is not incremented. got: %v, expected: %v", got.info.GetGeneration(), expected.GetGeneration())
-		}
-		expected.SetGeneration(got.info.GetGeneration())
+			if err := cache.UpdateNode(nil, node); err != nil {
+				t.Error(err)
+			}
+			got, found = cache.nodes[node.Name]
+			if !found {
+				t.Errorf("Failed to find node %v in schedulernodeinfo after UpdateNode.", node.Name)
+			}
+			if got.info.GetGeneration() <= expected.GetGeneration() {
+				t.Errorf("Generation is not incremented. got: %v, expected: %v", got.info.GetGeneration(), expected.GetGeneration())
+			}
+			expected.SetGeneration(got.info.GetGeneration())
 
-		if !reflect.DeepEqual(got.info, expected) {
-			t.Errorf("Failed to update node in schedulernodeinfo:\n got: %+v \nexpected: %+v", got, expected)
-		}
-		// Check nodeTree after update
-		if cache.nodeTree.numNodes != 1 || cache.nodeTree.next() != node.Name {
-			t.Errorf("unexpected cache.nodeTree after updating node: %v", node.Name)
-		}
+			if !reflect.DeepEqual(got.info, expected) {
+				t.Errorf("Failed to update node in schedulernodeinfo:\n got: %+v \nexpected: %+v", got, expected)
+			}
+			// Check nodeTree after update
+			if cache.nodeTree.numNodes != 1 || cache.nodeTree.next() != node.Name {
+				t.Errorf("unexpected cache.nodeTree after updating node: %v", node.Name)
+			}
 
-		// Case 4: the node can not be removed if pods is not empty.
-		cache.RemoveNode(node)
-		if _, found := cache.nodes[node.Name]; !found {
-			t.Errorf("The node %v should not be removed if pods is not empty.", node.Name)
-		}
-		// Check nodeTree after remove. The node should be removed from the nodeTree even if there are
-		// still pods on it.
-		if cache.nodeTree.numNodes != 0 || cache.nodeTree.next() != "" {
-			t.Errorf("unexpected cache.nodeTree after removing node: %v", node.Name)
-		}
+			// Step 4: the node can be removed even if it still has pods.
+			if err := cache.RemoveNode(node); err != nil {
+				t.Error(err)
+			}
+			if _, err := cache.GetNodeInfo(node.Name); err == nil {
+				t.Errorf("The node %v should be removed.", node.Name)
+			}
+			// Check node is removed from nodeTree as well.
+			if cache.nodeTree.numNodes != 0 || cache.nodeTree.next() != "" {
+				t.Errorf("unexpected cache.nodeTree after removing node: %v", node.Name)
+			}
+			// Pods are still in the pods cache.
+			for _, p := range test.pods {
+				if _, err := cache.GetPod(p); err != nil {
+					t.Error(err)
+				}
+			}
+
+			// Step 5: removing pods for the removed node still succeeds.
+			for _, p := range test.pods {
+				if err := cache.RemovePod(p); err != nil {
+					t.Error(err)
+				}
+				if _, err := cache.GetPod(p); err == nil {
+					t.Errorf("pod %q still in cache", p.Name)
+				}
+			}
+		})
 	}
 }
 
@@ -1591,3 +1657,15 @@ func setupCacheWithAssumedPods(b *testing.B, podNum int, assumedTime time.Time) {
 	}
 	return cache
 }
+
+func isForgottenFromCache(p *v1.Pod, c *schedulerCache) error {
+	if assumed, err := c.IsAssumedPod(p); err != nil {
+		return err
+	} else if assumed {
+		return errors.New("still assumed")
+	}
+	if _, err := c.GetPod(p); err == nil {
+		return errors.New("still in cache")
+	}
+	return nil
+}
diff --git a/pkg/scheduler/nodeinfo/node_info.go b/pkg/scheduler/nodeinfo/node_info.go
index 32b7cce6a19..c2291ccb78f 100644
--- a/pkg/scheduler/nodeinfo/node_info.go
+++ b/pkg/scheduler/nodeinfo/node_info.go
@@ -632,23 +632,6 @@ func (n *NodeInfo) SetNode(node *v1.Node) error {
 	return nil
 }
 
-// RemoveNode removes the overall information about the node.
-func (n *NodeInfo) RemoveNode(node *v1.Node) error {
-	// We don't remove NodeInfo for because there can still be some pods on this node -
-	// this is because notifications about pods are delivered in a different watch,
-	// and thus can potentially be observed later, even though they happened before
-	// node removal. This is handled correctly in cache.go file.
-	n.node = nil
-	n.allocatableResource = &Resource{}
-	n.taints, n.taintsErr = nil, nil
-	n.memoryPressureCondition = v1.ConditionUnknown
-	n.diskPressureCondition = v1.ConditionUnknown
-	n.pidPressureCondition = v1.ConditionUnknown
-	n.imageStates = make(map[string]*ImageStateSummary)
-	n.generation = nextGeneration()
-	return nil
-}
-
 // FilterOutPods receives a list of pods and filters out those whose node names
 // are equal to the node of this NodeInfo, but are not found in the pods of this NodeInfo.
 //
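Some context for the NodeInfo.RemoveNode deletion above: the scheduler cache keeps its nodeInfos in a doubly linked list ordered by most recent update (maintained by moveNodeInfoToHead and removeNodeInfoFromList, with headNode at the front), so with this change a deleted node is simply unlinked from that list instead of being reset field by field while waiting for its pod count to reach zero. The sketch below shows the general shape of such a most-recently-updated list; it is a standalone illustration with hypothetical types, not the scheduler's actual implementation.

	package example

	// item is one entry in a doubly linked "most recently updated first"
	// list keyed by name, loosely analogous to the cache's nodeInfoListItem.
	type item struct {
		name       string
		prev, next *item
	}

	type mruList struct {
		head  *item
		items map[string]*item
	}

	func newMRUList() *mruList {
		return &mruList{items: make(map[string]*item)}
	}

	// moveToHead puts the named entry at the front, creating it if absent,
	// loosely mirroring what moveNodeInfoToHead does on every pod update.
	func (l *mruList) moveToHead(name string) {
		it, ok := l.items[name]
		if !ok {
			it = &item{name: name}
			l.items[name] = it
		} else {
			l.unlink(it)
		}
		it.next = l.head
		if l.head != nil {
			l.head.prev = it
		}
		l.head = it
	}

	// remove unlinks and forgets an entry, loosely mirroring
	// removeNodeInfoFromList; after this PR it runs unconditionally when a
	// node is deleted, regardless of pods still tracked in podStates.
	func (l *mruList) remove(name string) {
		if it, ok := l.items[name]; ok {
			l.unlink(it)
			delete(l.items, name)
		}
	}

	func (l *mruList) unlink(it *item) {
		if it.prev != nil {
			it.prev.next = it.next
		} else if l.head == it {
			l.head = it.next
		}
		if it.next != nil {
			it.next.prev = it.prev
		}
		it.prev, it.next = nil, nil
	}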