diff --git a/pkg/kubelet/cm/BUILD b/pkg/kubelet/cm/BUILD
index 44deeaaf887..1601d670685 100644
--- a/pkg/kubelet/cm/BUILD
+++ b/pkg/kubelet/cm/BUILD
@@ -10,7 +10,9 @@ go_library(
         "container_manager_stub.go",
         "container_manager_unsupported.go",
         "container_manager_windows.go",
+        "fake_container_manager.go",
         "fake_internal_container_lifecycle.go",
+        "fake_pod_container_manager.go",
         "helpers.go",
         "helpers_linux.go",
         "helpers_unsupported.go",
diff --git a/pkg/kubelet/cm/fake_container_manager.go b/pkg/kubelet/cm/fake_container_manager.go
new file mode 100644
index 00000000000..eb01b1ab3f2
--- /dev/null
+++ b/pkg/kubelet/cm/fake_container_manager.go
@@ -0,0 +1,202 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cm
+
+import (
+	"sync"
+
+	v1 "k8s.io/api/core/v1"
+
+	"k8s.io/apimachinery/pkg/api/resource"
+	internalapi "k8s.io/cri-api/pkg/apis"
+	podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
+	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
+	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
+	"k8s.io/kubernetes/pkg/kubelet/config"
+	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
+	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
+	"k8s.io/kubernetes/pkg/kubelet/status"
+	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
+)
+
+type FakeContainerManager struct {
+	sync.Mutex
+	CalledFunctions                     []string
+	PodContainerManager                 *FakePodContainerManager
+	shouldResetExtendedResourceCapacity bool
+}
+
+var _ ContainerManager = &FakeContainerManager{}
+
+func NewFakeContainerManager() *FakeContainerManager {
+	return &FakeContainerManager{
+		PodContainerManager: NewFakePodContainerManager(),
+	}
+}
+
+func (cm *FakeContainerManager) Start(_ *v1.Node, _ ActivePodsFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService) error {
+	cm.Lock()
+	defer cm.Unlock()
+	cm.CalledFunctions = append(cm.CalledFunctions, "Start")
+	return nil
+}
+
+func (cm *FakeContainerManager) SystemCgroupsLimit() v1.ResourceList {
+	cm.Lock()
+	defer cm.Unlock()
+	cm.CalledFunctions = append(cm.CalledFunctions, "SystemCgroupsLimit")
+	return v1.ResourceList{}
+}
+
+func (cm *FakeContainerManager) GetNodeConfig() NodeConfig {
+	cm.Lock()
+	defer cm.Unlock()
+	cm.CalledFunctions = append(cm.CalledFunctions, "GetNodeConfig")
+	return NodeConfig{}
+}
+
+func (cm *FakeContainerManager) GetMountedSubsystems() *CgroupSubsystems {
+	cm.Lock()
+	defer cm.Unlock()
+	cm.CalledFunctions = append(cm.CalledFunctions, "GetMountedSubsystems")
+	return &CgroupSubsystems{}
+}
+
+func (cm *FakeContainerManager) GetQOSContainersInfo() QOSContainersInfo {
+	cm.Lock()
+	defer cm.Unlock()
+	cm.CalledFunctions = append(cm.CalledFunctions, "QOSContainersInfo")
+	return QOSContainersInfo{}
+}
+
+func (cm *FakeContainerManager) UpdateQOSCgroups() error {
+	cm.Lock()
+	defer cm.Unlock()
+	cm.CalledFunctions = append(cm.CalledFunctions, "UpdateQOSCgroups")
+	return nil
+}
+
+func (cm *FakeContainerManager) Status() Status {
+	cm.Lock()
+	defer cm.Unlock()
+	cm.CalledFunctions = append(cm.CalledFunctions, "Status")
+	return Status{}
+}
+
+func (cm *FakeContainerManager) GetNodeAllocatableReservation() v1.ResourceList {
+	cm.Lock()
+	defer cm.Unlock()
+	cm.CalledFunctions = append(cm.CalledFunctions, "GetNodeAllocatableReservation")
+	return nil
+}
+
+func (cm *FakeContainerManager) GetCapacity() v1.ResourceList {
+	cm.Lock()
+	defer cm.Unlock()
+	cm.CalledFunctions = append(cm.CalledFunctions, "GetCapacity")
+	c := v1.ResourceList{
+		v1.ResourceEphemeralStorage: *resource.NewQuantity(
+			int64(0),
+			resource.BinarySI),
+	}
+	return c
+}
+
+func (cm *FakeContainerManager) GetPluginRegistrationHandler() cache.PluginHandler {
+	cm.Lock()
+	defer cm.Unlock()
+	cm.CalledFunctions = append(cm.CalledFunctions, "GetPluginRegistrationHandler")
+	return nil
+}
+
+func (cm *FakeContainerManager) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
+	cm.Lock()
+	defer cm.Unlock()
+	cm.CalledFunctions = append(cm.CalledFunctions, "GetDevicePluginResourceCapacity")
+	return nil, nil, []string{}
+}
+
+func (cm *FakeContainerManager) NewPodContainerManager() PodContainerManager {
+	cm.Lock()
+	defer cm.Unlock()
+	cm.CalledFunctions = append(cm.CalledFunctions, "PodContainerManager")
+	return cm.PodContainerManager
+}
+
+func (cm *FakeContainerManager) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {
+	cm.Lock()
+	defer cm.Unlock()
+	cm.CalledFunctions = append(cm.CalledFunctions, "GetResources")
+	return &kubecontainer.RunContainerOptions{}, nil
+}
+
+func (cm *FakeContainerManager) UpdatePluginResources(*schedulerframework.NodeInfo, *lifecycle.PodAdmitAttributes) error {
+	cm.Lock()
+	defer cm.Unlock()
+	cm.CalledFunctions = append(cm.CalledFunctions, "UpdatePluginResources")
+	return nil
+}
+
+func (cm *FakeContainerManager) InternalContainerLifecycle() InternalContainerLifecycle {
+	cm.Lock()
+	defer cm.Unlock()
+	cm.CalledFunctions = append(cm.CalledFunctions, "InternalContainerLifecycle")
+	return &internalContainerLifecycleImpl{cpumanager.NewFakeManager(), topologymanager.NewFakeManager()}
+}
+
+func (cm *FakeContainerManager) GetPodCgroupRoot() string {
+	cm.Lock()
+	defer cm.Unlock()
+	cm.CalledFunctions = append(cm.CalledFunctions, "GetPodCgroupRoot")
+	return ""
+}
+
+func (cm *FakeContainerManager) GetDevices(_, _ string) []*podresourcesapi.ContainerDevices {
+	cm.Lock()
+	defer cm.Unlock()
+	cm.CalledFunctions = append(cm.CalledFunctions, "GetDevices")
+	return nil
+}
+
+func (cm *FakeContainerManager) ShouldResetExtendedResourceCapacity() bool {
+	cm.Lock()
+	defer cm.Unlock()
+	cm.CalledFunctions = append(cm.CalledFunctions, "ShouldResetExtendedResourceCapacity")
+	return cm.shouldResetExtendedResourceCapacity
+}
+
+func (cm *FakeContainerManager) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler {
+	cm.Lock()
+	defer cm.Unlock()
+	cm.CalledFunctions = append(cm.CalledFunctions, "GetAllocateResourcesPodAdmitHandler")
+	return topologymanager.NewFakeManager()
+}
+
+func (cm *FakeContainerManager) UpdateAllocatedDevices() {
+	cm.Lock()
+	defer cm.Unlock()
+	cm.CalledFunctions = append(cm.CalledFunctions, "UpdateAllocatedDevices")
+	return
+}
+
+func (cm *FakeContainerManager) GetCPUs(_, _ string) []int64 {
+	cm.Lock()
+	defer cm.Unlock()
+	cm.CalledFunctions = append(cm.CalledFunctions, "GetCPUs")
+	return nil
+}
diff --git a/pkg/kubelet/cm/fake_pod_container_manager.go b/pkg/kubelet/cm/fake_pod_container_manager.go
new file mode 100644
index 00000000000..cafae75f569
--- /dev/null
+++ b/pkg/kubelet/cm/fake_pod_container_manager.go
@@ -0,0 +1,106 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cm
+
+import (
+	"reflect"
+	"sync"
+
+	"k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/types"
+	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+)
+
+type FakePodContainerManager struct {
+	sync.Mutex
+	CalledFunctions []string
+	Cgroups         map[types.UID]CgroupName
+}
+
+var _ PodContainerManager = &FakePodContainerManager{}
+
+func NewFakePodContainerManager() *FakePodContainerManager {
+	return &FakePodContainerManager{
+		Cgroups: make(map[types.UID]CgroupName),
+	}
+}
+
+func (m *FakePodContainerManager) AddPodFromCgroups(pod *kubecontainer.Pod) {
+	m.Lock()
+	defer m.Unlock()
+	m.Cgroups[pod.ID] = []string{pod.Name}
+}
+
+func (m *FakePodContainerManager) Exists(_ *v1.Pod) bool {
+	m.Lock()
+	defer m.Unlock()
+	m.CalledFunctions = append(m.CalledFunctions, "Exists")
+	return true
+}
+
+func (m *FakePodContainerManager) EnsureExists(_ *v1.Pod) error {
+	m.Lock()
+	defer m.Unlock()
+	m.CalledFunctions = append(m.CalledFunctions, "EnsureExists")
+	return nil
+}
+
+func (m *FakePodContainerManager) GetPodContainerName(_ *v1.Pod) (CgroupName, string) {
+	m.Lock()
+	defer m.Unlock()
+	m.CalledFunctions = append(m.CalledFunctions, "GetPodContainerName")
+	return nil, ""
+}
+
+func (m *FakePodContainerManager) Destroy(name CgroupName) error {
+	m.Lock()
+	defer m.Unlock()
+	m.CalledFunctions = append(m.CalledFunctions, "Destroy")
+	for key, cgname := range m.Cgroups {
+		if reflect.DeepEqual(cgname, name) {
+			delete(m.Cgroups, key)
+			return nil
+		}
+	}
+	return nil
+}
+
+func (m *FakePodContainerManager) ReduceCPULimits(_ CgroupName) error {
+	m.Lock()
+	defer m.Unlock()
+	m.CalledFunctions = append(m.CalledFunctions, "ReduceCPULimits")
+	return nil
+}
+
+func (m *FakePodContainerManager) GetAllPodsFromCgroups() (map[types.UID]CgroupName, error) {
+	m.Lock()
+	defer m.Unlock()
+	m.CalledFunctions = append(m.CalledFunctions, "GetAllPodsFromCgroups")
+	// return a copy for the race detector
+	grp := make(map[types.UID]CgroupName)
+	for key, value := range m.Cgroups {
+		grp[key] = value
+	}
+	return grp, nil
+}
+
+func (m *FakePodContainerManager) IsPodCgroup(cgroupfs string) (bool, types.UID) {
+	m.Lock()
+	defer m.Unlock()
+	m.CalledFunctions = append(m.CalledFunctions, "IsPodCgroup")
+	return false, types.UID("")
+}
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index dfb266af922..7b82842f155 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -1504,10 +1504,12 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
 		return nil
 	}
 
