Update cached Pod to make sure the Pod's status is up-to-date

This commit is contained in:
Kensei Nakada 2022-05-29 10:28:15 +00:00
parent e7192a4955
commit eba4d50c15
2 changed files with 68 additions and 7 deletions

View File

@ -506,16 +506,15 @@ func (cache *cacheImpl) AddPod(pod *v1.Pod) error {
currState, ok := cache.podStates[key]
switch {
case ok && cache.assumedPods.Has(key):
// When assuming, we've already added the Pod to cache,
// Just update here to make sure the Pod's status is up-to-date.
if err = cache.updatePod(currState.pod, pod); err != nil {
klog.ErrorS(err, "Error occurred while updating pod")
}
if currState.pod.Spec.NodeName != pod.Spec.NodeName {
// The pod was added to a different node than it was assumed to.
klog.InfoS("Pod was added to a different node than it was assumed", "podKey", key, "pod", klog.KObj(pod), "assumedNode", klog.KRef("", pod.Spec.NodeName), "currentNode", klog.KRef("", currState.pod.Spec.NodeName))
if err = cache.updatePod(currState.pod, pod); err != nil {
klog.ErrorS(err, "Error occurred while updating pod")
}
} else {
delete(cache.assumedPods, key)
cache.podStates[key].deadline = nil
cache.podStates[key].pod = pod
return nil
}
case !ok:
// Pod was expired. We should add it back.

View File

@ -448,6 +448,68 @@ func TestDump(t *testing.T) {
}
}
// TestAddPodAlwaysUpdatePodInfoInNodeInfo tests that AddPod method always updates PodInfo in NodeInfo,
// even when the Pod is assumed one.
func TestAddPodAlwaysUpdatesPodInfoInNodeInfo(t *testing.T) {
ttl := 10 * time.Second
now := time.Now()
p1 := makeBasePod(t, "node1", "test-1", "100m", "500", "", []v1.ContainerPort{{HostPort: 80}})
p2 := p1.DeepCopy()
p2.Status.Conditions = append(p1.Status.Conditions, v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionTrue,
})
tests := []struct {
podsToAssume []*v1.Pod
podsToAddAfterAssume []*v1.Pod
nodeInfo map[string]*framework.NodeInfo
}{
{
podsToAssume: []*v1.Pod{p1},
podsToAddAfterAssume: []*v1.Pod{p2},
nodeInfo: map[string]*framework.NodeInfo{
"node1": newNodeInfo(
&framework.Resource{
MilliCPU: 100,
Memory: 500,
},
&framework.Resource{
MilliCPU: 100,
Memory: 500,
},
[]*v1.Pod{p2},
newHostPortInfoBuilder().add("TCP", "0.0.0.0", 80).build(),
make(map[string]*framework.ImageStateSummary),
),
},
},
}
for i, tt := range tests {
t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
cache := newCache(ttl, time.Second, nil)
for _, podToAssume := range tt.podsToAssume {
if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
t.Fatalf("assumePod failed: %v", err)
}
}
for _, podToAdd := range tt.podsToAddAfterAssume {
if err := cache.AddPod(podToAdd); err != nil {
t.Fatalf("AddPod failed: %v", err)
}
}
for nodeName, expected := range tt.nodeInfo {
n := cache.nodes[nodeName]
if err := deepEqualWithoutGeneration(n, expected); err != nil {
t.Errorf("node %q: %v", nodeName, err)
}
}
})
}
}
// TestAddPodWillReplaceAssumed tests that a pod being Add()ed will replace any assumed pod.
func TestAddPodWillReplaceAssumed(t *testing.T) {
now := time.Now()