Merge pull request #92442 from tedyu/grace-period-with-map

Respect grace period when removing mirror pod
Kubernetes Prow Robot 2020-07-10 17:49:23 -07:00 committed by GitHub
commit 93e76f5081
8 changed files with 279 additions and 37 deletions


@@ -134,11 +134,6 @@ const (
// MaxContainerBackOff is the max backoff period, exported for the e2e test
MaxContainerBackOff = 300 * time.Second
// Capacity of the channel for storing pods to kill. A small number should
// suffice because a goroutine is dedicated to check the channel and does
// not block on anything else.
podKillingChannelCapacity = 50
// Period for performing global cleanup tasks.
housekeepingPeriod = time.Second * 2
@@ -748,7 +743,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity)
klet.podKiller = NewPodKiller(klet)
// setup eviction manager
evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.podManager.GetMirrorPodByPod, klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock)
@@ -1036,8 +1031,8 @@ type Kubelet struct {
// Container restart Backoff
backOff *flowcontrol.Backoff
// Channel for sending pods to kill.
podKillingCh chan *kubecontainer.PodPair
// Pod killer handles pods to be killed
podKiller PodKiller
// Information about the ports which are opened by daemons on Node running this Kubelet server.
daemonEndpoints *v1.NodeDaemonEndpoints
@@ -1346,7 +1341,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
// Start a goroutine responsible for killing pods (that are not properly
// handled by pod workers).
go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)
go wait.Until(kl.podKiller.PerformPodKillingWork, 1*time.Second, wait.NeverStop)
// Start component sync loops.
kl.statusManager.Start()
@@ -1671,7 +1666,7 @@ func (kl *Kubelet) deletePod(pod *v1.Pod) error {
}
podPair := kubecontainer.PodPair{APIPod: pod, RunningPod: &runningPod}
kl.podKillingCh <- &podPair
kl.podKiller.KillPod(&podPair)
// TODO: delete the mirror pod here?
// We leave the volume/directory cleanup to the periodic cleanup routine.
@@ -2006,6 +2001,9 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
kl.handleMirrorPod(pod, start)
continue
}
if _, ok := kl.podManager.GetMirrorPodByPod(pod); ok {
kl.podKiller.MarkMirrorPodPendingTermination(pod)
}
// Deletion is allowed to fail because the periodic cleanup routine
// will trigger deletion again.
if err := kl.deletePod(pod); err != nil {
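The ordering here is what makes the grace period stick: the pod is marked before deletePod enqueues the kill, so a housekeeping pass racing with the kill cannot remove the mirror pod early. A condensed view of the sequence, using the identifiers from this diff:

    kl.podKiller.MarkMirrorPodPendingTermination(pod) // 1. record UID -> full name
    kl.deletePod(pod)                                 // 2. enqueue the PodPair for the killer
    // 3. PerformPodKillingWork: pk.killPod(...) blocks for the grace period,
    //    then pk.markMirrorPodTerminated(uid) clears the entry.
    // 4. deleteOrphanedMirrorPods deletes the API object only after step 3.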


@@ -71,6 +71,11 @@ import (
const (
managedHostsHeader = "# Kubernetes-managed hosts file.\n"
managedHostsHeaderWithHostNetwork = "# Kubernetes-managed hosts file (host network).\n"
// Capacity of the channel for storing pods to kill. A small number should
// suffice because a goroutine is dedicated to check the channel and does
// not block on anything else.
podKillingChannelCapacity = 50
)
// Get a list of pods that have data directories.
@@ -1020,6 +1025,23 @@ func (kl *Kubelet) removeOrphanedPodStatuses(pods []*v1.Pod, mirrorPods []*v1.Po
kl.statusManager.RemoveOrphanedStatuses(podUIDs)
}
// deleteOrphanedMirrorPods checks whether the pod killer has finished killing each orphaned mirror pod.
// If the kill has completed, podManager.DeleteMirrorPod() is called to delete the mirror pod
// from the API server
func (kl *Kubelet) deleteOrphanedMirrorPods() {
podFullNames := kl.podManager.GetOrphanedMirrorPodNames()
for _, podFullname := range podFullNames {
if !kl.podKiller.IsMirrorPodPendingTerminationByPodName(podFullname) {
_, err := kl.podManager.DeleteMirrorPod(podFullname, nil)
if err != nil {
klog.Errorf("encountered error when deleting mirror pod %q : %v", podFullname, err)
} else {
klog.V(3).Infof("deleted pod %q", podFullname)
}
}
}
}
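The map-based handshake above is easiest to see end to end. A self-contained simulation of the three phases (toy types standing in for kubelet internals, not code from this PR):

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // tracker mimics mirrorPodTerminationMap: UID -> full pod name, guarded by a mutex.
    type tracker struct {
        mu      sync.Mutex
        pending map[string]string
    }

    func (t *tracker) mark(uid, name string) {
        t.mu.Lock()
        defer t.mu.Unlock()
        t.pending[uid] = name
    }

    func (t *tracker) done(uid string) {
        t.mu.Lock()
        defer t.mu.Unlock()
        delete(t.pending, uid)
    }

    func (t *tracker) isPending(name string) bool {
        t.mu.Lock()
        defer t.mu.Unlock()
        for _, n := range t.pending {
            if n == name {
                return true
            }
        }
        return false
    }

    func main() {
        trk := &tracker{pending: map[string]string{}}

        // HandlePodRemoves: mark before the kill is enqueued.
        trk.mark("uid-1", "graceful-pod_default")

        // PerformPodKillingWork: the kill honors the grace period, then unmarks.
        go func() {
            time.Sleep(2 * time.Second) // stands in for terminationGracePeriodSeconds
            trk.done("uid-1")
        }()

        // HandlePodCleanups -> deleteOrphanedMirrorPods: skip while pending.
        for {
            if trk.isPending("graceful-pod_default") {
                fmt.Println("mirror pod still terminating; skipping API deletion")
                time.Sleep(500 * time.Millisecond)
                continue
            }
            fmt.Println("grace period complete; mirror pod deleted from the API server")
            return
        }
    }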
// HandlePodCleanups performs a series of cleanup work, including terminating
// pod workers, killing unwanted pods, and removing orphaned volumes/pod
// directories.
@@ -1071,7 +1093,7 @@ func (kl *Kubelet) HandlePodCleanups() error {
}
for _, pod := range runningPods {
if _, found := desiredPods[pod.ID]; !found {
kl.podKillingCh <- &kubecontainer.PodPair{APIPod: nil, RunningPod: pod}
kl.podKiller.KillPod(&kubecontainer.PodPair{APIPod: nil, RunningPod: pod})
}
}
@@ -1099,24 +1121,112 @@ func (kl *Kubelet) HandlePodCleanups() error {
}
// Remove any orphaned mirror pods.
kl.podManager.DeleteOrphanedMirrorPods()
kl.deleteOrphanedMirrorPods()
// Remove any cgroups in the hierarchy for pods that are no longer running.
if kl.cgroupsPerQOS {
kl.cleanupOrphanedPodCgroups(cgroupPods, activePods)
pcm := kl.containerManager.NewPodContainerManager()
kl.cleanupOrphanedPodCgroups(pcm, cgroupPods, activePods)
}
kl.backOff.GC()
return nil
}
// podKiller launches a goroutine to kill a pod received from the channel if
// PodKiller handles requests for killing pods
type PodKiller interface {
// KillPod receives a pod specifier representing the pod to kill
KillPod(pair *kubecontainer.PodPair)
// PerformPodKillingWork performs the actual pod killing work by calling the container runtime.
// It returns after Close() has been called and all outstanding pod killing requests have been served
PerformPodKillingWork()
// After Close() is called, the pod killer won't accept any more pod killing requests
Close()
// IsMirrorPodPendingTerminationByPodName checks whether the mirror pod for the given full pod name is pending termination
IsMirrorPodPendingTerminationByPodName(podFullname string) bool
// IsMirrorPodPendingTerminationByUID checks whether the mirror pod for the given UID is pending termination
IsMirrorPodPendingTerminationByUID(uid types.UID) bool
// MarkMirrorPodPendingTermination marks the mirror pod as having entered its termination grace period
MarkMirrorPodPendingTermination(pod *v1.Pod)
}
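A minimal usage sketch of this contract, mirroring how NewMainKubelet and Run wire it up (illustrative only):

    pk := NewPodKiller(klet)      // wraps the kubelet's killPod func
    go pk.PerformPodKillingWork() // single long-lived consumer goroutine
    pk.KillPod(&kubecontainer.PodPair{APIPod: pod, RunningPod: &runningPod})
    // Shut down only after every producer has stopped calling KillPod:
    pk.Close() // a send on the closed channel would panic, so order matters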
// podKillerWithChannel is an implementation of PodKiller that receives pod killing requests over a channel
type podKillerWithChannel struct {
// Channel for getting pods to kill.
podKillingCh chan *kubecontainer.PodPair
// lock guarding mirrorPodTerminationMap, shared between HandlePodCleanups and the pod killer
podKillingLock *sync.Mutex
// mirrorPodTerminationMap keeps track of the progress of mirror pod termination
// The key is the UID of the pod and the value is the full name of the pod
mirrorPodTerminationMap map[string]string
// killPod is the func that invokes the runtime to kill the pod
killPod func(pod *v1.Pod, runningPod *kubecontainer.Pod, status *kubecontainer.PodStatus, gracePeriodOverride *int64) error
}
// NewPodKiller returns a functional PodKiller
func NewPodKiller(kl *Kubelet) PodKiller {
podKiller := &podKillerWithChannel{
podKillingCh: make(chan *kubecontainer.PodPair, podKillingChannelCapacity),
podKillingLock: &sync.Mutex{},
mirrorPodTerminationMap: make(map[string]string),
killPod: kl.killPod,
}
return podKiller
}
// IsMirrorPodPendingTerminationByUID checks whether the mirror pod for the given UID is pending termination
func (pk *podKillerWithChannel) IsMirrorPodPendingTerminationByUID(uid types.UID) bool {
pk.podKillingLock.Lock()
defer pk.podKillingLock.Unlock()
_, ok := pk.mirrorPodTerminationMap[string(uid)]
return ok
}
// IsMirrorPodPendingTerminationByPodName checks whether the pod with the given full name is within its termination grace period
func (pk *podKillerWithChannel) IsMirrorPodPendingTerminationByPodName(podFullname string) bool {
pk.podKillingLock.Lock()
defer pk.podKillingLock.Unlock()
for _, name := range pk.mirrorPodTerminationMap {
if name == podFullname {
return true
}
}
return false
}
// markMirrorPodTerminated removes the pod's entry from mirrorPodTerminationMap
// once the kill has completed
func (pk *podKillerWithChannel) markMirrorPodTerminated(uid string) {
pk.podKillingLock.Lock()
klog.V(4).Infof("marking pod termination %q", uid)
delete(pk.mirrorPodTerminationMap, uid)
pk.podKillingLock.Unlock()
}
// MarkMirrorPodPendingTermination marks the pod as having entered its termination grace period
func (pk *podKillerWithChannel) MarkMirrorPodPendingTermination(pod *v1.Pod) {
fullname := kubecontainer.GetPodFullName(pod)
klog.V(3).Infof("marking pod pending termination %q", string(pod.UID))
pk.podKillingLock.Lock()
pk.mirrorPodTerminationMap[string(pod.UID)] = fullname
pk.podKillingLock.Unlock()
}
// Close closes the channel through which requests are delivered
func (pk *podKillerWithChannel) Close() {
close(pk.podKillingCh)
}
// KillPod sends a pod killing request to the killer
func (pk *podKillerWithChannel) KillPod(pair *kubecontainer.PodPair) {
pk.podKillingCh <- pair
}
// PerformPodKillingWork launches a goroutine to kill a pod received from the channel if
// another goroutine isn't already in action.
func (kl *Kubelet) podKiller() {
func (pk *podKillerWithChannel) PerformPodKillingWork() {
killing := sets.NewString()
// guard for the killing set
lock := sync.Mutex{}
for podPair := range kl.podKillingCh {
for podPair := range pk.podKillingCh {
runningPod := podPair.RunningPod
apiPod := podPair.APIPod
@@ -1130,13 +1240,14 @@ func (kl *Kubelet) podKiller() {
if !exists {
go func(apiPod *v1.Pod, runningPod *kubecontainer.Pod) {
klog.V(2).Infof("Killing unwanted pod %q", runningPod.Name)
err := kl.killPod(apiPod, runningPod, nil, nil)
err := pk.killPod(apiPod, runningPod, nil, nil)
if err != nil {
klog.Errorf("Failed killing the pod %q: %v", runningPod.Name, err)
}
lock.Lock()
killing.Delete(string(runningPod.ID))
lock.Unlock()
pk.markMirrorPodTerminated(string(runningPod.ID))
}(apiPod, runningPod)
}
}
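The killing set guarded by a mutex is a general "at most one in-flight worker per key" pattern. A self-contained sketch of it outside kubelet types (hypothetical names, for illustration):

    package main

    import (
        "fmt"
        "sync"
    )

    // dedupConsumer drains requests and starts at most one goroutine per key;
    // duplicates for a key still being handled are dropped, like the killing
    // set above.
    func dedupConsumer(requests <-chan string, handle func(string), wg *sync.WaitGroup) {
        inFlight := map[string]bool{}
        var mu sync.Mutex
        for key := range requests {
            mu.Lock()
            busy := inFlight[key]
            if !busy {
                inFlight[key] = true
            }
            mu.Unlock()
            if busy {
                continue // same pod already being killed; drop the duplicate
            }
            wg.Add(1)
            go func(k string) {
                defer wg.Done()
                handle(k)
                mu.Lock()
                delete(inFlight, k)
                mu.Unlock()
            }(key)
        }
    }

    func main() {
        reqs := make(chan string, 3)
        var wg sync.WaitGroup
        reqs <- "pod-a"
        reqs <- "pod-a" // dropped if the first kill is still in flight
        reqs <- "pod-b"
        close(reqs)
        dedupConsumer(reqs, func(k string) { fmt.Println("killing", k) }, &wg)
        wg.Wait()
    }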
@@ -1721,13 +1832,12 @@ func (kl *Kubelet) GetPortForward(podName, podNamespace string, podUID types.UID
// cleanupOrphanedPodCgroups removes cgroups that should no longer exist.
// it reconciles the cached state of cgroupPods with the specified list of runningPods
func (kl *Kubelet) cleanupOrphanedPodCgroups(cgroupPods map[types.UID]cm.CgroupName, activePods []*v1.Pod) {
func (kl *Kubelet) cleanupOrphanedPodCgroups(pcm cm.PodContainerManager, cgroupPods map[types.UID]cm.CgroupName, activePods []*v1.Pod) {
// Add all running pods to the set that we want to preserve
podSet := sets.NewString()
for _, pod := range activePods {
podSet.Insert(string(pod.UID))
}
pcm := kl.containerManager.NewPodContainerManager()
// Iterate over all the found pods to verify if they should be running
for uid, val := range cgroupPods {
@@ -1736,6 +1846,11 @@ func (kl *Kubelet) cleanupOrphanedPodCgroups(cgroupPods map[types.UID]cm.CgroupN
continue
}
// if the pod is within its termination grace period, we shouldn't clean up the underlying cgroup
if kl.podKiller.IsMirrorPodPendingTerminationByUID(uid) {
klog.V(3).Infof("pod %q is pending termination", uid)
continue
}
// If volumes have not been unmounted/detached, do not delete the cgroup
// so any memory backed volumes don't have their charges propagated to the
// parent cgroup. If the volumes still exist, reduce the cpu shares for any
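Two changes land in this function: the grace-period guard, and PodContainerManager is now injected as a parameter, so tests can pass a fake. The guard in isolation (condensed from the diff, not compilable on its own):

    for uid := range cgroupPods {
        if podSet.Has(string(uid)) {
            continue // pod still active: keep its cgroup
        }
        if kl.podKiller.IsMirrorPodPendingTerminationByUID(uid) {
            continue // kill in flight: removing the cgroup now would cut
            // the grace period short
        }
        // only past both checks is it safe to shrink and destroy the cgroup
    }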


@@ -279,7 +279,7 @@ func newTestKubeletWithImageList(
fakeClock := clock.NewFakeClock(time.Now())
kubelet.backOff = flowcontrol.NewBackOff(time.Second, time.Minute)
kubelet.backOff.Clock = fakeClock
kubelet.podKillingCh = make(chan *kubecontainer.PodPair, 20)
kubelet.podKiller = NewPodKiller(kubelet)
kubelet.resyncInterval = 10 * time.Second
kubelet.workQueue = queue.NewBasicWorkQueue(fakeClock)
// Relist period does not affect the tests.


@@ -73,10 +73,8 @@ type Manager interface {
// this means deleting the mappings related to mirror pods. For non-
// mirror pods, this means deleting from indexes for all non-mirror pods.
DeletePod(pod *v1.Pod)
// DeleteOrphanedMirrorPods deletes all mirror pods which do not have
// associated static pods. This method sends deletion requests to the API
// server, but does NOT modify the internal pod storage in basicManager.
DeleteOrphanedMirrorPods()
// GetOrphanedMirrorPodNames returns the names of orphaned mirror pods
GetOrphanedMirrorPodNames() []string
// TranslatePodUID returns the actual UID of a pod. If the UID belongs to
// a mirror pod, returns the UID of its static pod. Otherwise, returns the
// original UID.
@@ -307,7 +305,7 @@ func (pm *basicManager) GetUIDTranslations() (podToMirror map[kubetypes.Resolved
return podToMirror, mirrorToPod
}
func (pm *basicManager) getOrphanedMirrorPodNames() []string {
func (pm *basicManager) GetOrphanedMirrorPodNames() []string {
pm.lock.RLock()
defer pm.lock.RUnlock()
var podFullNames []string
@@ -319,13 +317,6 @@ func (pm *basicManager) getOrphanedMirrorPodNames() []string {
return podFullNames
}
func (pm *basicManager) DeleteOrphanedMirrorPods() {
podFullNames := pm.getOrphanedMirrorPodNames()
for _, podFullName := range podFullNames {
pm.MirrorClient.DeleteMirrorPod(podFullName, nil)
}
}
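The effect of this refactor: deletion policy moves out of the pod manager and into the kubelet, which can consult the pod killer first. Side by side (illustrative):

    // Before: the manager deleted unconditionally.
    pm.DeleteOrphanedMirrorPods()

    // After: the kubelet owns the policy (see deleteOrphanedMirrorPods
    // earlier in this diff).
    for _, name := range pm.GetOrphanedMirrorPodNames() {
        if !kl.podKiller.IsMirrorPodPendingTerminationByPodName(name) {
            pm.DeleteMirrorPod(name, nil)
        }
    }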
func (pm *basicManager) IsMirrorPodOf(mirrorPod, pod *v1.Pod) bool {
// Check name and namespace first.
if pod.Name != mirrorPod.Name || pod.Namespace != mirrorPod.Namespace {


@@ -158,7 +158,7 @@ func TestDeletePods(t *testing.T) {
t.Fatalf("Run DeletePod() error, expected %d pods, got %d pods; ", len(expectedPods)-1, len(actualPods))
}
orphanedMirrorPodNames := podManager.getOrphanedMirrorPodNames()
orphanedMirrorPodNames := podManager.GetOrphanedMirrorPodNames()
expectedOrphanedMirrorPodNameNum := 1
if len(orphanedMirrorPodNames) != expectedOrphanedMirrorPodNameNum {
t.Fatalf("Run getOrphanedMirrorPodNames() error, expected %d orphaned mirror pods, got %d orphaned mirror pods; ", expectedOrphanedMirrorPodNameNum, len(orphanedMirrorPodNames))


@@ -17,7 +17,9 @@ limitations under the License.
// Code generated by mockery v1.0.0
package testing
import kubelettypes "k8s.io/kubernetes/pkg/kubelet/types"
import (
kubelettypes "k8s.io/kubernetes/pkg/kubelet/types"
)
import mock "github.com/stretchr/testify/mock"
import types "k8s.io/apimachinery/pkg/types"
@@ -61,9 +63,18 @@ func (_m *MockManager) DeleteMirrorPod(podFullName string, _ *types.UID) (bool,
return false, r0
}
// DeleteOrphanedMirrorPods provides a mock function with given fields:
func (_m *MockManager) DeleteOrphanedMirrorPods() {
_m.Called()
func (_m *MockManager) GetOrphanedMirrorPodNames() []string {
ret := _m.Called()
var r0 []string
if rf, ok := ret.Get(0).(func() []string); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]string)
}
}
return r0
}
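The mock is regenerated by mockery; in a test it is driven through the usual testify expectation API. A sketch (assuming the standard testing import; the pod name is made up):

    func TestOrphanedNames(t *testing.T) {
        m := new(MockManager)
        m.On("GetOrphanedMirrorPodNames").Return([]string{"web_default"})

        names := m.GetOrphanedMirrorPodNames() // -> []string{"web_default"}
        if len(names) != 1 {
            t.Fatalf("expected 1 orphaned mirror pod name, got %d", len(names))
        }
        m.AssertExpectations(t) // verifies the expected call happened
    }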
// DeletePod provides a mock function with given fields: _a0


@@ -127,6 +127,7 @@ go_test(
"hugepages_test.go",
"image_id_test.go",
"log_path_test.go",
"mirror_pod_grace_period_test.go",
"mirror_pod_test.go",
"node_container_manager_test.go",
"node_perf_test.go",


@@ -0,0 +1,126 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2enode
import (
"context"
"fmt"
"os"
"time"
"github.com/onsi/ginkgo"
"github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/kubernetes/test/e2e/framework"
)
var _ = framework.KubeDescribe("MirrorPodWithGracePeriod", func() {
f := framework.NewDefaultFramework("mirror-pod-with-grace-period")
ginkgo.Context("when create a mirror pod ", func() {
var ns, podPath, staticPodName, mirrorPodName string
ginkgo.BeforeEach(func() {
ns = f.Namespace.Name
staticPodName = "graceful-pod-" + string(uuid.NewUUID())
mirrorPodName = staticPodName + "-" + framework.TestContext.NodeName
podPath = framework.TestContext.KubeletConfig.StaticPodPath
ginkgo.By("create the static pod")
err := createStaticPodWithGracePeriod(podPath, staticPodName, ns)
framework.ExpectNoError(err)
ginkgo.By("wait for the mirror pod to be running")
gomega.Eventually(func() error {
return checkMirrorPodRunning(f.ClientSet, mirrorPodName, ns)
}, 2*time.Minute, time.Second*4).Should(gomega.BeNil())
})
ginkgo.It("mirror pod termination should satisfy grace period when static pod is deleted [NodeConformance]", func() {
ginkgo.By("get mirror pod uid")
_, err := f.ClientSet.CoreV1().Pods(ns).Get(context.TODO(), mirrorPodName, metav1.GetOptions{})
framework.ExpectNoError(err)
start := time.Now()
ginkgo.By("delete the static pod")
file := staticPodPath(podPath, staticPodName, ns)
framework.Logf("deleting static pod manifest %q", file)
err = os.Remove(file)
framework.ExpectNoError(err)
for {
// poll for just under the 20-second grace period; the mirror pod must
// remain Running for its entirety
if time.Now().Sub(start).Seconds() > 19 {
break
}
pod, err := f.ClientSet.CoreV1().Pods(ns).Get(context.TODO(), mirrorPodName, metav1.GetOptions{})
framework.ExpectNoError(err)
if pod.Status.Phase != v1.PodRunning {
framework.Failf("expected the mirror pod %q to be running, got %q", mirrorPodName, pod.Status.Phase)
}
// pause briefly between API server queries to avoid client-side throttling
time.Sleep(time.Duration(200) * time.Millisecond)
}
})
ginkgo.AfterEach(func() {
ginkgo.By("wait for the mirror pod to disappear")
gomega.Eventually(func() error {
return checkMirrorPodDisappear(f.ClientSet, mirrorPodName, ns)
}, time.Second*19, time.Second).Should(gomega.BeNil())
})
})
})
func createStaticPodWithGracePeriod(dir, name, namespace string) error {
template := `
apiVersion: v1
kind: Pod
metadata:
name: %s
namespace: %s
spec:
terminationGracePeriodSeconds: 20
containers:
- name: m-test
image: busybox:1.31.1
command:
- /bin/sh
args:
- '-c'
- |
_term() {
echo "Caught SIGTERM signal!"
sleep 100
}
trap _term SIGTERM
sleep 1000
`
file := staticPodPath(dir, name, namespace)
podYaml := fmt.Sprintf(template, name, namespace)
f, err := os.OpenFile(file, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0666)
if err != nil {
return err
}
defer f.Close()
_, err = f.WriteString(podYaml)
framework.Logf("has written %v", file)
return err
}
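staticPodPath, checkMirrorPodRunning, and checkMirrorPodDisappear are pre-existing helpers in the e2e_node package rather than part of this diff. A sketch of what the path helper presumably does (an assumption, assuming path/filepath is imported):

    // Assumption: mirrors the manifest-naming convention used by the
    // existing mirror pod tests; not part of this PR.
    func staticPodPath(dir, name, namespace string) string {
        return filepath.Join(dir, namespace+"-"+name+".yaml")
    }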