-	// If the pod is a static pod and its mirror pod is still gracefully terminating,
-	// we do not want to start the new static pod until the old static pod is gracefully terminated.
+	// If a pod is still gracefully terminating, then we do not want to
+	// take further action. This mitigates static pods and deleted pods
+	// from getting rerun prematurely or their cgroups being deleted before
+	// the runtime cleans up.
 	podFullName := kubecontainer.GetPodFullName(pod)
-	if kl.podKiller.IsMirrorPodPendingTerminationByPodName(podFullName) {
+	if kl.podKiller.IsPodPendingTerminationByPodName(podFullName) {
 		return fmt.Errorf("pod %q is pending termination", podFullName)
 	}
 
@@ -1775,10 +1777,6 @@ func (kl *Kubelet) deletePod(pod *v1.Pod) error {
 		return fmt.Errorf("pod not found")
 	}
 	podPair := kubecontainer.PodPair{APIPod: pod, RunningPod: &runningPod}
-
-	if _, ok := kl.podManager.GetMirrorPodByPod(pod); ok {
-		kl.podKiller.MarkMirrorPodPendingTermination(pod)
-	}
 	kl.podKiller.KillPod(&podPair)
 
 	// We leave the volume/directory cleanup to the periodic cleanup routine.
diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go
index 79ce5f59525..6d915f419c6 100644
--- a/pkg/kubelet/kubelet_pods.go
+++ b/pkg/kubelet/kubelet_pods.go
@@ -1033,7 +1033,7 @@ func (kl *Kubelet) removeOrphanedPodStatuses(pods []*v1.Pod, mirrorPods []*v1.Po
 func (kl *Kubelet) deleteOrphanedMirrorPods() {
 	podFullNames := kl.podManager.GetOrphanedMirrorPodNames()
 	for _, podFullname := range podFullNames {
-		if !kl.podKiller.IsMirrorPodPendingTerminationByPodName(podFullname) {
+		if !kl.podKiller.IsPodPendingTerminationByPodName(podFullname) {
 			_, err := kl.podManager.DeleteMirrorPod(podFullname, nil)
 			if err != nil {
 				klog.Errorf("encountered error when deleting mirror pod %q : %v", podFullname, err)
@@ -1143,12 +1143,10 @@ type PodKiller interface {
 	PerformPodKillingWork()
 	// After Close() is called, this pod killer wouldn't accept any more pod killing requests
 	Close()
-	// IsMirrorPodPendingTerminationByPodName checks whether the mirror pod for the given full pod name is pending termination
-	IsMirrorPodPendingTerminationByPodName(podFullname string) bool
-	// IsMirrorPodPendingTerminationByUID checks whether the mirror pod for the given uid is pending termination
-	IsMirrorPodPendingTerminationByUID(uid types.UID) bool
-	// MarkMirrorPodPendingTermination marks the mirror pod entering grace period of termination
-	MarkMirrorPodPendingTermination(pod *v1.Pod)
+	// IsPodPendingTerminationByPodName checks whether any pod for the given full pod name is pending termination (thread safe)
+	IsPodPendingTerminationByPodName(podFullname string) bool
+	// IsPodPendingTerminationByUID checks whether any pod for the given uid is pending termination (thread safe)
+	IsPodPendingTerminationByUID(uid types.UID) bool
 }
 
 // podKillerWithChannel is an implementation of PodKiller which receives pod killing requests via channel
@@ -1156,10 +1154,13 @@ type podKillerWithChannel struct {
 	// Channel for getting pods to kill.
 	podKillingCh chan *kubecontainer.PodPair
 	// lock for synchronization between HandlePodCleanups and pod killer
-	podKillingLock *sync.Mutex
+	podKillingLock *sync.RWMutex
 	// mirrorPodTerminationMap keeps track of the progress of mirror pod termination
 	// The key is the UID of the pod and the value is the full name of the pod
 	mirrorPodTerminationMap map[string]string
+	// podTerminationMap keeps track of the progress of pod termination.
+	// The key is the UID of the pod and the value is the full name of the pod
+	podTerminationMap map[string]string
 	// killPod is the func which invokes runtime to kill the pod
 	killPod func(pod *v1.Pod, runningPod *kubecontainer.Pod, status *kubecontainer.PodStatus, gracePeriodOverride *int64) error
 }
@@ -1168,47 +1169,89 @@ func NewPodKiller(kl *Kubelet) PodKiller {
 	podKiller := &podKillerWithChannel{
 		podKillingCh:            make(chan *kubecontainer.PodPair, podKillingChannelCapacity),
-		podKillingLock:          &sync.Mutex{},
+		podKillingLock:          &sync.RWMutex{},
 		mirrorPodTerminationMap: make(map[string]string),
+		podTerminationMap:       make(map[string]string),
 		killPod:                 kl.killPod,
 	}
 	return podKiller
 }
 
-// IsMirrorPodPendingTerminationByUID checks whether the pod for the given uid is pending termination
-func (pk *podKillerWithChannel) IsMirrorPodPendingTerminationByUID(uid types.UID) bool {
-	pk.podKillingLock.Lock()
-	defer pk.podKillingLock.Unlock()
-	_, ok := pk.mirrorPodTerminationMap[string(uid)]
-	return ok
+// IsPodPendingTerminationByUID checks whether the pod for the given uid is pending termination
+func (pk *podKillerWithChannel) IsPodPendingTerminationByUID(uid types.UID) bool {
+	pk.podKillingLock.RLock()
+	defer pk.podKillingLock.RUnlock()
+	if _, ok := pk.podTerminationMap[string(uid)]; ok {
+		return ok
+	}
+	if _, ok := pk.mirrorPodTerminationMap[string(uid)]; ok {
+		return ok
+	}
+	return false
 }
 
-// IsMirrorPodPendingTerminationByPodName checks whether the given pod is in grace period of termination
-func (pk *podKillerWithChannel) IsMirrorPodPendingTerminationByPodName(podFullname string) bool {
-	pk.podKillingLock.Lock()
-	defer pk.podKillingLock.Unlock()
+// IsPodPendingTerminationByPodName checks whether the given pod is in grace period of termination
+func (pk *podKillerWithChannel) IsPodPendingTerminationByPodName(podFullname string) bool {
+	pk.podKillingLock.RLock()
+	defer pk.podKillingLock.RUnlock()
 	for _, name := range pk.mirrorPodTerminationMap {
 		if name == podFullname {
 			return true
 		}
 	}
+	for _, name := range pk.podTerminationMap {
+		if name == podFullname {
+			return true
+		}
+	}
 	return false
 }
 
-func (pk *podKillerWithChannel) markMirrorPodTerminated(uid string) {
-	pk.podKillingLock.Lock()
+func (pk *podKillerWithChannel) markPodTerminated(uid string) {
 	klog.V(4).Infof("marking pod termination %q", uid)
+	pk.podKillingLock.Lock()
+	defer pk.podKillingLock.Unlock()
 	delete(pk.mirrorPodTerminationMap, uid)
-	pk.podKillingLock.Unlock()
+	delete(pk.podTerminationMap, uid)
 }
 
-// MarkMirrorPodPendingTermination marks the pod entering grace period of termination
-func (pk *podKillerWithChannel) MarkMirrorPodPendingTermination(pod *v1.Pod) {
-	fullname := kubecontainer.GetPodFullName(pod)
-	klog.V(3).Infof("marking pod pending termination %q", string(pod.UID))
+// checkAndMarkPodPendingTerminationByPod checks to see if the pod is being
+// killed and returns true if it is, otherwise the pod is added to the map and
+// returns false
+func (pk *podKillerWithChannel) checkAndMarkPodPendingTerminationByPod(podPair *kubecontainer.PodPair) bool {
 	pk.podKillingLock.Lock()
-	pk.mirrorPodTerminationMap[string(pod.UID)] = fullname
-	pk.podKillingLock.Unlock()
+	defer pk.podKillingLock.Unlock()
+	var apiPodExists bool
+	var runningPodExists bool
+	if podPair.APIPod != nil {
+		uid := string(podPair.APIPod.UID)
+		_, apiPodExists = pk.mirrorPodTerminationMap[uid]
+		if !apiPodExists {
+			fullname := kubecontainer.GetPodFullName(podPair.APIPod)
+			klog.V(4).Infof("marking api pod pending termination %q : %q", uid, fullname)
+			pk.mirrorPodTerminationMap[uid] = fullname
+		}
+	}
+	if podPair.RunningPod != nil {
+		uid := string(podPair.RunningPod.ID)
+		_, runningPodExists = pk.podTerminationMap[uid]
+		if !runningPodExists {
+			fullname := podPair.RunningPod.Name
+			klog.V(4).Infof("marking running pod pending termination %q: %q", uid, fullname)
+			pk.podTerminationMap[uid] = fullname
+		}
+	}
+	if apiPodExists || runningPodExists {
+		if apiPodExists && runningPodExists {
+			klog.V(4).Infof("api pod %q and running pod %q is pending termination", podPair.APIPod.UID, podPair.RunningPod.ID)
+		} else if apiPodExists {
+			klog.V(4).Infof("api pod %q is pending termination", podPair.APIPod.UID)
+		} else {
+			klog.V(4).Infof("running pod %q is pending termination", podPair.RunningPod.ID)
+		}
+		return true
+	}
+	return false
 }
 
 // Close closes the channel through which requests are delivered
@@ -1224,33 +1267,23 @@ func (pk *podKillerWithChannel) KillPod(pair *kubecontainer.PodPair) {
 // PerformPodKillingWork launches a goroutine to kill a pod received from the channel if
 // another goroutine isn't already in action.
 func (pk *podKillerWithChannel) PerformPodKillingWork() {
-	killing := sets.NewString()
-	// guard for the killing set
-	lock := sync.Mutex{}
 	for podPair := range pk.podKillingCh {
+		if pk.checkAndMarkPodPendingTerminationByPod(podPair) {
+			// Pod is already being killed
+			continue
+		}
+
 		runningPod := podPair.RunningPod
 		apiPod := podPair.APIPod
-		lock.Lock()
-		exists := killing.Has(string(runningPod.ID))
-		if !exists {
-			killing.Insert(string(runningPod.ID))
-		}
-		lock.Unlock()
-
-		if !exists {
-			go func(apiPod *v1.Pod, runningPod *kubecontainer.Pod) {
-				klog.V(2).Infof("Killing unwanted pod %q", runningPod.Name)
-				err := pk.killPod(apiPod, runningPod, nil, nil)
-				if err != nil {
-					klog.Errorf("Failed killing the pod %q: %v", runningPod.Name, err)
-				}
-				lock.Lock()
-				killing.Delete(string(runningPod.ID))
-				lock.Unlock()
-				pk.markMirrorPodTerminated(string(runningPod.ID))
-			}(apiPod, runningPod)
-		}
+		go func(apiPod *v1.Pod, runningPod *kubecontainer.Pod) {
+			klog.V(2).Infof("Killing unwanted pod %q", runningPod.Name)
+			err := pk.killPod(apiPod, runningPod, nil, nil)
+			if err != nil {
+				klog.Errorf("Failed killing the pod %q: %v", runningPod.Name, err)
+			}
+			pk.markPodTerminated(string(runningPod.ID))
+		}(apiPod, runningPod)
 	}
 }
 
@@ -1943,7 +1976,7 @@ func (kl *Kubelet) cleanupOrphanedPodCgroups(pcm cm.PodContainerManager, cgroupP
 		}
 
 		// if the pod is within termination grace period, we shouldn't cleanup the underlying cgroup
-		if kl.podKiller.IsMirrorPodPendingTerminationByUID(uid) {
+		if kl.podKiller.IsPodPendingTerminationByUID(uid) {
 			klog.V(3).Infof("pod %q is pending termination", uid)
 			continue
 		}
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index dcb9ce9501c..6098811185b 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -106,13 +106,14 @@ func (f *fakeImageGCManager) GetImageList() ([]kubecontainer.Image, error) {
 }
 
 type TestKubelet struct {
-	kubelet          *Kubelet
-	fakeRuntime      *containertest.FakeRuntime
-	fakeKubeClient   *fake.Clientset
-	fakeMirrorClient *podtest.FakeMirrorClient
-	fakeClock        *clock.FakeClock
-	mounter          mount.Interface
-	volumePlugin     *volumetest.FakeVolumePlugin
+	kubelet              *Kubelet
+	fakeRuntime          *containertest.FakeRuntime
+	fakeContainerManager *cm.FakeContainerManager
+	fakeKubeClient       *fake.Clientset
+	fakeMirrorClient     *podtest.FakeMirrorClient
+	fakeClock            *clock.FakeClock
+	mounter              mount.Interface
+	volumePlugin         *volumetest.FakeVolumePlugin
 }
 
 func (tk *TestKubelet) Cleanup() {
@@ -240,7 +241,8 @@ func newTestKubeletWithImageList(
 	kubelet.livenessManager = proberesults.NewManager()
 	kubelet.startupManager = proberesults.NewManager()
 
-	kubelet.containerManager = cm.NewStubContainerManager()
+	fakeContainerManager := cm.NewFakeContainerManager()
+	kubelet.containerManager = fakeContainerManager
 	fakeNodeRef := &v1.ObjectReference{
 		Kind:      "Node",
 		Name:      testKubeletHostname,
@@ -349,7 +351,7 @@ func newTestKubeletWithImageList(
 	kubelet.AddPodSyncLoopHandler(activeDeadlineHandler)
 	kubelet.AddPodSyncHandler(activeDeadlineHandler)
 
-	return &TestKubelet{kubelet, fakeRuntime, fakeKubeClient, fakeMirrorClient, fakeClock, nil, plug}
+	return &TestKubelet{kubelet, fakeRuntime, fakeContainerManager, fakeKubeClient, fakeMirrorClient, fakeClock, nil, plug}
 }
 
 func newTestPods(count int) []*v1.Pod {
@@ -405,6 +407,69 @@ func TestSyncPodsStartPod(t *testing.T) {
 	fakeRuntime.AssertStartedPods([]string{string(pods[0].UID)})
 }
 
+func TestSyncPodsDeletesWhenSourcesAreReadyPerQOS(t *testing.T) {
+	ready := false // sources will not be ready initially, enabled later
+
+	testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
+	go testKubelet.kubelet.podKiller.PerformPodKillingWork()
+	defer testKubelet.Cleanup()
+	defer testKubelet.kubelet.podKiller.Close()
+
+	pod := &kubecontainer.Pod{
+		ID:        "12345678",
+		Name:      "foo",
+		Namespace: "new",
+		Containers: []*kubecontainer.Container{
+			{Name: "bar"},
+		},
+	}
+
+	fakeRuntime := testKubelet.fakeRuntime
+	fakeContainerManager := testKubelet.fakeContainerManager
+	fakeContainerManager.PodContainerManager.AddPodFromCgroups(pod) // add pod to mock cgroup
+	fakeRuntime.PodList = []*containertest.FakePod{
+		{Pod: pod},
+	}
+	kubelet := testKubelet.kubelet
+	kubelet.cgroupsPerQOS = true // enable cgroupsPerQOS to turn on the cgroups cleanup
+	kubelet.sourcesReady = config.NewSourcesReady(func(_ sets.String) bool { return ready })
+
+	// HandlePodCleanups gets called every 2 seconds within the Kubelet's
+	// housekeeping routine. This test registers the pod, removes the unwanted pod, then calls into
+	// HandlePodCleanups a few more times. We should only see one Destroy() event. podKiller runs
+	// within a goroutine so a two second delay should be enough time to
+	// mark the pod as killed (within this test case).
+
+	kubelet.HandlePodCleanups()
+	time.Sleep(2 * time.Second)
+	fakeRuntime.AssertKilledPods([]string{}) // Sources are not ready yet. Don't remove any pods.
+
+	ready = true // mark sources as ready
+	kubelet.HandlePodCleanups()
+	time.Sleep(2 * time.Second)
+
+	// assert that unwanted pods were killed
+	fakeRuntime.AssertKilledPods([]string{"12345678"})
+
+	kubelet.HandlePodCleanups()
+	kubelet.HandlePodCleanups()
+	kubelet.HandlePodCleanups()
+	time.Sleep(2 * time.Second)
+
+	fakeContainerManager.PodContainerManager.Lock()
+	defer fakeContainerManager.PodContainerManager.Unlock()
+	calledFunctionCount := len(fakeContainerManager.PodContainerManager.CalledFunctions)
+	destroyCount := 0
+	for _, functionName := range fakeContainerManager.PodContainerManager.CalledFunctions {
+		if functionName == "Destroy" {
+			destroyCount = destroyCount + 1
+		}
+	}
+
+	assert.Equal(t, 1, destroyCount, "Expect only 1 destroy")
+	assert.True(t, calledFunctionCount > 2, "expect more than two PodContainerManager calls")
+}
+
 func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
 	ready := false