diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 30f4dd65b15..8c88f85a456 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -134,11 +134,6 @@ const ( // MaxContainerBackOff is the max backoff period, exported for the e2e test MaxContainerBackOff = 300 * time.Second - // Capacity of the channel for storing pods to kill. A small number should - // suffice because a goroutine is dedicated to check the channel and does - // not block on anything else. - podKillingChannelCapacity = 50 - // Period for performing global cleanup tasks. housekeepingPeriod = time.Second * 2 @@ -748,7 +743,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache) klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff) - klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity) + klet.podKiller = NewPodKiller(klet) // setup eviction manager evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.podManager.GetMirrorPodByPod, klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock) @@ -1036,8 +1031,8 @@ type Kubelet struct { // Container restart Backoff backOff *flowcontrol.Backoff - // Channel for sending pods to kill. - podKillingCh chan *kubecontainer.PodPair + // Pod killer handles pods to be killed + podKiller PodKiller // Information about the ports which are opened by daemons on Node running this Kubelet server. daemonEndpoints *v1.NodeDaemonEndpoints @@ -1346,7 +1341,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { // Start a goroutine responsible for killing pods (that are not properly // handled by pod workers). - go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop) + go wait.Until(kl.podKiller.PerformPodKillingWork, 1*time.Second, wait.NeverStop) // Start component sync loops. kl.statusManager.Start() @@ -1671,7 +1666,7 @@ func (kl *Kubelet) deletePod(pod *v1.Pod) error { } podPair := kubecontainer.PodPair{APIPod: pod, RunningPod: &runningPod} - kl.podKillingCh <- &podPair + kl.podKiller.KillPod(&podPair) // TODO: delete the mirror pod here? // We leave the volume/directory cleanup to the periodic cleanup routine. @@ -2006,6 +2001,9 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) { kl.handleMirrorPod(pod, start) continue } + if _, ok := kl.podManager.GetMirrorPodByPod(pod); ok { + kl.podKiller.MarkMirrorPodPendingTermination(pod) + } // Deletion is allowed to fail because the periodic cleanup routine // will trigger deletion again. if err := kl.deletePod(pod); err != nil { diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index fa00196a493..f59872a4586 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -71,6 +71,11 @@ import ( const ( managedHostsHeader = "# Kubernetes-managed hosts file.\n" managedHostsHeaderWithHostNetwork = "# Kubernetes-managed hosts file (host network).\n" + + // Capacity of the channel for storing pods to kill. A small number should + // suffice because a goroutine is dedicated to check the channel and does + // not block on anything else. + podKillingChannelCapacity = 50 ) // Get a list of pods that have data directories. 
@@ -1020,6 +1025,23 @@ func (kl *Kubelet) removeOrphanedPodStatuses(pods []*v1.Pod, mirrorPods []*v1.Po
 	kl.statusManager.RemoveOrphanedStatuses(podUIDs)
 }
 
+// deleteOrphanedMirrorPods checks whether the pod killer is done with each orphaned mirror pod.
+// Once killing is done, podManager.DeleteMirrorPod() is called to delete the mirror pod
+// from the API server.
+func (kl *Kubelet) deleteOrphanedMirrorPods() {
+	podFullNames := kl.podManager.GetOrphanedMirrorPodNames()
+	for _, podFullname := range podFullNames {
+		if !kl.podKiller.IsMirrorPodPendingTerminationByPodName(podFullname) {
+			_, err := kl.podManager.DeleteMirrorPod(podFullname, nil)
+			if err != nil {
+				klog.Errorf("encountered error when deleting mirror pod %q: %v", podFullname, err)
+			} else {
+				klog.V(3).Infof("deleted mirror pod %q", podFullname)
+			}
+		}
+	}
+}
+
 // HandlePodCleanups performs a series of cleanup work, including terminating
 // pod workers, killing unwanted pods, and removing orphaned volumes/pod
 // directories.
@@ -1071,7 +1093,7 @@ func (kl *Kubelet) HandlePodCleanups() error {
 	}
 	for _, pod := range runningPods {
 		if _, found := desiredPods[pod.ID]; !found {
-			kl.podKillingCh <- &kubecontainer.PodPair{APIPod: nil, RunningPod: pod}
+			kl.podKiller.KillPod(&kubecontainer.PodPair{APIPod: nil, RunningPod: pod})
 		}
 	}
 
@@ -1099,24 +1121,112 @@ func (kl *Kubelet) HandlePodCleanups() error {
 	}
 
 	// Remove any orphaned mirror pods.
-	kl.podManager.DeleteOrphanedMirrorPods()
+	kl.deleteOrphanedMirrorPods()
 
 	// Remove any cgroups in the hierarchy for pods that are no longer running.
 	if kl.cgroupsPerQOS {
-		kl.cleanupOrphanedPodCgroups(cgroupPods, activePods)
+		pcm := kl.containerManager.NewPodContainerManager()
+		kl.cleanupOrphanedPodCgroups(pcm, cgroupPods, activePods)
 	}
 
 	kl.backOff.GC()
 	return nil
 }
 
