Remove nodes from cache immediately on delete events

Signed-off-by: Aldo Culquicondor <acondor@google.com>
Aldo Culquicondor 2020-01-08 09:43:38 -05:00
parent b9c0aa0b2a
commit afe3b907a1
3 changed files with 342 additions and 286 deletions


@@ -437,19 +437,18 @@ func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
 }
 
 // Assumes that lock is already acquired.
+// Removes a pod from the cached node info. When a node is removed, some pod
+// deletion events might arrive later. This is not a problem, as the pods in
+// the node are assumed to be removed already.
 func (cache *schedulerCache) removePod(pod *v1.Pod) error {
 	n, ok := cache.nodes[pod.Spec.NodeName]
 	if !ok {
-		return fmt.Errorf("node %v is not found", pod.Spec.NodeName)
+		return nil
 	}
 	if err := n.info.RemovePod(pod); err != nil {
 		return err
 	}
-	if len(n.info.Pods()) == 0 && n.info.Node() == nil {
-		cache.removeNodeInfoFromList(pod.Spec.NodeName)
-	} else {
-		cache.moveNodeInfoToHead(pod.Spec.NodeName)
-	}
+	cache.moveNodeInfoToHead(pod.Spec.NodeName)
 	return nil
 }
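
To make the tolerated race concrete, here is a hypothetical test-style sketch (written against the package internals the way this commit's tests are; makeBasePod and newSchedulerCache are the existing helpers) of a node delete event arriving before the pod delete event:

	func TestRemovePodToleratesDeletedNode(t *testing.T) {
		node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}
		pod := makeBasePod(t, "node-1", "test", "100m", "500", "", nil)

		cache := newSchedulerCache(time.Second, time.Second, nil)
		if err := cache.AddNode(node); err != nil {
			t.Fatal(err)
		}
		if err := cache.AddPod(pod); err != nil {
			t.Fatal(err)
		}
		// The node delete event arrives first and evicts the NodeInfo eagerly.
		if err := cache.RemoveNode(node); err != nil {
			t.Fatal(err)
		}
		// The pod delete event arrives later. removePod no longer returns
		// "node not found"; it treats the pod as already gone and only the
		// pod bookkeeping (podStates, assumedPods) is cleaned up.
		if err := cache.RemovePod(pod); err != nil {
			t.Errorf("RemovePod after RemoveNode: %v", err)
		}
	}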
@@ -563,6 +562,8 @@ func (cache *schedulerCache) IsAssumedPod(pod *v1.Pod) (bool, error) {
 	return b, nil
 }
 
+// GetPod might return a pod for which its node has already been deleted from
+// the main cache. This is useful to properly process pod update events.
 func (cache *schedulerCache) GetPod(pod *v1.Pod) (*v1.Pod, error) {
 	key, err := schedulernodeinfo.GetPodKey(pod)
 	if err != nil {
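
The comment matters because pod update events are processed against the cached copy returned by GetPod, and updatePod (whose signature appears in the first hunk but whose body is collapsed in this view) removes the old version before re-adding the new one. A plausible sketch, assuming addPod has no error return:

	// Sketch only: the diff shows updatePod's signature but not its body.
	// Assumes that lock is already acquired.
	func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
		// With eager node eviction, removePod may find no cached node for
		// oldPod; since this commit, that is no longer an error.
		if err := cache.removePod(oldPod); err != nil {
			return err
		}
		cache.addPod(newPod)
		return nil
	}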
@@ -617,27 +618,21 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
 	return n.info.SetNode(newNode)
 }
 
+// RemoveNode removes a node from the cache.
+// Some nodes might still have pods because their deletion events didn't arrive
+// yet. For most intents and purposes, those pods are removed from the cache,
+// having its source of truth in the cached nodes.
+// However, some information on pods (assumedPods, podStates) persists. These
+// caches will be eventually consistent as pod deletion events arrive.
 func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
 	cache.mu.Lock()
 	defer cache.mu.Unlock()
-	n, ok := cache.nodes[node.Name]
+	_, ok := cache.nodes[node.Name]
 	if !ok {
 		return fmt.Errorf("node %v is not found", node.Name)
 	}
-	if err := n.info.RemoveNode(node); err != nil {
-		return err
-	}
-	// We remove NodeInfo for this node only if there aren't any pods on this node.
-	// We can't do it unconditionally, because notifications about pods are delivered
-	// in a different watch, and thus can potentially be observed later, even though
-	// they happened before node removal.
-	if len(n.info.Pods()) == 0 && n.info.Node() == nil {
-		cache.removeNodeInfoFromList(node.Name)
-	} else {
-		cache.moveNodeInfoToHead(node.Name)
-	}
+	cache.removeNodeInfoFromList(node.Name)
 	if err := cache.nodeTree.removeNode(node); err != nil {
 		return err
 	}
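
For reference, removeNodeInfoFromList and moveNodeInfoToHead maintain the doubly-linked list of NodeInfo entries (headNode first, most recently updated) that incremental snapshotting walks. A minimal sketch of the removal half, assuming next/prev pointers on nodeInfoListItem; the real function differs at least in its error logging:

	// Assumes that lock is already acquired.
	func (cache *schedulerCache) removeNodeInfoFromList(name string) {
		ni, ok := cache.nodes[name]
		if !ok {
			return // nothing to unlink
		}
		// Splice the item out of the doubly-linked list.
		if ni.prev != nil {
			ni.prev.next = ni.next
		}
		if ni.next != nil {
			ni.next.prev = ni.prev
		}
		// If it was the head, the next item becomes the new head.
		if ni == cache.headNode {
			cache.headNode = ni.next
		}
		delete(cache.nodes, name)
	}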
@@ -715,7 +710,7 @@ func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
 	for key := range cache.assumedPods {
 		ps, ok := cache.podStates[key]
 		if !ok {
-			panic("Key found in assumed set but not in podStates. Potentially a logical error.")
+			klog.Fatal("Key found in assumed set but not in podStates. Potentially a logical error.")
 		}
 		if !ps.bindingFinished {
 			klog.V(3).Infof("Couldn't expire cache for pod %v/%v. Binding is still in progress.",
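
A note on the panic-to-klog.Fatal change: a panic unwinds the stack and can in principle be intercepted by a deferred recover, whereas klog.Fatal writes the message together with stack traces of all running goroutines to the log and then calls os.Exit(255), so this invariant violation now reliably terminates the process and leaves a logged cause.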


@@ -17,6 +17,7 @@ limitations under the License.
 package cache
 
 import (
+	"errors"
 	"fmt"
 	"reflect"
 	"strings"
@@ -35,9 +36,9 @@ import (
 	nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
 )
 
-func deepEqualWithoutGeneration(t *testing.T, testcase int, actual *nodeInfoListItem, expected *schedulernodeinfo.NodeInfo) {
+func deepEqualWithoutGeneration(actual *nodeInfoListItem, expected *schedulernodeinfo.NodeInfo) error {
 	if (actual == nil) != (expected == nil) {
-		t.Error("One of the actual or expected is nil and the other is not!")
+		return errors.New("one of the actual or expected is nil and the other is not")
 	}
 	// Ignore generation field.
 	if actual != nil {
@@ -47,8 +48,9 @@ func deepEqualWithoutGeneration(t *testing.T, testcase int, actual *nodeInfoList
 		expected.SetGeneration(0)
 	}
 	if actual != nil && !reflect.DeepEqual(actual.info, expected) {
-		t.Errorf("#%d: node info get=%s, want=%s", testcase, actual.info, expected)
+		return fmt.Errorf("got node info %s, want %s", actual.info, expected)
 	}
+	return nil
 }
 
 type hostPortInfoParam struct {
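
Design note: having the helper return an error instead of calling t.Error itself pairs naturally with the t.Run subtests introduced below; each call site reports the failure under its own subtest name and can prefix its own context (a node name, an update index), which the old shared testcase counter conveyed less readably.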
type hostPortInfoParam struct { type hostPortInfoParam struct {
@@ -208,23 +210,27 @@ func TestAssumePodScheduled(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		cache := newSchedulerCache(time.Second, time.Second, nil)
-		for _, pod := range tt.pods {
-			if err := cache.AssumePod(pod); err != nil {
-				t.Fatalf("AssumePod failed: %v", err)
+		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
+			cache := newSchedulerCache(time.Second, time.Second, nil)
+			for _, pod := range tt.pods {
+				if err := cache.AssumePod(pod); err != nil {
+					t.Fatalf("AssumePod failed: %v", err)
+				}
 			}
-		}
-		n := cache.nodes[nodeName]
-		deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo)
+			n := cache.nodes[nodeName]
+			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
+				t.Error(err)
+			}
 
-		for _, pod := range tt.pods {
-			if err := cache.ForgetPod(pod); err != nil {
-				t.Fatalf("ForgetPod failed: %v", err)
+			for _, pod := range tt.pods {
+				if err := cache.ForgetPod(pod); err != nil {
+					t.Fatalf("ForgetPod failed: %v", err)
+				}
+				if err := isForgottenFromCache(pod, cache); err != nil {
+					t.Errorf("pod %s: %v", pod.Name, err)
+				}
 			}
-		}
-		if cache.nodes[nodeName] != nil {
-			t.Errorf("NodeInfo should be cleaned for %s", nodeName)
-		}
+		})
 	}
 }
@@ -262,7 +268,7 @@ func TestExpirePod(t *testing.T) {
 			{pod: testPods[0], assumedTime: now},
 		},
 		cleanupTime: now.Add(2 * ttl),
-		wNodeInfo:   nil,
+		wNodeInfo:   schedulernodeinfo.NewNodeInfo(),
 	}, { // first one would expire, second one would not.
 		pods: []*testExpirePodStruct{
 			{pod: testPods[0], assumedTime: now},
@@ -285,17 +291,21 @@ func TestExpirePod(t *testing.T) {
 	}}
 
 	for i, tt := range tests {
-		cache := newSchedulerCache(ttl, time.Second, nil)
+		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
+			cache := newSchedulerCache(ttl, time.Second, nil)
 
-		for _, pod := range tt.pods {
-			if err := assumeAndFinishBinding(cache, pod.pod, pod.assumedTime); err != nil {
-				t.Fatalf("assumePod failed: %v", err)
+			for _, pod := range tt.pods {
+				if err := assumeAndFinishBinding(cache, pod.pod, pod.assumedTime); err != nil {
+					t.Fatalf("assumePod failed: %v", err)
+				}
 			}
-		}
-		// pods that have assumedTime + ttl < cleanupTime will get expired and removed
-		cache.cleanupAssumedPods(tt.cleanupTime)
-		n := cache.nodes[nodeName]
-		deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo)
+			// pods that have assumedTime + ttl < cleanupTime will get expired and removed
+			cache.cleanupAssumedPods(tt.cleanupTime)
+			n := cache.nodes[nodeName]
+			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
+				t.Error(err)
+			}
+		})
 	}
 }
@@ -336,21 +346,25 @@ func TestAddPodWillConfirm(t *testing.T) {
 	}}
 
 	for i, tt := range tests {
-		cache := newSchedulerCache(ttl, time.Second, nil)
-		for _, podToAssume := range tt.podsToAssume {
-			if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
-				t.Fatalf("assumePod failed: %v", err)
+		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
+			cache := newSchedulerCache(ttl, time.Second, nil)
+			for _, podToAssume := range tt.podsToAssume {
+				if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
+					t.Fatalf("assumePod failed: %v", err)
+				}
 			}
-		}
-		for _, podToAdd := range tt.podsToAdd {
-			if err := cache.AddPod(podToAdd); err != nil {
-				t.Fatalf("AddPod failed: %v", err)
+			for _, podToAdd := range tt.podsToAdd {
+				if err := cache.AddPod(podToAdd); err != nil {
+					t.Fatalf("AddPod failed: %v", err)
+				}
 			}
-		}
-		cache.cleanupAssumedPods(now.Add(2 * ttl))
-		// check after expiration. confirmed pods shouldn't be expired.
-		n := cache.nodes[nodeName]
-		deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo)
+			cache.cleanupAssumedPods(now.Add(2 * ttl))
+			// check after expiration. confirmed pods shouldn't be expired.
+			n := cache.nodes[nodeName]
+			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
+				t.Error(err)
+			}
+		})
 	}
 }
@@ -438,27 +452,30 @@ func TestAddPodWillReplaceAssumed(t *testing.T) {
 	}}
 
 	for i, tt := range tests {
-		cache := newSchedulerCache(ttl, time.Second, nil)
-		for _, podToAssume := range tt.podsToAssume {
-			if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
-				t.Fatalf("assumePod failed: %v", err)
+		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
+			cache := newSchedulerCache(ttl, time.Second, nil)
+			for _, podToAssume := range tt.podsToAssume {
+				if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
+					t.Fatalf("assumePod failed: %v", err)
+				}
 			}
-		}
-		for _, podToAdd := range tt.podsToAdd {
-			if err := cache.AddPod(podToAdd); err != nil {
-				t.Fatalf("AddPod failed: %v", err)
+			for _, podToAdd := range tt.podsToAdd {
+				if err := cache.AddPod(podToAdd); err != nil {
+					t.Fatalf("AddPod failed: %v", err)
+				}
 			}
-		}
-		for _, podToUpdate := range tt.podsToUpdate {
-			if err := cache.UpdatePod(podToUpdate[0], podToUpdate[1]); err != nil {
-				t.Fatalf("UpdatePod failed: %v", err)
+			for _, podToUpdate := range tt.podsToUpdate {
+				if err := cache.UpdatePod(podToUpdate[0], podToUpdate[1]); err != nil {
+					t.Fatalf("UpdatePod failed: %v", err)
+				}
 			}
-		}
-		for nodeName, expected := range tt.wNodeInfo {
-			t.Log(nodeName)
-			n := cache.nodes[nodeName]
-			deepEqualWithoutGeneration(t, i, n, expected)
-		}
+			for nodeName, expected := range tt.wNodeInfo {
+				n := cache.nodes[nodeName]
+				if err := deepEqualWithoutGeneration(n, expected); err != nil {
+					t.Errorf("node %q: %v", nodeName, err)
+				}
+			}
+		})
 	}
 }
@@ -490,24 +507,27 @@ func TestAddPodAfterExpiration(t *testing.T) {
 		),
 	}}
 
-	now := time.Now()
 	for i, tt := range tests {
-		cache := newSchedulerCache(ttl, time.Second, nil)
-		if err := assumeAndFinishBinding(cache, tt.pod, now); err != nil {
-			t.Fatalf("assumePod failed: %v", err)
-		}
-		cache.cleanupAssumedPods(now.Add(2 * ttl))
-		// It should be expired and removed.
-		n := cache.nodes[nodeName]
-		if n != nil {
-			t.Errorf("#%d: expecting nil node info, but get=%v", i, n)
-		}
-		if err := cache.AddPod(tt.pod); err != nil {
-			t.Fatalf("AddPod failed: %v", err)
-		}
-		// check after expiration. confirmed pods shouldn't be expired.
-		n = cache.nodes[nodeName]
-		deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo)
+		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
+			now := time.Now()
+			cache := newSchedulerCache(ttl, time.Second, nil)
+			if err := assumeAndFinishBinding(cache, tt.pod, now); err != nil {
+				t.Fatalf("assumePod failed: %v", err)
+			}
+			cache.cleanupAssumedPods(now.Add(2 * ttl))
+			// It should be expired and removed.
+			if err := isForgottenFromCache(tt.pod, cache); err != nil {
+				t.Error(err)
+			}
+			if err := cache.AddPod(tt.pod); err != nil {
+				t.Fatalf("AddPod failed: %v", err)
+			}
+			// check after expiration. confirmed pods shouldn't be expired.
+			n := cache.nodes[nodeName]
+			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
+				t.Error(err)
+			}
+		})
 	}
 }
@@ -556,25 +576,29 @@ func TestUpdatePod(t *testing.T) {
 		)},
 	}}
 
-	for _, tt := range tests {
-		cache := newSchedulerCache(ttl, time.Second, nil)
-		for _, podToAdd := range tt.podsToAdd {
-			if err := cache.AddPod(podToAdd); err != nil {
-				t.Fatalf("AddPod failed: %v", err)
+	for i, tt := range tests {
+		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
+			cache := newSchedulerCache(ttl, time.Second, nil)
+			for _, podToAdd := range tt.podsToAdd {
+				if err := cache.AddPod(podToAdd); err != nil {
+					t.Fatalf("AddPod failed: %v", err)
+				}
 			}
-		}
 
-		for i := range tt.podsToUpdate {
-			if i == 0 {
-				continue
+			for j := range tt.podsToUpdate {
+				if j == 0 {
+					continue
+				}
+				if err := cache.UpdatePod(tt.podsToUpdate[j-1], tt.podsToUpdate[j]); err != nil {
+					t.Fatalf("UpdatePod failed: %v", err)
+				}
+				// check after expiration. confirmed pods shouldn't be expired.
+				n := cache.nodes[nodeName]
+				if err := deepEqualWithoutGeneration(n, tt.wNodeInfo[j-1]); err != nil {
+					t.Errorf("update %d: %v", j, err)
+				}
 			}
-			if err := cache.UpdatePod(tt.podsToUpdate[i-1], tt.podsToUpdate[i]); err != nil {
-				t.Fatalf("UpdatePod failed: %v", err)
-			}
-			// check after expiration. confirmed pods shouldn't be expired.
-			n := cache.nodes[nodeName]
-			deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo[i-1])
-		}
+		})
 	}
 }
@@ -684,33 +708,37 @@ func TestExpireAddUpdatePod(t *testing.T) {
 		)},
 	}}
 
-	now := time.Now()
-	for _, tt := range tests {
-		cache := newSchedulerCache(ttl, time.Second, nil)
-		for _, podToAssume := range tt.podsToAssume {
-			if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
-				t.Fatalf("assumePod failed: %v", err)
+	for i, tt := range tests {
+		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
+			now := time.Now()
+			cache := newSchedulerCache(ttl, time.Second, nil)
+			for _, podToAssume := range tt.podsToAssume {
+				if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
+					t.Fatalf("assumePod failed: %v", err)
+				}
 			}
-		}
-		cache.cleanupAssumedPods(now.Add(2 * ttl))
+			cache.cleanupAssumedPods(now.Add(2 * ttl))
 
-		for _, podToAdd := range tt.podsToAdd {
-			if err := cache.AddPod(podToAdd); err != nil {
-				t.Fatalf("AddPod failed: %v", err)
+			for _, podToAdd := range tt.podsToAdd {
+				if err := cache.AddPod(podToAdd); err != nil {
+					t.Fatalf("AddPod failed: %v", err)
+				}
 			}
-		}
 
-		for i := range tt.podsToUpdate {
-			if i == 0 {
-				continue
+			for j := range tt.podsToUpdate {
+				if j == 0 {
+					continue
+				}
+				if err := cache.UpdatePod(tt.podsToUpdate[j-1], tt.podsToUpdate[j]); err != nil {
+					t.Fatalf("UpdatePod failed: %v", err)
+				}
+				// check after expiration. confirmed pods shouldn't be expired.
+				n := cache.nodes[nodeName]
+				if err := deepEqualWithoutGeneration(n, tt.wNodeInfo[j-1]); err != nil {
+					t.Errorf("update %d: %v", j, err)
+				}
 			}
-			if err := cache.UpdatePod(tt.podsToUpdate[i-1], tt.podsToUpdate[i]); err != nil {
-				t.Fatalf("UpdatePod failed: %v", err)
-			}
-			// check after expiration. confirmed pods shouldn't be expired.
-			n := cache.nodes[nodeName]
-			deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo[i-1])
-		}
+		})
 	}
 }
@@ -761,21 +789,23 @@ func TestEphemeralStorageResource(t *testing.T) {
 		},
 	}
 
 	for i, tt := range tests {
-		cache := newSchedulerCache(time.Second, time.Second, nil)
-		if err := cache.AddPod(tt.pod); err != nil {
-			t.Fatalf("AddPod failed: %v", err)
-		}
-		n := cache.nodes[nodeName]
-		deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo)
+		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
+			cache := newSchedulerCache(time.Second, time.Second, nil)
+			if err := cache.AddPod(tt.pod); err != nil {
+				t.Fatalf("AddPod failed: %v", err)
+			}
+			n := cache.nodes[nodeName]
+			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
+				t.Error(err)
+			}
 
-		if err := cache.RemovePod(tt.pod); err != nil {
-			t.Fatalf("RemovePod failed: %v", err)
-		}
-
-		n = cache.nodes[nodeName]
-		if n != nil {
-			t.Errorf("#%d: expecting pod deleted and nil node info, get=%s", i, n.info)
-		}
+			if err := cache.RemovePod(tt.pod); err != nil {
+				t.Fatalf("RemovePod failed: %v", err)
+			}
+			if _, err := cache.GetPod(tt.pod); err == nil {
+				t.Errorf("pod was not deleted")
+			}
+		})
 	}
 }
@@ -783,12 +813,20 @@ func TestEphemeralStorageResource(t *testing.T) {
 func TestRemovePod(t *testing.T) {
 	// Enable volumesOnNodeForBalancing to do balanced resource allocation
 	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
-	nodeName := "node"
-	basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
+	basePod := makeBasePod(t, "node-1", "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
 	tests := []struct {
+		nodes     []*v1.Node
 		pod       *v1.Pod
 		wNodeInfo *schedulernodeinfo.NodeInfo
 	}{{
+		nodes: []*v1.Node{
+			{
+				ObjectMeta: metav1.ObjectMeta{Name: "node-1"},
+			},
+			{
+				ObjectMeta: metav1.ObjectMeta{Name: "node-2"},
+			},
+		},
 		pod: basePod,
 		wNodeInfo: newNodeInfo(
 			&schedulernodeinfo.Resource{
@@ -806,74 +844,75 @@ func TestRemovePod(t *testing.T) {
 	}}
 
 	for i, tt := range tests {
-		cache := newSchedulerCache(time.Second, time.Second, nil)
-		if err := cache.AddPod(tt.pod); err != nil {
-			t.Fatalf("AddPod failed: %v", err)
-		}
-		n := cache.nodes[nodeName]
-		deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo)
+		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
+			nodeName := tt.pod.Spec.NodeName
+			cache := newSchedulerCache(time.Second, time.Second, nil)
+			// Add pod succeeds even before adding the nodes.
+			if err := cache.AddPod(tt.pod); err != nil {
+				t.Fatalf("AddPod failed: %v", err)
+			}
+			n := cache.nodes[nodeName]
+			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
+				t.Error(err)
+			}
+			for _, n := range tt.nodes {
+				if err := cache.AddNode(n); err != nil {
+					t.Error(err)
+				}
+			}
 
-		if err := cache.RemovePod(tt.pod); err != nil {
-			t.Fatalf("RemovePod failed: %v", err)
-		}
+			if err := cache.RemovePod(tt.pod); err != nil {
+				t.Fatalf("RemovePod failed: %v", err)
+			}
 
-		n = cache.nodes[nodeName]
-		if n != nil {
-			t.Errorf("#%d: expecting pod deleted and nil node info, get=%s", i, n.info)
-		}
+			if _, err := cache.GetPod(tt.pod); err == nil {
+				t.Errorf("pod was not deleted")
+			}
+
+			// Node that owned the Pod should be at the head of the list.
+			if cache.headNode.info.Node().Name != nodeName {
+				t.Errorf("node %q is not at the head of the list", nodeName)
+			}
+		})
 	}
 }
 
 func TestForgetPod(t *testing.T) {
 	nodeName := "node"
 	basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
-	tests := []struct {
-		pods []*v1.Pod
-	}{{
-		pods: []*v1.Pod{basePod},
-	}}
+	pods := []*v1.Pod{basePod}
 	now := time.Now()
 	ttl := 10 * time.Second
 
-	for i, tt := range tests {
-		cache := newSchedulerCache(ttl, time.Second, nil)
-		for _, pod := range tt.pods {
-			if err := assumeAndFinishBinding(cache, pod, now); err != nil {
-				t.Fatalf("assumePod failed: %v", err)
-			}
-			isAssumed, err := cache.IsAssumedPod(pod)
-			if err != nil {
-				t.Fatalf("IsAssumedPod failed: %v.", err)
-			}
-			if !isAssumed {
-				t.Fatalf("Pod is expected to be assumed.")
-			}
-			assumedPod, err := cache.GetPod(pod)
-			if err != nil {
-				t.Fatalf("GetPod failed: %v.", err)
-			}
-			if assumedPod.Namespace != pod.Namespace {
-				t.Errorf("assumedPod.Namespace != pod.Namespace (%s != %s)", assumedPod.Namespace, pod.Namespace)
-			}
-			if assumedPod.Name != pod.Name {
-				t.Errorf("assumedPod.Name != pod.Name (%s != %s)", assumedPod.Name, pod.Name)
-			}
+	cache := newSchedulerCache(ttl, time.Second, nil)
+	for _, pod := range pods {
+		if err := assumeAndFinishBinding(cache, pod, now); err != nil {
+			t.Fatalf("assumePod failed: %v", err)
 		}
-		for _, pod := range tt.pods {
-			if err := cache.ForgetPod(pod); err != nil {
-				t.Fatalf("ForgetPod failed: %v", err)
-			}
-			isAssumed, err := cache.IsAssumedPod(pod)
-			if err != nil {
-				t.Fatalf("IsAssumedPod failed: %v.", err)
-			}
-			if isAssumed {
-				t.Fatalf("Pod is expected to be unassumed.")
-			}
+		isAssumed, err := cache.IsAssumedPod(pod)
+		if err != nil {
+			t.Fatalf("IsAssumedPod failed: %v.", err)
 		}
-		cache.cleanupAssumedPods(now.Add(2 * ttl))
-		if n := cache.nodes[nodeName]; n != nil {
-			t.Errorf("#%d: expecting pod deleted and nil node info, get=%s", i, n.info)
+		if !isAssumed {
+			t.Fatalf("Pod is expected to be assumed.")
+		}
+		assumedPod, err := cache.GetPod(pod)
+		if err != nil {
+			t.Fatalf("GetPod failed: %v.", err)
+		}
+		if assumedPod.Namespace != pod.Namespace {
+			t.Errorf("assumedPod.Namespace != pod.Namespace (%s != %s)", assumedPod.Namespace, pod.Namespace)
+		}
+		if assumedPod.Name != pod.Name {
+			t.Errorf("assumedPod.Name != pod.Name (%s != %s)", assumedPod.Name, pod.Name)
+		}
+	}
+	for _, pod := range pods {
+		if err := cache.ForgetPod(pod); err != nil {
+			t.Fatalf("ForgetPod failed: %v", err)
+		}
+		if err := isForgottenFromCache(pod, cache); err != nil {
+			t.Errorf("pod %q: %v", pod.Name, err)
 		}
 	}
 }
@@ -1051,78 +1090,105 @@ func TestNodeOperators(t *testing.T) {
 		},
 	}
 
-	for _, test := range tests {
-		expected := buildNodeInfo(test.node, test.pods)
-		node := test.node
+	for i, test := range tests {
+		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
+			expected := buildNodeInfo(test.node, test.pods)
+			node := test.node
 
-		cache := newSchedulerCache(time.Second, time.Second, nil)
-		cache.AddNode(node)
-		for _, pod := range test.pods {
-			cache.AddPod(pod)
-		}
+			cache := newSchedulerCache(time.Second, time.Second, nil)
+			if err := cache.AddNode(node); err != nil {
+				t.Fatal(err)
+			}
+			for _, pod := range test.pods {
+				if err := cache.AddPod(pod); err != nil {
+					t.Fatal(err)
+				}
+			}
 
-		// Case 1: the node was added into cache successfully.
-		got, found := cache.nodes[node.Name]
-		if !found {
-			t.Errorf("Failed to find node %v in internalcache.", node.Name)
-		}
-		if cache.nodeTree.numNodes != 1 || cache.nodeTree.next() != node.Name {
-			t.Errorf("cache.nodeTree is not updated correctly after adding node: %v", node.Name)
-		}
+			// Step 1: the node was added into cache successfully.
+			got, found := cache.nodes[node.Name]
+			if !found {
+				t.Errorf("Failed to find node %v in internalcache.", node.Name)
+			}
+			if cache.nodeTree.numNodes != 1 || cache.nodeTree.next() != node.Name {
+				t.Errorf("cache.nodeTree is not updated correctly after adding node: %v", node.Name)
+			}
 
-		// Generations are globally unique. We check in our unit tests that they are incremented correctly.
-		expected.SetGeneration(got.info.GetGeneration())
-		if !reflect.DeepEqual(got.info, expected) {
-			t.Errorf("Failed to add node into schedulercache:\n got: %+v \nexpected: %+v", got, expected)
-		}
+			// Generations are globally unique. We check in our unit tests that they are incremented correctly.
+			expected.SetGeneration(got.info.GetGeneration())
+			if !reflect.DeepEqual(got.info, expected) {
+				t.Errorf("Failed to add node into schedulercache:\n got: %+v \nexpected: %+v", got, expected)
+			}
 
-		// Case 2: dump cached nodes successfully.
-		cachedNodes := nodeinfosnapshot.NewEmptySnapshot()
-		cache.UpdateNodeInfoSnapshot(cachedNodes)
-		newNode, found := cachedNodes.NodeInfoMap[node.Name]
-		if !found || len(cachedNodes.NodeInfoMap) != 1 {
-			t.Errorf("failed to dump cached nodes:\n got: %v \nexpected: %v", cachedNodes, cache.nodes)
-		}
-		expected.SetGeneration(newNode.GetGeneration())
-		if !reflect.DeepEqual(newNode, expected) {
-			t.Errorf("Failed to clone node:\n got: %+v, \n expected: %+v", newNode, expected)
-		}
+			// Step 2: dump cached nodes successfully.
+			cachedNodes := nodeinfosnapshot.NewEmptySnapshot()
+			if err := cache.UpdateNodeInfoSnapshot(cachedNodes); err != nil {
+				t.Error(err)
+			}
+			newNode, found := cachedNodes.NodeInfoMap[node.Name]
+			if !found || len(cachedNodes.NodeInfoMap) != 1 {
+				t.Errorf("failed to dump cached nodes:\n got: %v \nexpected: %v", cachedNodes, cache.nodes)
+			}
+			expected.SetGeneration(newNode.GetGeneration())
+			if !reflect.DeepEqual(newNode, expected) {
+				t.Errorf("Failed to clone node:\n got: %+v, \n expected: %+v", newNode, expected)
+			}
 
-		// Case 3: update node attribute successfully.
-		node.Status.Allocatable[v1.ResourceMemory] = mem50m
-		allocatableResource := expected.AllocatableResource()
-		newAllocatableResource := &allocatableResource
-		newAllocatableResource.Memory = mem50m.Value()
-		expected.SetAllocatableResource(newAllocatableResource)
+			// Step 3: update node attribute successfully.
+			node.Status.Allocatable[v1.ResourceMemory] = mem50m
+			allocatableResource := expected.AllocatableResource()
+			newAllocatableResource := &allocatableResource
+			newAllocatableResource.Memory = mem50m.Value()
+			expected.SetAllocatableResource(newAllocatableResource)
 
-		cache.UpdateNode(nil, node)
-		got, found = cache.nodes[node.Name]
-		if !found {
-			t.Errorf("Failed to find node %v in schedulernodeinfo after UpdateNode.", node.Name)
-		}
-		if got.info.GetGeneration() <= expected.GetGeneration() {
-			t.Errorf("Generation is not incremented. got: %v, expected: %v", got.info.GetGeneration(), expected.GetGeneration())
-		}
-		expected.SetGeneration(got.info.GetGeneration())
+			if err := cache.UpdateNode(nil, node); err != nil {
+				t.Error(err)
+			}
+			got, found = cache.nodes[node.Name]
+			if !found {
+				t.Errorf("Failed to find node %v in schedulernodeinfo after UpdateNode.", node.Name)
+			}
+			if got.info.GetGeneration() <= expected.GetGeneration() {
+				t.Errorf("Generation is not incremented. got: %v, expected: %v", got.info.GetGeneration(), expected.GetGeneration())
+			}
+			expected.SetGeneration(got.info.GetGeneration())
 
-		if !reflect.DeepEqual(got.info, expected) {
-			t.Errorf("Failed to update node in schedulernodeinfo:\n got: %+v \nexpected: %+v", got, expected)
-		}
-		// Check nodeTree after update
-		if cache.nodeTree.numNodes != 1 || cache.nodeTree.next() != node.Name {
-			t.Errorf("unexpected cache.nodeTree after updating node: %v", node.Name)
-		}
+			if !reflect.DeepEqual(got.info, expected) {
+				t.Errorf("Failed to update node in schedulernodeinfo:\n got: %+v \nexpected: %+v", got, expected)
+			}
+			// Check nodeTree after update
+			if cache.nodeTree.numNodes != 1 || cache.nodeTree.next() != node.Name {
+				t.Errorf("unexpected cache.nodeTree after updating node: %v", node.Name)
+			}
 
-		// Case 4: the node can not be removed if pods is not empty.
-		cache.RemoveNode(node)
-		if _, found := cache.nodes[node.Name]; !found {
-			t.Errorf("The node %v should not be removed if pods is not empty.", node.Name)
-		}
-		// Check nodeTree after remove. The node should be removed from the nodeTree even if there are
-		// still pods on it.
-		if cache.nodeTree.numNodes != 0 || cache.nodeTree.next() != "" {
-			t.Errorf("unexpected cache.nodeTree after removing node: %v", node.Name)
-		}
+			// Step 4: the node can be removed even if it still has pods.
+			if err := cache.RemoveNode(node); err != nil {
+				t.Error(err)
+			}
+			if _, err := cache.GetNodeInfo(node.Name); err == nil {
+				t.Errorf("The node %v should be removed.", node.Name)
+			}
+			// Check node is removed from nodeTree as well.
+			if cache.nodeTree.numNodes != 0 || cache.nodeTree.next() != "" {
+				t.Errorf("unexpected cache.nodeTree after removing node: %v", node.Name)
+			}
+			// Pods are still in the pods cache.
+			for _, p := range test.pods {
+				if _, err := cache.GetPod(p); err != nil {
+					t.Error(err)
+				}
+			}
+
+			// Step 5: removing pods for the removed node still succeeds.
+			for _, p := range test.pods {
+				if err := cache.RemovePod(p); err != nil {
+					t.Error(err)
+				}
+				if _, err := cache.GetPod(p); err == nil {
+					t.Errorf("pod %q still in cache", p.Name)
+				}
+			}
+		})
 	}
 }
@@ -1591,3 +1657,15 @@ func setupCacheWithAssumedPods(b *testing.B, podNum int, assumedTime time.Time)
 	}
 	return cache
 }
+
+func isForgottenFromCache(p *v1.Pod, c *schedulerCache) error {
+	if assumed, err := c.IsAssumedPod(p); err != nil {
+		return err
+	} else if assumed {
+		return errors.New("still assumed")
+	}
+	if _, err := c.GetPod(p); err == nil {
+		return errors.New("still in cache")
+	}
+	return nil
+}


@@ -632,23 +632,6 @@ func (n *NodeInfo) SetNode(node *v1.Node) error {
 	return nil
 }
 
-// RemoveNode removes the overall information about the node.
-func (n *NodeInfo) RemoveNode(node *v1.Node) error {
-	// We don't remove NodeInfo for because there can still be some pods on this node -
-	// this is because notifications about pods are delivered in a different watch,
-	// and thus can potentially be observed later, even though they happened before
-	// node removal. This is handled correctly in cache.go file.
-	n.node = nil
-	n.allocatableResource = &Resource{}
-	n.taints, n.taintsErr = nil, nil
-	n.memoryPressureCondition = v1.ConditionUnknown
-	n.diskPressureCondition = v1.ConditionUnknown
-	n.pidPressureCondition = v1.ConditionUnknown
-	n.imageStates = make(map[string]*ImageStateSummary)
-	n.generation = nextGeneration()
-	return nil
-}
-
 // FilterOutPods receives a list of pods and filters out those whose node names
 // are equal to the node of this NodeInfo, but are not found in the pods of this NodeInfo.
 //