register all pending pod deletions and check for kill

do not delete the cgroup from a pod when it is being killed
Ryan Phillips 2021-01-26 16:22:08 -06:00
parent 35bc0fbad5
commit f918e11e3a
6 changed files with 473 additions and 67 deletions
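
The change makes the pod killer record every pod it has been asked to kill (by API pod UID and running pod ID), so duplicate kill requests are skipped and cleanup paths such as orphaned-cgroup removal and mirror-pod deletion can ask whether a pod is still pending termination before acting. A minimal, self-contained sketch of that check-and-mark idea (illustrative names only, not the kubelet's actual types):

package main

import (
	"fmt"
	"sync"
)

// pendingSet is a toy version of the commit's bookkeeping: the pod killer
// records every pod it has been asked to kill, and both duplicate kill
// requests and cgroup cleanup consult that record first.
type pendingSet struct {
	mu      sync.RWMutex
	pending map[string]string // pod UID -> full pod name
}

func newPendingSet() *pendingSet {
	return &pendingSet{pending: make(map[string]string)}
}

// checkAndMark reports whether uid is already being killed; if not, it
// records uid so later requests for the same pod are skipped.
func (s *pendingSet) checkAndMark(uid, name string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.pending[uid]; ok {
		return true
	}
	s.pending[uid] = name
	return false
}

// markTerminated removes the record once the runtime has finished the kill.
func (s *pendingSet) markTerminated(uid string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.pending, uid)
}

// isPending is the read-side guard used before deleting a pod's cgroup.
func (s *pendingSet) isPending(uid string) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	_, ok := s.pending[uid]
	return ok
}

func main() {
	s := newPendingSet()
	fmt.Println(s.checkAndMark("12345678", "foo_new")) // false: first request starts the kill
	fmt.Println(s.checkAndMark("12345678", "foo_new")) // true: duplicate request is skipped
	fmt.Println(s.isPending("12345678"))               // true: leave the pod cgroup alone for now
	s.markTerminated("12345678")
	fmt.Println(s.isPending("12345678")) // false: safe to clean up the cgroup
}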

View File

@@ -10,7 +10,9 @@ go_library(
         "container_manager_stub.go",
         "container_manager_unsupported.go",
         "container_manager_windows.go",
+        "fake_container_manager.go",
         "fake_internal_container_lifecycle.go",
+        "fake_pod_container_manager.go",
         "helpers.go",
         "helpers_linux.go",
         "helpers_unsupported.go",

View File

@@ -0,0 +1,202 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cm
import (
"sync"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
internalapi "k8s.io/cri-api/pkg/apis"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
"k8s.io/kubernetes/pkg/kubelet/status"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)
type FakeContainerManager struct {
sync.Mutex
CalledFunctions []string
PodContainerManager *FakePodContainerManager
shouldResetExtendedResourceCapacity bool
}
var _ ContainerManager = &FakeContainerManager{}
func NewFakeContainerManager() *FakeContainerManager {
return &FakeContainerManager{
PodContainerManager: NewFakePodContainerManager(),
}
}
func (cm *FakeContainerManager) Start(_ *v1.Node, _ ActivePodsFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService) error {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "Start")
return nil
}
func (cm *FakeContainerManager) SystemCgroupsLimit() v1.ResourceList {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "SystemCgroupsLimit")
return v1.ResourceList{}
}
func (cm *FakeContainerManager) GetNodeConfig() NodeConfig {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "GetNodeConfig")
return NodeConfig{}
}
func (cm *FakeContainerManager) GetMountedSubsystems() *CgroupSubsystems {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "GetMountedSubsystems")
return &CgroupSubsystems{}
}
func (cm *FakeContainerManager) GetQOSContainersInfo() QOSContainersInfo {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "QOSContainersInfo")
return QOSContainersInfo{}
}
func (cm *FakeContainerManager) UpdateQOSCgroups() error {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "UpdateQOSCgroups")
return nil
}
func (cm *FakeContainerManager) Status() Status {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "Status")
return Status{}
}
func (cm *FakeContainerManager) GetNodeAllocatableReservation() v1.ResourceList {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "GetNodeAllocatableReservation")
return nil
}
func (cm *FakeContainerManager) GetCapacity() v1.ResourceList {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "GetCapacity")
c := v1.ResourceList{
v1.ResourceEphemeralStorage: *resource.NewQuantity(
int64(0),
resource.BinarySI),
}
return c
}
func (cm *FakeContainerManager) GetPluginRegistrationHandler() cache.PluginHandler {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "GetPluginRegistrationHandler")
return nil
}
func (cm *FakeContainerManager) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "GetDevicePluginResourceCapacity")
return nil, nil, []string{}
}
func (cm *FakeContainerManager) NewPodContainerManager() PodContainerManager {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "PodContainerManager")
return cm.PodContainerManager
}
func (cm *FakeContainerManager) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "GetResources")
return &kubecontainer.RunContainerOptions{}, nil
}
func (cm *FakeContainerManager) UpdatePluginResources(*schedulerframework.NodeInfo, *lifecycle.PodAdmitAttributes) error {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "UpdatePluginResources")
return nil
}
func (cm *FakeContainerManager) InternalContainerLifecycle() InternalContainerLifecycle {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "InternalContainerLifecycle")
return &internalContainerLifecycleImpl{cpumanager.NewFakeManager(), topologymanager.NewFakeManager()}
}
func (cm *FakeContainerManager) GetPodCgroupRoot() string {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "GetPodCgroupRoot")
return ""
}
func (cm *FakeContainerManager) GetDevices(_, _ string) []*podresourcesapi.ContainerDevices {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "GetDevices")
return nil
}
func (cm *FakeContainerManager) ShouldResetExtendedResourceCapacity() bool {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "ShouldResetExtendedResourceCapacity")
return cm.shouldResetExtendedResourceCapacity
}
func (cm *FakeContainerManager) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "GetAllocateResourcesPodAdmitHandler")
return topologymanager.NewFakeManager()
}
func (cm *FakeContainerManager) UpdateAllocatedDevices() {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "UpdateAllocatedDevices")
return
}
func (cm *FakeContainerManager) GetCPUs(_, _ string) []int64 {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "GetCPUs")
return nil
}

View File

@@ -0,0 +1,106 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cm
import (
"reflect"
"sync"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
type FakePodContainerManager struct {
sync.Mutex
CalledFunctions []string
Cgroups map[types.UID]CgroupName
}
var _ PodContainerManager = &FakePodContainerManager{}
func NewFakePodContainerManager() *FakePodContainerManager {
return &FakePodContainerManager{
Cgroups: make(map[types.UID]CgroupName),
}
}
func (m *FakePodContainerManager) AddPodFromCgroups(pod *kubecontainer.Pod) {
m.Lock()
defer m.Unlock()
m.Cgroups[pod.ID] = []string{pod.Name}
}
func (m *FakePodContainerManager) Exists(_ *v1.Pod) bool {
m.Lock()
defer m.Unlock()
m.CalledFunctions = append(m.CalledFunctions, "Exists")
return true
}
func (m *FakePodContainerManager) EnsureExists(_ *v1.Pod) error {
m.Lock()
defer m.Unlock()
m.CalledFunctions = append(m.CalledFunctions, "EnsureExists")
return nil
}
func (m *FakePodContainerManager) GetPodContainerName(_ *v1.Pod) (CgroupName, string) {
m.Lock()
defer m.Unlock()
m.CalledFunctions = append(m.CalledFunctions, "GetPodContainerName")
return nil, ""
}
func (m *FakePodContainerManager) Destroy(name CgroupName) error {
m.Lock()
defer m.Unlock()
m.CalledFunctions = append(m.CalledFunctions, "Destroy")
for key, cgname := range m.Cgroups {
if reflect.DeepEqual(cgname, name) {
delete(m.Cgroups, key)
return nil
}
}
return nil
}
func (m *FakePodContainerManager) ReduceCPULimits(_ CgroupName) error {
m.Lock()
defer m.Unlock()
m.CalledFunctions = append(m.CalledFunctions, "ReduceCPULimits")
return nil
}
func (m *FakePodContainerManager) GetAllPodsFromCgroups() (map[types.UID]CgroupName, error) {
m.Lock()
defer m.Unlock()
m.CalledFunctions = append(m.CalledFunctions, "GetAllPodsFromCgroups")
// return a copy for the race detector
grp := make(map[types.UID]CgroupName)
for key, value := range m.Cgroups {
grp[key] = value
}
return grp, nil
}
func (m *FakePodContainerManager) IsPodCgroup(cgroupfs string) (bool, types.UID) {
m.Lock()
defer m.Unlock()
m.CalledFunctions = append(m.CalledFunctions, "IsPodCgroup")
return false, types.UID("")
}
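
As a rough sketch of how this fake could be exercised on its own, a test in the same cm package might drive it directly and assert on its bookkeeping; the test name and assertions below are illustrative and not part of this commit (the commit's real coverage is the kubelet test added further down in this diff):

package cm

import (
	"testing"

	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)

func TestFakePodContainerManagerTracksDestroy(t *testing.T) {
	m := NewFakePodContainerManager()
	m.AddPodFromCgroups(&kubecontainer.Pod{ID: "12345678", Name: "foo"})

	// Destroy drops the tracked cgroup and records the call.
	if err := m.Destroy(CgroupName{"foo"}); err != nil {
		t.Fatalf("Destroy returned error: %v", err)
	}

	m.Lock()
	defer m.Unlock()
	if len(m.Cgroups) != 0 {
		t.Errorf("expected no tracked cgroups after Destroy, got %v", m.Cgroups)
	}
	destroyed := false
	for _, name := range m.CalledFunctions {
		if name == "Destroy" {
			destroyed = true
		}
	}
	if !destroyed {
		t.Errorf("expected Destroy in CalledFunctions, got %v", m.CalledFunctions)
	}
}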

View File

@@ -1504,10 +1504,12 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
 		return nil
 	}
-	// If the pod is a static pod and its mirror pod is still gracefully terminating,
-	// we do not want to start the new static pod until the old static pod is gracefully terminated.
+	// If a pod is still gracefully terminating, then we do not want to
+	// take further action. This mitigates static pods and deleted pods
+	// from getting rerun prematurely or their cgroups being deleted before
+	// the runtime cleans up.
 	podFullName := kubecontainer.GetPodFullName(pod)
-	if kl.podKiller.IsMirrorPodPendingTerminationByPodName(podFullName) {
+	if kl.podKiller.IsPodPendingTerminationByPodName(podFullName) {
 		return fmt.Errorf("pod %q is pending termination", podFullName)
 	}
@@ -1775,10 +1777,6 @@ func (kl *Kubelet) deletePod(pod *v1.Pod) error {
 		return fmt.Errorf("pod not found")
 	}
 	podPair := kubecontainer.PodPair{APIPod: pod, RunningPod: &runningPod}
-	if _, ok := kl.podManager.GetMirrorPodByPod(pod); ok {
-		kl.podKiller.MarkMirrorPodPendingTermination(pod)
-	}
 	kl.podKiller.KillPod(&podPair)
 	// We leave the volume/directory cleanup to the periodic cleanup routine.

View File

@@ -1033,7 +1033,7 @@ func (kl *Kubelet) removeOrphanedPodStatuses(pods []*v1.Pod, mirrorPods []*v1.Po
 func (kl *Kubelet) deleteOrphanedMirrorPods() {
 	podFullNames := kl.podManager.GetOrphanedMirrorPodNames()
 	for _, podFullname := range podFullNames {
-		if !kl.podKiller.IsMirrorPodPendingTerminationByPodName(podFullname) {
+		if !kl.podKiller.IsPodPendingTerminationByPodName(podFullname) {
 			_, err := kl.podManager.DeleteMirrorPod(podFullname, nil)
 			if err != nil {
 				klog.Errorf("encountered error when deleting mirror pod %q : %v", podFullname, err)
@@ -1143,12 +1143,10 @@ type PodKiller interface {
 	PerformPodKillingWork()
 	// After Close() is called, this pod killer wouldn't accept any more pod killing requests
 	Close()
-	// IsMirrorPodPendingTerminationByPodName checks whether the mirror pod for the given full pod name is pending termination
-	IsMirrorPodPendingTerminationByPodName(podFullname string) bool
-	// IsMirrorPodPendingTerminationByUID checks whether the mirror pod for the given uid is pending termination
-	IsMirrorPodPendingTerminationByUID(uid types.UID) bool
-	// MarkMirrorPodPendingTermination marks the mirror pod entering grace period of termination
-	MarkMirrorPodPendingTermination(pod *v1.Pod)
+	// IsPodPendingTerminationByPodName checks whether any pod for the given full pod name is pending termination (thread safe)
+	IsPodPendingTerminationByPodName(podFullname string) bool
+	// IsPodPendingTerminationByUID checks whether the mirror pod for the given uid is pending termination (thread safe)
+	IsPodPendingTerminationByUID(uid types.UID) bool
 }

 // podKillerWithChannel is an implementation of PodKiller which receives pod killing requests via channel
@@ -1156,10 +1154,13 @@ type podKillerWithChannel struct {
 	// Channel for getting pods to kill.
 	podKillingCh chan *kubecontainer.PodPair
 	// lock for synchronization between HandlePodCleanups and pod killer
-	podKillingLock *sync.Mutex
+	podKillingLock *sync.RWMutex
 	// mirrorPodTerminationMap keeps track of the progress of mirror pod termination
 	// The key is the UID of the pod and the value is the full name of the pod
 	mirrorPodTerminationMap map[string]string
+	// podTerminationMap keeps track of the progress of pod termination.
+	// The key is the UID of the pod and the value is the full name of the pod
+	podTerminationMap map[string]string
 	// killPod is the func which invokes runtime to kill the pod
 	killPod func(pod *v1.Pod, runningPod *kubecontainer.Pod, status *kubecontainer.PodStatus, gracePeriodOverride *int64) error
 }
@@ -1168,47 +1169,89 @@ type podKillerWithChannel struct {
 func NewPodKiller(kl *Kubelet) PodKiller {
 	podKiller := &podKillerWithChannel{
 		podKillingCh: make(chan *kubecontainer.PodPair, podKillingChannelCapacity),
-		podKillingLock: &sync.Mutex{},
+		podKillingLock: &sync.RWMutex{},
 		mirrorPodTerminationMap: make(map[string]string),
+		podTerminationMap: make(map[string]string),
 		killPod: kl.killPod,
 	}
 	return podKiller
 }

-// IsMirrorPodPendingTerminationByUID checks whether the pod for the given uid is pending termination
-func (pk *podKillerWithChannel) IsMirrorPodPendingTerminationByUID(uid types.UID) bool {
-	pk.podKillingLock.Lock()
-	defer pk.podKillingLock.Unlock()
-	_, ok := pk.mirrorPodTerminationMap[string(uid)]
-	return ok
+// IsPodPendingTerminationByUID checks whether the pod for the given uid is pending termination
+func (pk *podKillerWithChannel) IsPodPendingTerminationByUID(uid types.UID) bool {
+	pk.podKillingLock.RLock()
+	defer pk.podKillingLock.RUnlock()
+	if _, ok := pk.podTerminationMap[string(uid)]; ok {
+		return ok
+	}
+	if _, ok := pk.mirrorPodTerminationMap[string(uid)]; ok {
+		return ok
+	}
+	return false
 }

 // IsMirrorPodPendingTerminationByPodName checks whether the given pod is in grace period of termination
-func (pk *podKillerWithChannel) IsMirrorPodPendingTerminationByPodName(podFullname string) bool {
-	pk.podKillingLock.Lock()
-	defer pk.podKillingLock.Unlock()
+func (pk *podKillerWithChannel) IsPodPendingTerminationByPodName(podFullname string) bool {
+	pk.podKillingLock.RLock()
+	defer pk.podKillingLock.RUnlock()
 	for _, name := range pk.mirrorPodTerminationMap {
 		if name == podFullname {
 			return true
 		}
 	}
+	for _, name := range pk.podTerminationMap {
+		if name == podFullname {
+			return true
+		}
+	}
 	return false
 }

-func (pk *podKillerWithChannel) markMirrorPodTerminated(uid string) {
-	pk.podKillingLock.Lock()
+func (pk *podKillerWithChannel) markPodTerminated(uid string) {
 	klog.V(4).Infof("marking pod termination %q", uid)
+	pk.podKillingLock.Lock()
+	defer pk.podKillingLock.Unlock()
 	delete(pk.mirrorPodTerminationMap, uid)
-	pk.podKillingLock.Unlock()
+	delete(pk.podTerminationMap, uid)
 }

-// MarkMirrorPodPendingTermination marks the pod entering grace period of termination
-func (pk *podKillerWithChannel) MarkMirrorPodPendingTermination(pod *v1.Pod) {
-	fullname := kubecontainer.GetPodFullName(pod)
-	klog.V(3).Infof("marking pod pending termination %q", string(pod.UID))
-	pk.podKillingLock.Lock()
-	pk.mirrorPodTerminationMap[string(pod.UID)] = fullname
-	pk.podKillingLock.Unlock()
+// checkAndMarkPodPendingTerminationByPod checks to see if the pod is being
+// killed and returns true if it is, otherwise the pod is added to the map and
+// returns false
+func (pk *podKillerWithChannel) checkAndMarkPodPendingTerminationByPod(podPair *kubecontainer.PodPair) bool {
+	pk.podKillingLock.Lock()
+	defer pk.podKillingLock.Unlock()
+	var apiPodExists bool
+	var runningPodExists bool
+	if podPair.APIPod != nil {
+		uid := string(podPair.APIPod.UID)
+		_, apiPodExists = pk.mirrorPodTerminationMap[uid]
+		if !apiPodExists {
+			fullname := kubecontainer.GetPodFullName(podPair.APIPod)
+			klog.V(4).Infof("marking api pod pending termination %q : %q", uid, fullname)
+			pk.mirrorPodTerminationMap[uid] = fullname
+		}
+	}
+	if podPair.RunningPod != nil {
+		uid := string(podPair.RunningPod.ID)
+		_, runningPodExists = pk.podTerminationMap[uid]
+		if !runningPodExists {
+			fullname := podPair.RunningPod.Name
+			klog.V(4).Infof("marking running pod pending termination %q: %q", uid, fullname)
+			pk.podTerminationMap[uid] = fullname
+		}
+	}
+	if apiPodExists || runningPodExists {
+		if apiPodExists && runningPodExists {
+			klog.V(4).Infof("api pod %q and running pod %q is pending termination", podPair.APIPod.UID, podPair.RunningPod.ID)
+		} else if apiPodExists {
+			klog.V(4).Infof("api pod %q is pending termination", podPair.APIPod.UID)
+		} else {
+			klog.V(4).Infof("running pod %q is pending termination", podPair.RunningPod.ID)
+		}
+		return true
+	}
+	return false
 }

 // Close closes the channel through which requests are delivered
@@ -1224,35 +1267,25 @@ func (pk *podKillerWithChannel) KillPod(pair *kubecontainer.PodPair) {
 // PerformPodKillingWork launches a goroutine to kill a pod received from the channel if
 // another goroutine isn't already in action.
 func (pk *podKillerWithChannel) PerformPodKillingWork() {
-	killing := sets.NewString()
-	// guard for the killing set
-	lock := sync.Mutex{}
 	for podPair := range pk.podKillingCh {
+		if pk.checkAndMarkPodPendingTerminationByPod(podPair) {
+			// Pod is already being killed
+			continue
+		}
 		runningPod := podPair.RunningPod
 		apiPod := podPair.APIPod
-		lock.Lock()
-		exists := killing.Has(string(runningPod.ID))
-		if !exists {
-			killing.Insert(string(runningPod.ID))
-		}
-		lock.Unlock()
-		if !exists {
 		go func(apiPod *v1.Pod, runningPod *kubecontainer.Pod) {
 			klog.V(2).Infof("Killing unwanted pod %q", runningPod.Name)
 			err := pk.killPod(apiPod, runningPod, nil, nil)
 			if err != nil {
 				klog.Errorf("Failed killing the pod %q: %v", runningPod.Name, err)
 			}
-			lock.Lock()
-			killing.Delete(string(runningPod.ID))
-			lock.Unlock()
-			pk.markMirrorPodTerminated(string(runningPod.ID))
+			pk.markPodTerminated(string(runningPod.ID))
 		}(apiPod, runningPod)
-		}
 	}
 }

 // validateContainerLogStatus returns the container ID for the desired container to retrieve logs for, based on the state
 // of the container. The previous flag will only return the logs for the last terminated container, otherwise, the current
@@ -1943,7 +1976,7 @@ func (kl *Kubelet) cleanupOrphanedPodCgroups(pcm cm.PodContainerManager, cgroupP
 		}
 		// if the pod is within termination grace period, we shouldn't cleanup the underlying cgroup
-		if kl.podKiller.IsMirrorPodPendingTerminationByUID(uid) {
+		if kl.podKiller.IsPodPendingTerminationByUID(uid) {
 			klog.V(3).Infof("pod %q is pending termination", uid)
 			continue
 		}

View File

@@ -108,6 +108,7 @@ func (f *fakeImageGCManager) GetImageList() ([]kubecontainer.Image, error) {
 type TestKubelet struct {
 	kubelet *Kubelet
 	fakeRuntime *containertest.FakeRuntime
+	fakeContainerManager *cm.FakeContainerManager
 	fakeKubeClient *fake.Clientset
 	fakeMirrorClient *podtest.FakeMirrorClient
 	fakeClock *clock.FakeClock
@@ -240,7 +241,8 @@ func newTestKubeletWithImageList(
 	kubelet.livenessManager = proberesults.NewManager()
 	kubelet.startupManager = proberesults.NewManager()
-	kubelet.containerManager = cm.NewStubContainerManager()
+	fakeContainerManager := cm.NewFakeContainerManager()
+	kubelet.containerManager = fakeContainerManager
 	fakeNodeRef := &v1.ObjectReference{
 		Kind: "Node",
 		Name: testKubeletHostname,
@@ -349,7 +351,7 @@ func newTestKubeletWithImageList(
 	kubelet.AddPodSyncLoopHandler(activeDeadlineHandler)
 	kubelet.AddPodSyncHandler(activeDeadlineHandler)
-	return &TestKubelet{kubelet, fakeRuntime, fakeKubeClient, fakeMirrorClient, fakeClock, nil, plug}
+	return &TestKubelet{kubelet, fakeRuntime, fakeContainerManager, fakeKubeClient, fakeMirrorClient, fakeClock, nil, plug}
 }

 func newTestPods(count int) []*v1.Pod {
@@ -405,6 +407,69 @@ func TestSyncPodsStartPod(t *testing.T) {
 	fakeRuntime.AssertStartedPods([]string{string(pods[0].UID)})
 }

+func TestSyncPodsDeletesWhenSourcesAreReadyPerQOS(t *testing.T) {
+	ready := false // sources will not be ready initially, enabled later
+	testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
+	go testKubelet.kubelet.podKiller.PerformPodKillingWork()
+	defer testKubelet.Cleanup()
+	defer testKubelet.kubelet.podKiller.Close()
+	pod := &kubecontainer.Pod{
+		ID:        "12345678",
+		Name:      "foo",
+		Namespace: "new",
+		Containers: []*kubecontainer.Container{
+			{Name: "bar"},
+		},
+	}
+	fakeRuntime := testKubelet.fakeRuntime
+	fakeContainerManager := testKubelet.fakeContainerManager
+	fakeContainerManager.PodContainerManager.AddPodFromCgroups(pod) // add pod to mock cgroup
+	fakeRuntime.PodList = []*containertest.FakePod{
+		{Pod: pod},
+	}
+	kubelet := testKubelet.kubelet
+	kubelet.cgroupsPerQOS = true // enable cgroupsPerQOS to turn on the cgroups cleanup
+	kubelet.sourcesReady = config.NewSourcesReady(func(_ sets.String) bool { return ready })
+	// HandlePodCleanups gets called every 2 seconds within the Kubelet's
+	// housekeeping routine. This test registers the pod, removes the unwanted pod, then calls into
+	// HandlePodCleanups a few more times. We should only see one Destroy() event. podKiller runs
+	// within a goroutine so a two second delay should be enough time to
+	// mark the pod as killed (within this test case).
+	kubelet.HandlePodCleanups()
+	time.Sleep(2 * time.Second)
+	fakeRuntime.AssertKilledPods([]string{}) // Sources are not ready yet. Don't remove any pods.
+	ready = true // mark sources as ready
+	kubelet.HandlePodCleanups()
+	time.Sleep(2 * time.Second)
+	// assert that unwanted pods were killed
+	fakeRuntime.AssertKilledPods([]string{"12345678"})
+	kubelet.HandlePodCleanups()
+	kubelet.HandlePodCleanups()
+	kubelet.HandlePodCleanups()
+	time.Sleep(2 * time.Second)
+	fakeContainerManager.PodContainerManager.Lock()
+	defer fakeContainerManager.PodContainerManager.Unlock()
+	calledFunctionCount := len(fakeContainerManager.PodContainerManager.CalledFunctions)
+	destroyCount := 0
+	for _, functionName := range fakeContainerManager.PodContainerManager.CalledFunctions {
+		if functionName == "Destroy" {
+			destroyCount = destroyCount + 1
+		}
+	}
+	assert.Equal(t, 1, destroyCount, "Expect only 1 destroy")
+	assert.True(t, calledFunctionCount > 2, "expect more than two PodContainerManager calls")
+}
+
 func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
 	ready := false