-// podKiller launches a goroutine to kill a pod received from the channel if
+// PodKiller handles requests for killing pods
+type PodKiller interface {
+	// KillPod receives a pod specifier representing the pod to kill
+	KillPod(pair *kubecontainer.PodPair)
+	// PerformPodKillingWork performs the actual pod killing work by calling the CRI
+	// It returns only after Close() has been called and all outstanding pod killing requests have been served
+	PerformPodKillingWork()
+	// Close stops the pod killer from accepting any further pod killing requests
+	Close()
+	// IsMirrorPodPendingTerminationByPodName checks whether the mirror pod for the given full pod name is pending termination
+	IsMirrorPodPendingTerminationByPodName(podFullname string) bool
+	// IsMirrorPodPendingTerminationByUID checks whether the mirror pod for the given uid is pending termination
+	IsMirrorPodPendingTerminationByUID(uid types.UID) bool
+	// MarkMirrorPodPendingTermination marks the mirror pod as entering its termination grace period
+	MarkMirrorPodPendingTermination(pod *v1.Pod)
+}
+
+// podKillerWithChannel is an implementation of PodKiller which receives pod killing requests via a channel
+type podKillerWithChannel struct {
+	// Channel for getting pods to kill.
+	podKillingCh chan *kubecontainer.PodPair
+	// lock protecting mirrorPodTerminationMap, shared by HandlePodCleanups and the pod killer
+	podKillingLock *sync.Mutex
+	// mirrorPodTerminationMap keeps track of the progress of mirror pod termination.
+	// The key is the UID of the pod and the value is the full name of the pod
+	mirrorPodTerminationMap map[string]string
+	// killPod is the func which invokes the runtime to kill the pod
+	killPod func(pod *v1.Pod, runningPod *kubecontainer.Pod, status *kubecontainer.PodStatus, gracePeriodOverride *int64) error
+}
+
+// NewPodKiller returns a functional PodKiller
+func NewPodKiller(kl *Kubelet) PodKiller {
+	podKiller := &podKillerWithChannel{
+		podKillingCh:            make(chan *kubecontainer.PodPair, podKillingChannelCapacity),
+		podKillingLock:          &sync.Mutex{},
+		mirrorPodTerminationMap: make(map[string]string),
+		killPod:                 kl.killPod,
+	}
+	return podKiller
+}
+
+// IsMirrorPodPendingTerminationByUID checks whether the mirror pod for the given uid is pending termination
+func (pk *podKillerWithChannel) IsMirrorPodPendingTerminationByUID(uid types.UID) bool {
+	pk.podKillingLock.Lock()
+	defer pk.podKillingLock.Unlock()
+	_, ok := pk.mirrorPodTerminationMap[string(uid)]
+	return ok
+}
+
+// IsMirrorPodPendingTerminationByPodName checks whether the mirror pod with the given full name is pending termination
+func (pk *podKillerWithChannel) IsMirrorPodPendingTerminationByPodName(podFullname string) bool {
+	pk.podKillingLock.Lock()
+	defer pk.podKillingLock.Unlock()
+	for _, name := range pk.mirrorPodTerminationMap {
+		if name == podFullname {
+			return true
+		}
+	}
+	return false
+}
+
+func (pk *podKillerWithChannel) markMirrorPodTerminated(uid string) {
+	pk.podKillingLock.Lock()
+	klog.V(4).Infof("marking mirror pod %q as terminated", uid)
+	delete(pk.mirrorPodTerminationMap, uid)
+	pk.podKillingLock.Unlock()
+}
+
+// MarkMirrorPodPendingTermination marks the pod as entering its termination grace period
+func (pk *podKillerWithChannel) MarkMirrorPodPendingTermination(pod *v1.Pod) {
+	fullname := kubecontainer.GetPodFullName(pod)
+	klog.V(3).Infof("marking pod pending termination %q", string(pod.UID))
+	pk.podKillingLock.Lock()
+	pk.mirrorPodTerminationMap[string(pod.UID)] = fullname
+	pk.podKillingLock.Unlock()
+}
+
+// Close closes the channel through which pod killing requests are delivered
+func (pk *podKillerWithChannel) Close() {
+	close(pk.podKillingCh)
+}
+
+// KillPod sends a pod killing request to the killer
+func (pk *podKillerWithChannel) KillPod(pair *kubecontainer.PodPair) {
+	pk.podKillingCh <- pair
+}
+
+// PerformPodKillingWork launches a goroutine to kill a pod received from the channel if
 // another goroutine isn't already in action.
