From eba4d50c150d6083474f31bce197bb79f08f24fd Mon Sep 17 00:00:00 2001 From: Kensei Nakada Date: Sun, 29 May 2022 10:28:15 +0000 Subject: [PATCH] Update cached Pod to make sure the Pod's status is up-to-date --- pkg/scheduler/internal/cache/cache.go | 13 +++-- pkg/scheduler/internal/cache/cache_test.go | 62 ++++++++++++++++++++++ 2 files changed, 68 insertions(+), 7 deletions(-) diff --git a/pkg/scheduler/internal/cache/cache.go b/pkg/scheduler/internal/cache/cache.go index de7c7f07fcf..1bd5f712848 100644 --- a/pkg/scheduler/internal/cache/cache.go +++ b/pkg/scheduler/internal/cache/cache.go @@ -506,16 +506,15 @@ func (cache *cacheImpl) AddPod(pod *v1.Pod) error { currState, ok := cache.podStates[key] switch { case ok && cache.assumedPods.Has(key): + // When assuming, we've already added the Pod to cache, + // Just update here to make sure the Pod's status is up-to-date. + if err = cache.updatePod(currState.pod, pod); err != nil { + klog.ErrorS(err, "Error occurred while updating pod") + } if currState.pod.Spec.NodeName != pod.Spec.NodeName { // The pod was added to a different node than it was assumed to. klog.InfoS("Pod was added to a different node than it was assumed", "podKey", key, "pod", klog.KObj(pod), "assumedNode", klog.KRef("", pod.Spec.NodeName), "currentNode", klog.KRef("", currState.pod.Spec.NodeName)) - if err = cache.updatePod(currState.pod, pod); err != nil { - klog.ErrorS(err, "Error occurred while updating pod") - } - } else { - delete(cache.assumedPods, key) - cache.podStates[key].deadline = nil - cache.podStates[key].pod = pod + return nil } case !ok: // Pod was expired. We should add it back. diff --git a/pkg/scheduler/internal/cache/cache_test.go b/pkg/scheduler/internal/cache/cache_test.go index a523ea24297..8b74b9eca47 100644 --- a/pkg/scheduler/internal/cache/cache_test.go +++ b/pkg/scheduler/internal/cache/cache_test.go @@ -448,6 +448,68 @@ func TestDump(t *testing.T) { } } +// TestAddPodAlwaysUpdatePodInfoInNodeInfo tests that AddPod method always updates PodInfo in NodeInfo, +// even when the Pod is assumed one. +func TestAddPodAlwaysUpdatesPodInfoInNodeInfo(t *testing.T) { + ttl := 10 * time.Second + now := time.Now() + p1 := makeBasePod(t, "node1", "test-1", "100m", "500", "", []v1.ContainerPort{{HostPort: 80}}) + + p2 := p1.DeepCopy() + p2.Status.Conditions = append(p1.Status.Conditions, v1.PodCondition{ + Type: v1.PodScheduled, + Status: v1.ConditionTrue, + }) + + tests := []struct { + podsToAssume []*v1.Pod + podsToAddAfterAssume []*v1.Pod + nodeInfo map[string]*framework.NodeInfo + }{ + { + podsToAssume: []*v1.Pod{p1}, + podsToAddAfterAssume: []*v1.Pod{p2}, + nodeInfo: map[string]*framework.NodeInfo{ + "node1": newNodeInfo( + &framework.Resource{ + MilliCPU: 100, + Memory: 500, + }, + &framework.Resource{ + MilliCPU: 100, + Memory: 500, + }, + []*v1.Pod{p2}, + newHostPortInfoBuilder().add("TCP", "0.0.0.0", 80).build(), + make(map[string]*framework.ImageStateSummary), + ), + }, + }, + } + + for i, tt := range tests { + t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) { + cache := newCache(ttl, time.Second, nil) + for _, podToAssume := range tt.podsToAssume { + if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil { + t.Fatalf("assumePod failed: %v", err) + } + } + for _, podToAdd := range tt.podsToAddAfterAssume { + if err := cache.AddPod(podToAdd); err != nil { + t.Fatalf("AddPod failed: %v", err) + } + } + for nodeName, expected := range tt.nodeInfo { + n := cache.nodes[nodeName] + if err := deepEqualWithoutGeneration(n, expected); err != nil { + t.Errorf("node %q: %v", nodeName, err) + } + } + }) + } +} + // TestAddPodWillReplaceAssumed tests that a pod being Add()ed will replace any assumed pod. func TestAddPodWillReplaceAssumed(t *testing.T) { now := time.Now()