From 7c3dd0eb7b14f59cb8c77a183436773309800a19 Mon Sep 17 00:00:00 2001 From: Gunju Kim Date: Sat, 29 Jan 2022 22:32:48 +0900 Subject: [PATCH 1/2] Add an e2e test for updating a static pod while it restarts --- test/e2e_node/mirror_pod_grace_period_test.go | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/test/e2e_node/mirror_pod_grace_period_test.go b/test/e2e_node/mirror_pod_grace_period_test.go index 26710a1c1e1..51c5b31b0d0 100644 --- a/test/e2e_node/mirror_pod_grace_period_test.go +++ b/test/e2e_node/mirror_pod_grace_period_test.go @@ -100,6 +100,35 @@ var _ = SIGDescribe("MirrorPodWithGracePeriod", func() { framework.ExpectEqual(pod.Spec.Containers[0].Image, image) }) + ginkgo.It("should update a static pod when the static pod is updated multiple times during the graceful termination period [NodeConformance]", func() { + ginkgo.By("get mirror pod uid") + pod, err := f.ClientSet.CoreV1().Pods(ns).Get(context.TODO(), mirrorPodName, metav1.GetOptions{}) + framework.ExpectNoError(err) + uid := pod.UID + + ginkgo.By("update the pod manifest multiple times during the graceful termination period") + for i := 0; i < 300; i++ { + err = createStaticPod(podPath, staticPodName, ns, + fmt.Sprintf("image-%d", i), v1.RestartPolicyAlways) + framework.ExpectNoError(err) + time.Sleep(100 * time.Millisecond) + } + image := imageutils.GetPauseImageName() + err = createStaticPod(podPath, staticPodName, ns, image, v1.RestartPolicyAlways) + framework.ExpectNoError(err) + + ginkgo.By("wait for the mirror pod to be updated") + gomega.Eventually(func() error { + return checkMirrorPodRecreatedAndRunning(f.ClientSet, mirrorPodName, ns, uid) + }, 2*time.Minute, time.Second*4).Should(gomega.BeNil()) + + ginkgo.By("check the mirror pod container image is updated") + pod, err = f.ClientSet.CoreV1().Pods(ns).Get(context.TODO(), mirrorPodName, metav1.GetOptions{}) + framework.ExpectNoError(err) + framework.ExpectEqual(len(pod.Spec.Containers), 1) + framework.ExpectEqual(pod.Spec.Containers[0].Image, image) + }) + ginkgo.AfterEach(func() { ginkgo.By("delete the static pod") err := deleteStaticPod(podPath, staticPodName, ns) From 3ce5c944a85879dd6a4b3d69d5deff9216872405 Mon Sep 17 00:00:00 2001 From: Gunju Kim Date: Mon, 31 Jan 2022 17:24:47 +0900 Subject: [PATCH 2/2] kubelet: Clean up a static pod that has been terminated before starting - Allow a podWorker to start if it is blocked by a pod that has been terminated before starting - When a pod can't start AND has already been terminated, exit cleanly - Add a unit test that exercises race conditions in pod workers --- pkg/kubelet/pod_workers.go | 131 +++-- pkg/kubelet/pod_workers_test.go | 836 +++++++++++++++++++++++++++++++- 2 files changed, 909 insertions(+), 58 deletions(-) diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index e5678d3c1da..d8dfaa4034f 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -392,6 +392,11 @@ type podWorkers struct { syncTerminatingPodFn syncTerminatingPodFnType syncTerminatedPodFn syncTerminatedPodFnType + // workerChannelFn is exposed for testing to allow unit tests to impose delays + // in channel communication. The function is invoked once each time a new worker + // goroutine starts. 
+ workerChannelFn func(uid types.UID, in chan podWork) (out <-chan podWork) + // The EventRecorder to use recorder record.EventRecorder @@ -699,9 +704,8 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) { } // start the pod worker goroutine if it doesn't exist - var podUpdates chan podWork - var exists bool - if podUpdates, exists = p.podUpdates[uid]; !exists { + podUpdates, exists := p.podUpdates[uid] + if !exists { // We need to have a buffer here, because checkForUpdates() method that // puts an update into channel is called from the same goroutine where // the channel is consumed. However, it is guaranteed that in such case @@ -715,13 +719,21 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) { append(p.waitingToStartStaticPodsByFullname[status.fullname], uid) } + // allow testing of delays in the pod update channel + var outCh <-chan podWork + if p.workerChannelFn != nil { + outCh = p.workerChannelFn(uid, podUpdates) + } else { + outCh = podUpdates + } + // Creating a new pod worker either means this is a new pod, or that the // kubelet just restarted. In either case the kubelet is willing to believe // the status of the pod for the first pod worker sync. See corresponding // comment in syncPod. go func() { defer runtime.HandleCrash() - p.managePodLoop(podUpdates) + p.managePodLoop(outCh) }() } @@ -785,28 +797,31 @@ func calculateEffectiveGracePeriod(status *podSyncStatus, pod *v1.Pod, options * } // allowPodStart tries to start the pod and returns true if allowed, otherwise -// it requeues the pod and returns false. -func (p *podWorkers) allowPodStart(pod *v1.Pod) bool { +// it requeues the pod and returns false. If the pod will never be able to start +// because data is missing, or the pod was terminated before start, canEverStart +// is false. +func (p *podWorkers) allowPodStart(pod *v1.Pod) (canStart bool, canEverStart bool) { if !kubetypes.IsStaticPod(pod) { - // TBD: Do we want to allow non-static pods with the same full name? + // TODO: Do we want to allow non-static pods with the same full name? // Note that it may disable the force deletion of pods. - return true + return true, true } p.podLock.Lock() defer p.podLock.Unlock() status, ok := p.podSyncStatuses[pod.UID] if !ok { - klog.ErrorS(nil, "Failed to get a valid podSyncStatuses", "pod", klog.KObj(pod), "podUID", pod.UID) - p.workQueue.Enqueue(pod.UID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor)) - status.working = false - return false + klog.ErrorS(nil, "Pod sync status does not exist, the worker should not be running", "pod", klog.KObj(pod), "podUID", pod.UID) + return false, false + } + if status.IsTerminationRequested() { + return false, false } if !p.allowStaticPodStart(status.fullname, pod.UID) { p.workQueue.Enqueue(pod.UID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor)) status.working = false - return false + return false, true } - return true + return true, true } // allowStaticPodStart tries to start the static pod and returns true if @@ -819,9 +834,12 @@ func (p *podWorkers) allowStaticPodStart(fullname string, uid types.UID) bool { } waitingPods := p.waitingToStartStaticPodsByFullname[fullname] + // TODO: This is O(N) with respect to the number of updates to static pods + // with overlapping full names, and ideally would be O(1). for i, waitingUID := range waitingPods { // has pod already terminated or been deleted? 
- if _, ok := p.podSyncStatuses[waitingUID]; !ok { + status, ok := p.podSyncStatuses[waitingUID] + if !ok || status.IsTerminationRequested() || status.IsTerminated() { continue } // another pod is next in line @@ -847,8 +865,20 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) { var podStarted bool for update := range podUpdates { pod := update.Options.Pod + + // Decide whether to start the pod. If the pod was terminated prior to the pod being allowed + // to start, we have to clean it up and then exit the pod worker loop. if !podStarted { - if !p.allowPodStart(pod) { + canStart, canEverStart := p.allowPodStart(pod) + if !canEverStart { + p.completeUnstartedTerminated(pod) + if start := update.Options.StartTime; !start.IsZero() { + metrics.PodWorkerDuration.WithLabelValues("terminated").Observe(metrics.SinceInSeconds(start)) + } + klog.V(4).InfoS("Processing pod event done", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType) + return + } + if !canStart { klog.V(4).InfoS("Pod cannot start yet", "pod", klog.KObj(pod), "podUID", pod.UID) continue } @@ -1027,12 +1057,7 @@ func (p *podWorkers) completeTerminatingRuntimePod(pod *v1.Pod) { } } - ch, ok := p.podUpdates[pod.UID] - if ok { - close(ch) - } - delete(p.podUpdates, pod.UID) - delete(p.lastUndeliveredWorkUpdate, pod.UID) + p.cleanupPodUpdates(pod.UID) } // completeTerminated is invoked after syncTerminatedPod completes successfully and means we @@ -1043,12 +1068,7 @@ func (p *podWorkers) completeTerminated(pod *v1.Pod) { klog.V(4).InfoS("Pod is complete and the worker can now stop", "pod", klog.KObj(pod), "podUID", pod.UID) - ch, ok := p.podUpdates[pod.UID] - if ok { - close(ch) - } - delete(p.podUpdates, pod.UID) - delete(p.lastUndeliveredWorkUpdate, pod.UID) + p.cleanupPodUpdates(pod.UID) if status, ok := p.podSyncStatuses[pod.UID]; ok { if status.terminatingAt.IsZero() { @@ -1066,6 +1086,33 @@ func (p *podWorkers) completeTerminated(pod *v1.Pod) { } } +// completeUnstartedTerminated is invoked if a pod that has never been started receives a termination +// signal before it can be started. +func (p *podWorkers) completeUnstartedTerminated(pod *v1.Pod) { + p.podLock.Lock() + defer p.podLock.Unlock() + + klog.V(4).InfoS("Pod never started and the worker can now stop", "pod", klog.KObj(pod), "podUID", pod.UID) + + p.cleanupPodUpdates(pod.UID) + + if status, ok := p.podSyncStatuses[pod.UID]; ok { + if status.terminatingAt.IsZero() { + klog.V(4).InfoS("Pod worker is complete but did not have terminatingAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID) + } + if !status.terminatedAt.IsZero() { + klog.V(4).InfoS("Pod worker is complete and had terminatedAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID) + } + status.finished = true + status.working = false + status.terminatedAt = time.Now() + + if p.startedStaticPodsByFullname[status.fullname] == pod.UID { + delete(p.startedStaticPodsByFullname, status.fullname) + } + } +} + // completeWork requeues on error or the next sync interval and then immediately executes any pending // work. func (p *podWorkers) completeWork(pod *v1.Pod, syncErr error) { @@ -1150,10 +1197,10 @@ func (p *podWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorke return workers } -// removeTerminatedWorker cleans up and removes the worker status for a worker that -// has reached a terminal state of "finished" - has successfully exited -// syncTerminatedPod. 
This "forgets" a pod by UID and allows another pod to be recreated -// with the same UID. +// removeTerminatedWorker cleans up and removes the worker status for a worker +// that has reached a terminal state of "finished" - has successfully exited +// syncTerminatedPod. This "forgets" a pod by UID and allows another pod to be +// recreated with the same UID. func (p *podWorkers) removeTerminatedWorker(uid types.UID) { status, ok := p.podSyncStatuses[uid] if !ok { @@ -1162,11 +1209,6 @@ func (p *podWorkers) removeTerminatedWorker(uid types.UID) { return } - if startedUID, started := p.startedStaticPodsByFullname[status.fullname]; started && startedUID != uid { - klog.V(4).InfoS("Pod cannot start yet but is no longer known to the kubelet, finish it", "podUID", uid) - status.finished = true - } - if !status.finished { klog.V(4).InfoS("Pod worker has been requested for removal but is still not fully terminated", "podUID", uid) return @@ -1178,8 +1220,7 @@ func (p *podWorkers) removeTerminatedWorker(uid types.UID) { klog.V(4).InfoS("Pod has been terminated and is no longer known to the kubelet, remove all history", "podUID", uid) } delete(p.podSyncStatuses, uid) - delete(p.podUpdates, uid) - delete(p.lastUndeliveredWorkUpdate, uid) + p.cleanupPodUpdates(uid) if p.startedStaticPodsByFullname[status.fullname] == uid { delete(p.startedStaticPodsByFullname, status.fullname) @@ -1230,3 +1271,15 @@ func killPodNow(podWorkers PodWorkers, recorder record.EventRecorder) eviction.K } } } + +// cleanupPodUpdates closes the podUpdates channel and removes it from +// podUpdates map so that the corresponding pod worker can stop. It also +// removes any undelivered work. This method must be called holding the +// pod lock. +func (p *podWorkers) cleanupPodUpdates(uid types.UID) { + if ch, ok := p.podUpdates[uid]; ok { + close(ch) + } + delete(p.podUpdates, uid) + delete(p.lastUndeliveredWorkUpdate, uid) +} diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go index 4028c06c292..afde2b0d789 100644 --- a/pkg/kubelet/pod_workers_test.go +++ b/pkg/kubelet/pod_workers_test.go @@ -18,17 +18,20 @@ package kubelet import ( "context" + "flag" "reflect" "strconv" "sync" "testing" "time" + "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" @@ -164,6 +167,22 @@ func newStaticPod(uid, name string) *v1.Pod { } } +func newNamedPod(uid, namespace, name string, isStatic bool) *v1.Pod { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: types.UID(uid), + Namespace: namespace, + Name: name, + }, + } + if isStatic { + pod.Annotations = map[string]string{ + kubetypes.ConfigSourceAnnotationKey: kubetypes.FileSource, + } + } + return pod +} + // syncPodRecord is a record of a sync pod call type syncPodRecord struct { name string @@ -172,12 +191,63 @@ type syncPodRecord struct { terminated bool } +type FakeQueueItem struct { + UID types.UID + Delay time.Duration +} + +type fakeQueue struct { + lock sync.Mutex + queue []FakeQueueItem + currentStart int +} + +func (q *fakeQueue) Empty() bool { + q.lock.Lock() + defer q.lock.Unlock() + return (len(q.queue) - q.currentStart) == 0 +} + +func (q *fakeQueue) Items() []FakeQueueItem { + q.lock.Lock() + defer q.lock.Unlock() + 
return append(make([]FakeQueueItem, 0, len(q.queue)), q.queue...) +} + +func (q *fakeQueue) Set() sets.String { + q.lock.Lock() + defer q.lock.Unlock() + work := sets.NewString() + for _, item := range q.queue[q.currentStart:] { + work.Insert(string(item.UID)) + } + return work +} + +func (q *fakeQueue) Enqueue(uid types.UID, delay time.Duration) { + q.lock.Lock() + defer q.lock.Unlock() + q.queue = append(q.queue, FakeQueueItem{UID: uid, Delay: delay}) +} + +func (q *fakeQueue) GetWork() []types.UID { + q.lock.Lock() + defer q.lock.Unlock() + work := make([]types.UID, 0, len(q.queue)-q.currentStart) + for _, item := range q.queue[q.currentStart:] { + work = append(work, item.UID) + } + q.currentStart = len(q.queue) + return work +} + func createPodWorkers() (*podWorkers, map[types.UID][]syncPodRecord) { lock := sync.Mutex{} processed := make(map[types.UID][]syncPodRecord) fakeRecorder := &record.FakeRecorder{} fakeRuntime := &containertest.FakeRuntime{} fakeCache := containertest.NewFakeCache(fakeRuntime) + fakeQueue := &fakeQueue{} w := newPodWorkers( func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error { func() { @@ -215,9 +285,9 @@ func createPodWorkers() (*podWorkers, map[types.UID][]syncPodRecord) { return nil }, fakeRecorder, - queue.NewBasicWorkQueue(&clock.RealClock{}), - time.Second, + fakeQueue, time.Second, + time.Millisecond, fakeCache, ) return w.(*podWorkers), processed @@ -241,6 +311,31 @@ func drainWorkers(podWorkers *podWorkers, numPods int) { } } +func drainWorkersExcept(podWorkers *podWorkers, uids ...types.UID) { + set := sets.NewString() + for _, uid := range uids { + set.Insert(string(uid)) + } + for { + stillWorking := false + podWorkers.podLock.Lock() + for k, v := range podWorkers.podSyncStatuses { + if set.Has(string(k)) { + continue + } + if v.working { + stillWorking = true + break + } + } + podWorkers.podLock.Unlock() + if !stillWorking { + break + } + time.Sleep(50 * time.Millisecond) + } +} + func drainAllWorkers(podWorkers *podWorkers) { for { stillWorking := false @@ -427,6 +522,473 @@ func TestUpdatePodDoesNotForgetSyncPodKill(t *testing.T) { } } +func newUIDSet(uids ...types.UID) sets.String { + set := sets.NewString() + for _, uid := range uids { + set.Insert(string(uid)) + } + return set +} + +func init() { + klog.InitFlags(nil) + flag.Lookup("v").Value.Set("5") +} + +func TestStaticPodExclusion(t *testing.T) { + podWorkers, processed := createPodWorkers() + var channels WorkChannel + podWorkers.workerChannelFn = channels.Intercept + + testPod := newNamedPod("2-static", "test1", "pod1", true) + if !kubetypes.IsStaticPod(testPod) { + t.Fatalf("unable to test static pod") + } + + // start two pods with the same name, one static, one apiserver + podWorkers.UpdatePod(UpdatePodOptions{ + Pod: newNamedPod("1-normal", "test1", "pod1", false), + UpdateType: kubetypes.SyncPodUpdate, + }) + podWorkers.UpdatePod(UpdatePodOptions{ + Pod: newNamedPod("2-static", "test1", "pod1", true), + UpdateType: kubetypes.SyncPodUpdate, + }) + drainAllWorkers(podWorkers) + + // should observe both pods running + pod1 := podWorkers.podSyncStatuses[types.UID("1-normal")] + if pod1.IsTerminated() { + t.Fatalf("unexpected pod state: %#v", pod1) + } + pod2 := podWorkers.podSyncStatuses[types.UID("2-static")] + if pod2.IsTerminated() { + t.Fatalf("unexpected pod state: %#v", pod2) + } + + if len(processed) != 2 { + t.Fatalf("unexpected synced pods: %#v", processed) + } + if e, a := + []syncPodRecord{{name: 
"pod1", updateType: kubetypes.SyncPodUpdate}}, + processed[types.UID("2-static")]; !reflect.DeepEqual(e, a) { + t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(e, a)) + } + if e, a := map[string]types.UID{"pod1_test1": "2-static"}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) { + t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a)) + } + + // attempt to start a second and third static pod, which should not start + podWorkers.UpdatePod(UpdatePodOptions{ + Pod: newNamedPod("3-static", "test1", "pod1", true), + UpdateType: kubetypes.SyncPodUpdate, + }) + podWorkers.UpdatePod(UpdatePodOptions{ + Pod: newNamedPod("4-static", "test1", "pod1", true), + UpdateType: kubetypes.SyncPodUpdate, + }) + drainAllWorkers(podWorkers) + + // should observe both pods running but last pod shouldn't have synced + pod1 = podWorkers.podSyncStatuses[types.UID("1-normal")] + if pod1.IsTerminated() { + t.Fatalf("unexpected pod state: %#v", pod1) + } + pod2 = podWorkers.podSyncStatuses[types.UID("2-static")] + if pod2.IsTerminated() { + t.Fatalf("unexpected pod state: %#v", pod2) + } + pod3 := podWorkers.podSyncStatuses[types.UID("3-static")] + if pod3.IsTerminated() { + t.Fatalf("unexpected pod state: %#v", pod3) + } + pod4 := podWorkers.podSyncStatuses[types.UID("4-static")] + if pod4.IsTerminated() { + t.Fatalf("unexpected pod state: %#v", pod4) + } + + if len(processed) != 2 { + t.Fatalf("unexpected synced pods: %#v", processed) + } + if expected, actual := + []syncPodRecord{{name: "pod1", updateType: kubetypes.SyncPodUpdate}}, + processed[types.UID("2-static")]; !reflect.DeepEqual(expected, actual) { + t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual)) + } + if expected, actual := + []syncPodRecord(nil), + processed[types.UID("3-static")]; !reflect.DeepEqual(expected, actual) { + t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual)) + } + if expected, actual := + []syncPodRecord(nil), + processed[types.UID("4-static")]; !reflect.DeepEqual(expected, actual) { + t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual)) + } + if e, a := map[string]types.UID{"pod1_test1": "2-static"}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) { + t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a)) + } + if e, a := map[string][]types.UID{"pod1_test1": {"3-static", "4-static"}}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) { + t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a)) + } + // verify all are enqueued + if e, a := sets.NewString("1-normal", "2-static", "4-static", "3-static"), podWorkers.workQueue.(*fakeQueue).Set(); !e.Equal(a) { + t.Fatalf("unexpected queued items: %s", cmp.Diff(e, a)) + } + + // send a basic update for 3-static + podWorkers.workQueue.GetWork() + podWorkers.UpdatePod(UpdatePodOptions{ + Pod: newNamedPod("3-static", "test1", "pod1", true), + UpdateType: kubetypes.SyncPodUpdate, + }) + drainAllWorkers(podWorkers) + + // 3-static should not be started because 2-static is still running + if e, a := map[string]types.UID{"pod1_test1": "2-static"}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) { + t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a)) + } + if e, a := map[string][]types.UID{"pod1_test1": {"3-static", "4-static"}}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) { + t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a)) + } + // the queue should include a single item for 3-static (indicating 
we need to retry later)
+	if e, a := sets.NewString("3-static"), newUIDSet(podWorkers.workQueue.GetWork()...); !reflect.DeepEqual(e, a) {
+		t.Fatalf("unexpected queued items: %s", cmp.Diff(e, a))
+	}
+
+	// mark 3-static as deleted while 2-static is still running
+	podWorkers.workQueue.GetWork()
+	podWorkers.UpdatePod(UpdatePodOptions{
+		Pod:        newNamedPod("3-static", "test1", "pod1", true),
+		UpdateType: kubetypes.SyncPodKill,
+	})
+	drainAllWorkers(podWorkers)
+
+	// should observe 3-static as terminated because it has never started, but other state should be a no-op
+	pod3 = podWorkers.podSyncStatuses[types.UID("3-static")]
+	if !pod3.IsTerminated() {
+		t.Fatalf("unexpected pod state: %#v", pod3)
+	}
+	// the queue should be empty because the worker is now done
+	if e, a := sets.NewString(), newUIDSet(podWorkers.workQueue.GetWork()...); !reflect.DeepEqual(e, a) {
+		t.Fatalf("unexpected queued items: %s", cmp.Diff(e, a))
+	}
+	// 2-static is still running
+	if e, a := map[string]types.UID{"pod1_test1": "2-static"}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
+		t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
+	}
+	// 3-static and 4-static are both still queued
+	if e, a := map[string][]types.UID{"pod1_test1": {"3-static", "4-static"}}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) {
+		t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a))
+	}
+
+	// terminate 2-static
+	podWorkers.UpdatePod(UpdatePodOptions{
+		Pod:        newNamedPod("2-static", "test1", "pod1", true),
+		UpdateType: kubetypes.SyncPodKill,
+	})
+	drainAllWorkers(podWorkers)
+
+	// should observe 2-static as terminated, and 2-static should no longer be reported as the started static pod
+	pod2 = podWorkers.podSyncStatuses[types.UID("2-static")]
+	if !pod2.IsTerminated() {
+		t.Fatalf("unexpected pod state: %#v", pod2)
+	}
+	if e, a := map[string]types.UID{}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
+		t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
+	}
+	if e, a := map[string][]types.UID{"pod1_test1": {"3-static", "4-static"}}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) {
+		t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a))
+	}
+
+	// simulate a periodic event from the work queue for 4-static
+	podWorkers.UpdatePod(UpdatePodOptions{
+		Pod:        newNamedPod("4-static", "test1", "pod1", true),
+		UpdateType: kubetypes.SyncPodUpdate,
+	})
+	drainAllWorkers(podWorkers)
+
+	// 4-static should be started because 3-static has already terminated
+	pod4 = podWorkers.podSyncStatuses[types.UID("4-static")]
+	if pod4.IsTerminated() {
+		t.Fatalf("unexpected pod state: %#v", pod4)
+	}
+	if e, a := map[string]types.UID{"pod1_test1": "4-static"}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
+		t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
+	}
+	if e, a := map[string][]types.UID{}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) {
+		t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a))
+	}
+
+	// initiate a sync with all pods remaining
+	state := podWorkers.SyncKnownPods([]*v1.Pod{
+		newNamedPod("1-normal", "test1", "pod1", false),
+		newNamedPod("2-static", "test1", "pod1", true),
+		newNamedPod("3-static", "test1", "pod1", true),
+		newNamedPod("4-static", "test1", "pod1", true),
+	})
+	drainAllWorkers(podWorkers)
+
+	// 2-static and 3-static should both be listed as terminated
+	if e, a := map[types.UID]PodWorkerState{
+		"1-normal": SyncPod,
+		"2-static": TerminatedPod,
+		"3-static": TerminatedPod,
+		"4-static": SyncPod,
+	}, state; !reflect.DeepEqual(e, a) {
+		t.Fatalf("unexpected actual state: %s", cmp.Diff(e, a))
+	}
+	// 3-static is still in the config, it should still be in our status
+	if status, ok := podWorkers.podSyncStatuses["3-static"]; !ok || status.terminatedAt.IsZero() || !status.finished || status.working {
+		t.Fatalf("unexpected post termination status: %#v", status)
+	}
+
+	// initiate a sync with 3-static removed
+	state = podWorkers.SyncKnownPods([]*v1.Pod{
+		newNamedPod("1-normal", "test1", "pod1", false),
+		newNamedPod("2-static", "test1", "pod1", true),
+		newNamedPod("4-static", "test1", "pod1", true),
+	})
+	drainAllWorkers(podWorkers)
+
+	// expect sync to put 3-static into final state and remove the status
+	if e, a := map[types.UID]PodWorkerState{
+		"1-normal": SyncPod,
+		"2-static": TerminatedPod,
+		"3-static": TerminatedPod,
+		"4-static": SyncPod,
+	}, state; !reflect.DeepEqual(e, a) {
+		t.Fatalf("unexpected actual state: %s", cmp.Diff(e, a))
+	}
+	if status, ok := podWorkers.podSyncStatuses["3-static"]; ok {
+		t.Fatalf("unexpected post termination status: %#v", status)
+	}
+
+	// start a static pod, kill it, then add another one, but ensure the pod worker
+	// for pod 5 doesn't see the kill event (so it remains waiting to start)
+	podWorkers.UpdatePod(UpdatePodOptions{
+		Pod:        newNamedPod("5-static", "test1", "pod1", true),
+		UpdateType: kubetypes.SyncPodUpdate,
+	})
+	channels.Channel("5-static").Hold()
+	podWorkers.UpdatePod(UpdatePodOptions{
+		Pod:        newNamedPod("5-static", "test1", "pod1", true),
+		UpdateType: kubetypes.SyncPodKill,
+	})
+	podWorkers.UpdatePod(UpdatePodOptions{
+		Pod:        newNamedPod("6-static", "test1", "pod1", true),
+		UpdateType: kubetypes.SyncPodUpdate,
+	})
+	drainWorkersExcept(podWorkers, "5-static")
+
+	// pod 5 should have termination requested, but hasn't been cleaned up
+	pod5 := podWorkers.podSyncStatuses[types.UID("5-static")]
+	if !pod5.IsTerminationRequested() || pod5.IsTerminated() {
+		t.Fatalf("unexpected status for pod 5: %#v", pod5)
+	}
+	if e, a := map[string]types.UID{"pod1_test1": "4-static"}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
+		t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
+	}
+	if e, a := map[string][]types.UID{"pod1_test1": {"5-static", "6-static"}}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) {
+		t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a))
+	}
+
+	// terminate 4-static and wake 6-static
+	podWorkers.UpdatePod(UpdatePodOptions{
+		Pod:        newNamedPod("4-static", "test1", "pod1", true),
+		UpdateType: kubetypes.SyncPodKill,
+	})
+	drainWorkersExcept(podWorkers, "5-static")
+	podWorkers.UpdatePod(UpdatePodOptions{
+		Pod:        newNamedPod("6-static", "test1", "pod1", true),
+		UpdateType: kubetypes.SyncPodUpdate,
+	})
+	drainWorkersExcept(podWorkers, "5-static")
+
+	// 5-static should still be waiting, 6-static should have started and synced
+	pod5 = podWorkers.podSyncStatuses[types.UID("5-static")]
+	if !pod5.IsTerminationRequested() || pod5.IsTerminated() {
+		t.Fatalf("unexpected status for pod 5: %#v", pod5)
+	}
+	if e, a := map[string]types.UID{"pod1_test1": "6-static"}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
+		t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
+	}
+	// no static pods should be waiting
+	if e, a := map[string][]types.UID{}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) {
+		
t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a)) + } + // prove 6-static synced + if expected, actual := + []syncPodRecord{{name: "pod1", updateType: kubetypes.SyncPodUpdate}}, + processed[types.UID("6-static")]; !reflect.DeepEqual(expected, actual) { + t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual)) + } + + // ensure 5-static exits when we deliver the event out of order + channels.Channel("5-static").Release() + drainAllWorkers(podWorkers) + pod5 = podWorkers.podSyncStatuses[types.UID("5-static")] + if !pod5.IsTerminated() { + t.Fatalf("unexpected status for pod 5: %#v", pod5) + } + + // start three more static pods, kill the previous static pod blocking start, + // and simulate the second pod of three (8) getting to run first + podWorkers.UpdatePod(UpdatePodOptions{ + Pod: newNamedPod("7-static", "test1", "pod1", true), + UpdateType: kubetypes.SyncPodUpdate, + }) + podWorkers.UpdatePod(UpdatePodOptions{ + Pod: newNamedPod("8-static", "test1", "pod1", true), + UpdateType: kubetypes.SyncPodUpdate, + }) + podWorkers.UpdatePod(UpdatePodOptions{ + Pod: newNamedPod("9-static", "test1", "pod1", true), + UpdateType: kubetypes.SyncPodUpdate, + }) + drainAllWorkers(podWorkers) + podWorkers.UpdatePod(UpdatePodOptions{ + Pod: newNamedPod("6-static", "test1", "pod1", true), + UpdateType: kubetypes.SyncPodKill, + }) + drainAllWorkers(podWorkers) + podWorkers.UpdatePod(UpdatePodOptions{ + Pod: newNamedPod("8-static", "test1", "pod1", true), + UpdateType: kubetypes.SyncPodUpdate, + }) + drainAllWorkers(podWorkers) + + // 7 and 8 should both be waiting still with no syncs + if e, a := map[string]types.UID{}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) { + t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a)) + } + // only 7-static can start now, but it hasn't received an event + if e, a := map[string][]types.UID{"pod1_test1": {"7-static", "8-static", "9-static"}}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) { + t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a)) + } + // none of the new pods have synced + if expected, actual := + []syncPodRecord(nil), + processed[types.UID("7-static")]; !reflect.DeepEqual(expected, actual) { + t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual)) + } + if expected, actual := + []syncPodRecord(nil), + processed[types.UID("8-static")]; !reflect.DeepEqual(expected, actual) { + t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual)) + } + if expected, actual := + []syncPodRecord(nil), + processed[types.UID("9-static")]; !reflect.DeepEqual(expected, actual) { + t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual)) + } + + // terminate 7-static and wake 8-static + podWorkers.UpdatePod(UpdatePodOptions{ + Pod: newNamedPod("7-static", "test1", "pod1", true), + UpdateType: kubetypes.SyncPodKill, + }) + drainAllWorkers(podWorkers) + podWorkers.UpdatePod(UpdatePodOptions{ + Pod: newNamedPod("8-static", "test1", "pod1", true), + UpdateType: kubetypes.SyncPodUpdate, + }) + drainAllWorkers(podWorkers) + + // 8 should have synced + if expected, actual := + []syncPodRecord{{name: "pod1", updateType: kubetypes.SyncPodUpdate}}, + processed[types.UID("8-static")]; !reflect.DeepEqual(expected, actual) { + t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual)) + } +} + +type WorkChannelItem struct { + out chan podWork + lock sync.Mutex + pause bool + queue []podWork +} + +func (item *WorkChannelItem) Handle(work podWork) { + 
item.lock.Lock() + defer item.lock.Unlock() + if item.pause { + item.queue = append(item.queue, work) + return + } + item.out <- work +} + +func (item *WorkChannelItem) Hold() { + item.lock.Lock() + defer item.lock.Unlock() + item.pause = true +} + +func (item *WorkChannelItem) Close() { + item.lock.Lock() + defer item.lock.Unlock() + if item.out != nil { + close(item.out) + item.out = nil + } +} + +// Release blocks until all work is passed on the chain +func (item *WorkChannelItem) Release() { + item.lock.Lock() + defer item.lock.Unlock() + item.pause = false + for _, work := range item.queue { + item.out <- work + } + item.queue = nil +} + +// WorkChannel intercepts podWork channels between the pod worker and its child +// goroutines and allows tests to pause or release the flow of podWork to the +// workers. +type WorkChannel struct { + lock sync.Mutex + channels map[types.UID]*WorkChannelItem +} + +func (w *WorkChannel) Channel(uid types.UID) *WorkChannelItem { + w.lock.Lock() + defer w.lock.Unlock() + if w.channels == nil { + w.channels = make(map[types.UID]*WorkChannelItem) + } + channel, ok := w.channels[uid] + if !ok { + channel = &WorkChannelItem{ + out: make(chan podWork, 1), + } + w.channels[uid] = channel + } + return channel +} + +func (w *WorkChannel) Intercept(uid types.UID, ch chan podWork) (outCh <-chan podWork) { + channel := w.Channel(uid) + w.lock.Lock() + + defer w.lock.Unlock() + go func() { + defer func() { + channel.Close() + w.lock.Lock() + defer w.lock.Unlock() + delete(w.channels, uid) + }() + for w := range ch { + channel.Handle(w) + } + }() + return channel.out +} + func TestSyncKnownPods(t *testing.T) { podWorkers, _ := createPodWorkers() @@ -570,6 +1132,70 @@ func TestSyncKnownPods(t *testing.T) { } } +func Test_removeTerminatedWorker(t *testing.T) { + podUID := types.UID("pod-uid") + + testCases := []struct { + desc string + podSyncStatus *podSyncStatus + startedStaticPodsByFullname map[string]types.UID + waitingToStartStaticPodsByFullname map[string][]types.UID + removed bool + }{ + { + desc: "finished worker", + podSyncStatus: &podSyncStatus{ + finished: true, + }, + removed: true, + }, + { + desc: "waiting to start worker because of another started pod with the same fullname", + podSyncStatus: &podSyncStatus{ + finished: false, + fullname: "fake-fullname", + }, + startedStaticPodsByFullname: map[string]types.UID{ + "fake-fullname": "another-pod-uid", + }, + waitingToStartStaticPodsByFullname: map[string][]types.UID{ + "fake-fullname": {podUID}, + }, + removed: false, + }, + { + desc: "not yet started worker", + podSyncStatus: &podSyncStatus{ + finished: false, + fullname: "fake-fullname", + }, + startedStaticPodsByFullname: make(map[string]types.UID), + waitingToStartStaticPodsByFullname: map[string][]types.UID{ + "fake-fullname": {podUID}, + }, + removed: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + podWorkers, _ := createPodWorkers() + podWorkers.podSyncStatuses[podUID] = tc.podSyncStatus + podWorkers.startedStaticPodsByFullname = tc.startedStaticPodsByFullname + podWorkers.waitingToStartStaticPodsByFullname = tc.waitingToStartStaticPodsByFullname + + podWorkers.removeTerminatedWorker(podUID) + _, exists := podWorkers.podSyncStatuses[podUID] + if tc.removed && exists { + t.Errorf("Expected pod worker to be removed") + } + if !tc.removed && !exists { + t.Errorf("Expected pod worker to not be removed") + } + }) + } +} + type simpleFakeKubelet struct { pod *v1.Pod mirrorPod *v1.Pod @@ -702,10 +1328,14 @@ func 
Test_allowPodStart(t *testing.T) { podSyncStatuses map[types.UID]*podSyncStatus startedStaticPodsByFullname map[string]types.UID waitingToStartStaticPodsByFullname map[string][]types.UID - allowed bool + + expectedStartedStaticPodsByFullname map[string]types.UID + expectedWaitingToStartStaticPodsByFullname map[string][]types.UID + allowed bool + allowedEver bool }{ { - // TBD: Do we want to allow non-static pods with the same full name? + // TODO: Do we want to allow non-static pods with the same full name? // Note that it may disable the force deletion of pods. desc: "non-static pod", pod: newPod("uid-0", "test"), @@ -717,10 +1347,11 @@ func Test_allowPodStart(t *testing.T) { fullname: "test_", }, }, - allowed: true, + allowed: true, + allowedEver: true, }, { - // TBD: Do we want to allow a non-static pod with the same full name + // TODO: Do we want to allow a non-static pod with the same full name // as the started static pod? desc: "non-static pod when there is a started static pod with the same full name", pod: newPod("uid-0", "test"), @@ -735,10 +1366,14 @@ func Test_allowPodStart(t *testing.T) { startedStaticPodsByFullname: map[string]types.UID{ "test_": types.UID("uid-1"), }, - allowed: true, + expectedStartedStaticPodsByFullname: map[string]types.UID{ + "test_": types.UID("uid-1"), + }, + allowed: true, + allowedEver: true, }, { - // TBD: Do we want to allow a static pod with the same full name as the + // TODO: Do we want to allow a static pod with the same full name as the // started non-static pod? desc: "static pod when there is a started non-static pod with the same full name", pod: newPod("uid-0", "test"), @@ -750,10 +1385,8 @@ func Test_allowPodStart(t *testing.T) { fullname: "test_", }, }, - startedStaticPodsByFullname: map[string]types.UID{ - "test_": types.UID("uid-1"), - }, - allowed: true, + allowed: true, + allowedEver: true, }, { desc: "static pod when there are no started static pods with the same full name", @@ -769,7 +1402,12 @@ func Test_allowPodStart(t *testing.T) { startedStaticPodsByFullname: map[string]types.UID{ "bar_": types.UID("uid-1"), }, - allowed: true, + expectedStartedStaticPodsByFullname: map[string]types.UID{ + "foo_": types.UID("uid-0"), + "bar_": types.UID("uid-1"), + }, + allowed: true, + allowedEver: true, }, { desc: "static pod when there is a started static pod with the same full name", @@ -785,7 +1423,11 @@ func Test_allowPodStart(t *testing.T) { startedStaticPodsByFullname: map[string]types.UID{ "foo_": types.UID("uid-1"), }, - allowed: false, + expectedStartedStaticPodsByFullname: map[string]types.UID{ + "foo_": types.UID("uid-1"), + }, + allowed: false, + allowedEver: true, }, { desc: "static pod if the static pod has already started", @@ -798,7 +1440,11 @@ func Test_allowPodStart(t *testing.T) { startedStaticPodsByFullname: map[string]types.UID{ "foo_": types.UID("uid-0"), }, - allowed: true, + expectedStartedStaticPodsByFullname: map[string]types.UID{ + "foo_": types.UID("uid-0"), + }, + allowed: true, + allowedEver: true, }, { desc: "static pod if the static pod is the first pod waiting to start", @@ -813,7 +1459,12 @@ func Test_allowPodStart(t *testing.T) { types.UID("uid-0"), }, }, - allowed: true, + expectedStartedStaticPodsByFullname: map[string]types.UID{ + "foo_": types.UID("uid-0"), + }, + expectedWaitingToStartStaticPodsByFullname: make(map[string][]types.UID), + allowed: true, + allowedEver: true, }, { desc: "static pod if the static pod is not the first pod waiting to start", @@ -832,7 +1483,15 @@ func Test_allowPodStart(t 
*testing.T) { types.UID("uid-0"), }, }, - allowed: false, + expectedStartedStaticPodsByFullname: make(map[string]types.UID), + expectedWaitingToStartStaticPodsByFullname: map[string][]types.UID{ + "foo_": { + types.UID("uid-1"), + types.UID("uid-0"), + }, + }, + allowed: false, + allowedEver: true, }, { desc: "static pod if the static pod is the first valid pod waiting to start / clean up until picking the first valid pod", @@ -854,7 +1513,113 @@ func Test_allowPodStart(t *testing.T) { types.UID("uid-1"), }, }, - allowed: true, + expectedStartedStaticPodsByFullname: map[string]types.UID{ + "foo_": types.UID("uid-0"), + }, + expectedWaitingToStartStaticPodsByFullname: map[string][]types.UID{ + "foo_": { + types.UID("uid-1"), + }, + }, + allowed: true, + allowedEver: true, + }, + { + desc: "static pod if the static pod is the first pod that is not termination requested and waiting to start", + pod: newStaticPod("uid-0", "foo"), + podSyncStatuses: map[types.UID]*podSyncStatus{ + "uid-0": { + fullname: "foo_", + }, + "uid-1": { + fullname: "foo_", + }, + "uid-2": { + fullname: "foo_", + terminatingAt: time.Now(), + }, + "uid-3": { + fullname: "foo_", + terminatedAt: time.Now(), + }, + }, + waitingToStartStaticPodsByFullname: map[string][]types.UID{ + "foo_": { + types.UID("uid-2"), + types.UID("uid-3"), + types.UID("uid-0"), + types.UID("uid-1"), + }, + }, + expectedStartedStaticPodsByFullname: map[string]types.UID{ + "foo_": types.UID("uid-0"), + }, + expectedWaitingToStartStaticPodsByFullname: map[string][]types.UID{ + "foo_": { + types.UID("uid-1"), + }, + }, + allowed: true, + allowedEver: true, + }, + { + desc: "static pod if there is no sync status for the pod should be denied", + pod: newStaticPod("uid-0", "foo"), + podSyncStatuses: map[types.UID]*podSyncStatus{ + "uid-1": { + fullname: "foo_", + }, + "uid-2": { + fullname: "foo_", + terminatingAt: time.Now(), + }, + "uid-3": { + fullname: "foo_", + terminatedAt: time.Now(), + }, + }, + waitingToStartStaticPodsByFullname: map[string][]types.UID{ + "foo_": { + types.UID("uid-1"), + }, + }, + expectedStartedStaticPodsByFullname: map[string]types.UID{}, + expectedWaitingToStartStaticPodsByFullname: map[string][]types.UID{ + "foo_": { + types.UID("uid-1"), + }, + }, + allowed: false, + allowedEver: false, + }, + { + desc: "static pod if the static pod is terminated should not be allowed", + pod: newStaticPod("uid-0", "foo"), + podSyncStatuses: map[types.UID]*podSyncStatus{ + "uid-0": { + fullname: "foo_", + terminatingAt: time.Now(), + }, + }, + waitingToStartStaticPodsByFullname: map[string][]types.UID{ + "foo_": { + types.UID("uid-2"), + types.UID("uid-3"), + types.UID("uid-0"), + types.UID("uid-1"), + }, + }, + expectedStartedStaticPodsByFullname: map[string]types.UID{}, + expectedWaitingToStartStaticPodsByFullname: map[string][]types.UID{ + "foo_": { + types.UID("uid-2"), + types.UID("uid-3"), + types.UID("uid-0"), + types.UID("uid-1"), + }, + }, + allowed: false, + allowedEver: false, }, } @@ -870,13 +1635,46 @@ func Test_allowPodStart(t *testing.T) { if tc.waitingToStartStaticPodsByFullname != nil { podWorkers.waitingToStartStaticPodsByFullname = tc.waitingToStartStaticPodsByFullname } - if podWorkers.allowPodStart(tc.pod) != tc.allowed { + allowed, allowedEver := podWorkers.allowPodStart(tc.pod) + if allowed != tc.allowed { if tc.allowed { t.Errorf("Pod should be allowed") } else { t.Errorf("Pod should not be allowed") } } + + if allowedEver != tc.allowedEver { + if tc.allowedEver { + t.Errorf("Pod should be allowed ever") + } else { 
+					t.Errorf("Pod should not be allowed ever")
+				}
+			}
+
+			// compare startedStaticPodsByFullname only when at least one of the actual
+			// or expected maps is non-empty; reflect.DeepEqual treats a nil map and an
+			// empty map as different
+			if len(podWorkers.startedStaticPodsByFullname) != 0 ||
+				len(podWorkers.startedStaticPodsByFullname) != len(tc.expectedStartedStaticPodsByFullname) {
+				if !reflect.DeepEqual(
+					podWorkers.startedStaticPodsByFullname,
+					tc.expectedStartedStaticPodsByFullname) {
+					t.Errorf("startedStaticPodsByFullname: expected %v, got %v",
+						tc.expectedStartedStaticPodsByFullname,
+						podWorkers.startedStaticPodsByFullname)
+				}
+			}
+
+			// compare waitingToStartStaticPodsByFullname only when at least one of the
+			// actual or expected maps is non-empty; reflect.DeepEqual treats a nil map
+			// and an empty map as different
+			if len(podWorkers.waitingToStartStaticPodsByFullname) != 0 ||
+				len(podWorkers.waitingToStartStaticPodsByFullname) != len(tc.expectedWaitingToStartStaticPodsByFullname) {
+				if !reflect.DeepEqual(
+					podWorkers.waitingToStartStaticPodsByFullname,
+					tc.expectedWaitingToStartStaticPodsByFullname) {
+					t.Errorf("waitingToStartStaticPodsByFullname: expected %v, got %v",
+						tc.expectedWaitingToStartStaticPodsByFullname,
+						podWorkers.waitingToStartStaticPodsByFullname)
+				}
+			}
 		})
 	}
 }