-func (kl *Kubelet) podKiller() {
+func (pk *podKillerWithChannel) PerformPodKillingWork() {
 	killing := sets.NewString()
 	// guard for the killing set
 	lock := sync.Mutex{}
-	for podPair := range kl.podKillingCh {
+	for podPair := range pk.podKillingCh {
 		runningPod := podPair.RunningPod
 		apiPod := podPair.APIPod
 
@@ -1130,13 +1240,14 @@ func (kl *Kubelet) podKiller() {
 		if !exists {
 			go func(apiPod *v1.Pod, runningPod *kubecontainer.Pod) {
 				klog.V(2).Infof("Killing unwanted pod %q", runningPod.Name)
-				err := kl.killPod(apiPod, runningPod, nil, nil)
+				err := pk.killPod(apiPod, runningPod, nil, nil)
 				if err != nil {
 					klog.Errorf("Failed killing the pod %q: %v", runningPod.Name, err)
 				}
 				lock.Lock()
 				killing.Delete(string(runningPod.ID))
 				lock.Unlock()
+				pk.markMirrorPodTerminated(string(runningPod.ID))
 			}(apiPod, runningPod)
 		}
 	}
@@ -1721,13 +1832,12 @@ func (kl *Kubelet) GetPortForward(podName, podNamespace string, podUID types.UID
 
 // cleanupOrphanedPodCgroups removes cgroups that should no longer exist.
 // it reconciles the cached state of cgroupPods with the specified list of runningPods
-func (kl *Kubelet) cleanupOrphanedPodCgroups(cgroupPods map[types.UID]cm.CgroupName, activePods []*v1.Pod) {
+func (kl *Kubelet) cleanupOrphanedPodCgroups(pcm cm.PodContainerManager, cgroupPods map[types.UID]cm.CgroupName, activePods []*v1.Pod) {
 	// Add all running pods to the set that we want to preserve
 	podSet := sets.NewString()
 	for _, pod := range activePods {
 		podSet.Insert(string(pod.UID))
 	}
-	pcm := kl.containerManager.NewPodContainerManager()
 
 	// Iterate over all the found pods to verify if they should be running
 	for uid, val := range cgroupPods {
@@ -1736,6 +1846,11 @@ func (kl *Kubelet) cleanupOrphanedPodCgroups(cgroupPods map[types.UID]cm.CgroupN
 			continue
 		}
 
+		// if the pod is within its termination grace period, we shouldn't clean up the underlying cgroup
+		if kl.podKiller.IsMirrorPodPendingTerminationByUID(uid) {
+			klog.V(3).Infof("pod %q is pending termination", uid)
+			continue
+		}
 		// If volumes have not been unmounted/detached, do not delete the cgroup
 		// so any memory backed volumes don't have their charges propagated to the
 		// parent croup. If the volumes still exist, reduce the cpu shares for any
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index a744418f77c..a00e3e99477 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -279,7 +279,7 @@ func newTestKubeletWithImageList(
 	fakeClock := clock.NewFakeClock(time.Now())
 	kubelet.backOff = flowcontrol.NewBackOff(time.Second, time.Minute)
 	kubelet.backOff.Clock = fakeClock
-	kubelet.podKillingCh = make(chan *kubecontainer.PodPair, 20)
+	kubelet.podKiller = NewPodKiller(kubelet)
 	kubelet.resyncInterval = 10 * time.Second
 	kubelet.workQueue = queue.NewBasicWorkQueue(fakeClock)
 	// Relist period does not affect the tests.
diff --git a/pkg/kubelet/pod/pod_manager.go b/pkg/kubelet/pod/pod_manager.go
index 751aa5315f7..215a7a155d6 100644
--- a/pkg/kubelet/pod/pod_manager.go
+++ b/pkg/kubelet/pod/pod_manager.go
@@ -73,10 +73,8 @@ type Manager interface {
 	// this means deleting the mappings related to mirror pods. For non-
 	// mirror pods, this means deleting from indexes for all non-mirror pods.
 	DeletePod(pod *v1.Pod)
-	// DeleteOrphanedMirrorPods deletes all mirror pods which do not have
-	// associated static pods. This method sends deletion requests to the API
-	// server, but does NOT modify the internal pod storage in basicManager.
- DeleteOrphanedMirrorPods() + // GetOrphanedMirrorPodNames returns names of orphaned mirror pods + GetOrphanedMirrorPodNames() []string // TranslatePodUID returns the actual UID of a pod. If the UID belongs to // a mirror pod, returns the UID of its static pod. Otherwise, returns the // original UID. @@ -307,7 +305,7 @@ func (pm *basicManager) GetUIDTranslations() (podToMirror map[kubetypes.Resolved return podToMirror, mirrorToPod } -func (pm *basicManager) getOrphanedMirrorPodNames() []string { +func (pm *basicManager) GetOrphanedMirrorPodNames() []string { pm.lock.RLock() defer pm.lock.RUnlock() var podFullNames []string @@ -319,13 +317,6 @@ func (pm *basicManager) getOrphanedMirrorPodNames() []string { return podFullNames } -func (pm *basicManager) DeleteOrphanedMirrorPods() { - podFullNames := pm.getOrphanedMirrorPodNames() - for _, podFullName := range podFullNames { - pm.MirrorClient.DeleteMirrorPod(podFullName, nil) - } -} - func (pm *basicManager) IsMirrorPodOf(mirrorPod, pod *v1.Pod) bool { // Check name and namespace first. if pod.Name != mirrorPod.Name || pod.Namespace != mirrorPod.Namespace { diff --git a/pkg/kubelet/pod/pod_manager_test.go b/pkg/kubelet/pod/pod_manager_test.go index 557a5927f3e..d5cff20b4f4 100644 --- a/pkg/kubelet/pod/pod_manager_test.go +++ b/pkg/kubelet/pod/pod_manager_test.go @@ -158,7 +158,7 @@ func TestDeletePods(t *testing.T) { t.Fatalf("Run DeletePod() error, expected %d pods, got %d pods; ", len(expectedPods)-1, len(actualPods)) } - orphanedMirrorPodNames := podManager.getOrphanedMirrorPodNames() + orphanedMirrorPodNames := podManager.GetOrphanedMirrorPodNames() expectedOrphanedMirrorPodNameNum := 1 if len(orphanedMirrorPodNames) != expectedOrphanedMirrorPodNameNum { t.Fatalf("Run getOrphanedMirrorPodNames() error, expected %d orphaned mirror pods, got %d orphaned mirror pods; ", expectedOrphanedMirrorPodNameNum, len(orphanedMirrorPodNames)) diff --git a/pkg/kubelet/pod/testing/mock_manager.go b/pkg/kubelet/pod/testing/mock_manager.go index 32051659410..5bfffd864a4 100644 --- a/pkg/kubelet/pod/testing/mock_manager.go +++ b/pkg/kubelet/pod/testing/mock_manager.go @@ -17,7 +17,9 @@ limitations under the License. 
// Code generated by mockery v1.0.0 package testing -import kubelettypes "k8s.io/kubernetes/pkg/kubelet/types" +import ( + kubelettypes "k8s.io/kubernetes/pkg/kubelet/types" +) import mock "github.com/stretchr/testify/mock" import types "k8s.io/apimachinery/pkg/types" @@ -61,9 +63,18 @@ func (_m *MockManager) DeleteMirrorPod(podFullName string, _ *types.UID) (bool, return false, r0 } -// DeleteOrphanedMirrorPods provides a mock function with given fields: -func (_m *MockManager) DeleteOrphanedMirrorPods() { - _m.Called() +func (_m *MockManager) GetOrphanedMirrorPodNames() []string { + ret := _m.Called() + + var r0 []string + if rf, ok := ret.Get(0).(func() []string); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]string) + } + } + return r0 } // DeletePod provides a mock function with given fields: _a0 diff --git a/test/e2e_node/BUILD b/test/e2e_node/BUILD index 39d49cea71f..e2aa5b90bd9 100644 --- a/test/e2e_node/BUILD +++ b/test/e2e_node/BUILD @@ -127,6 +127,7 @@ go_test( "hugepages_test.go", "image_id_test.go", "log_path_test.go", + "mirror_pod_grace_period_test.go", "mirror_pod_test.go", "node_container_manager_test.go", "node_perf_test.go", diff --git a/test/e2e_node/mirror_pod_grace_period_test.go b/test/e2e_node/mirror_pod_grace_period_test.go new file mode 100644 index 00000000000..6dbd56d8a38 --- /dev/null +++ b/test/e2e_node/mirror_pod_grace_period_test.go @@ -0,0 +1,126 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package e2enode + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/onsi/ginkgo" + "github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/kubernetes/test/e2e/framework" +) + +var _ = framework.KubeDescribe("MirrorPodWithGracePeriod", func() { + f := framework.NewDefaultFramework("mirror-pod-with-grace-period") + ginkgo.Context("when create a mirror pod ", func() { + var ns, podPath, staticPodName, mirrorPodName string + ginkgo.BeforeEach(func() { + ns = f.Namespace.Name + staticPodName = "graceful-pod-" + string(uuid.NewUUID()) + mirrorPodName = staticPodName + "-" + framework.TestContext.NodeName + + podPath = framework.TestContext.KubeletConfig.StaticPodPath + + ginkgo.By("create the static pod") + err := createStaticPodWithGracePeriod(podPath, staticPodName, ns) + framework.ExpectNoError(err) + + ginkgo.By("wait for the mirror pod to be running") + gomega.Eventually(func() error { + return checkMirrorPodRunning(f.ClientSet, mirrorPodName, ns) + }, 2*time.Minute, time.Second*4).Should(gomega.BeNil()) + }) + + ginkgo.It("mirror pod termination should satisfy grace period when static pod is deleted [NodeConformance]", func() { + ginkgo.By("get mirror pod uid") + _, err := f.ClientSet.CoreV1().Pods(ns).Get(context.TODO(), mirrorPodName, metav1.GetOptions{}) + framework.ExpectNoError(err) + + start := time.Now() + + ginkgo.By("delete the static pod") + file := staticPodPath(podPath, staticPodName, ns) + framework.Logf("deleting static pod manifest %q", file) + err = os.Remove(file) + framework.ExpectNoError(err) + + for { + if time.Now().Sub(start).Seconds() > 19 { + break + } + pod, err := f.ClientSet.CoreV1().Pods(ns).Get(context.TODO(), mirrorPodName, metav1.GetOptions{}) + framework.ExpectNoError(err) + if pod.Status.Phase != v1.PodRunning { + framework.Failf("expected the mirror pod %q to be running, got %q", mirrorPodName, pod.Status.Phase) + } + // have some pause in between the API server queries to avoid throttling + time.Sleep(time.Duration(200) * time.Millisecond) + } + }) + + ginkgo.AfterEach(func() { + ginkgo.By("wait for the mirror pod to disappear") + gomega.Eventually(func() error { + return checkMirrorPodDisappear(f.ClientSet, mirrorPodName, ns) + }, time.Second*19, time.Second).Should(gomega.BeNil()) + }) + }) +}) + +func createStaticPodWithGracePeriod(dir, name, namespace string) error { + template := ` +apiVersion: v1 +kind: Pod +metadata: + name: %s + namespace: %s +spec: + terminationGracePeriodSeconds: 20 + containers: + - name: m-test + image: busybox:1.31.1 + command: + - /bin/sh + args: + - '-c' + - | + _term() { + echo "Caught SIGTERM signal!" + sleep 100 + } + trap _term SIGTERM + sleep 1000 +` + file := staticPodPath(dir, name, namespace) + podYaml := fmt.Sprintf(template, name, namespace) + + f, err := os.OpenFile(file, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0666) + if err != nil { + return err + } + defer f.Close() + + _, err = f.WriteString(podYaml) + framework.Logf("has written %v", file) + return err +}