Merge pull request #50350 from dashpole/eviction_container_deletion

Automatic merge from submit-queue (batch tested with PRs 51186, 50350, 51751, 51645, 51837)

Wait for container cleanup before deletion

We should wait to delete pod API objects until the pod's containers have been cleaned up. See issue: #50268 for background.

This changes the kubelet container gc, which deletes containers belonging to pods considered "deleted".
It adds two conditions under which a pod is considered "deleted", allowing containers to be deleted:
Pods where deletionTimestamp is set, and containers are not running
Pods that are evicted

This PR also changes the function PodResourcesAreReclaimed by making it return false if containers still exist.
The eviction manager will wait for containers of previous evicted pod to be deleted before evicting another pod.
The status manager will wait for containers to be deleted before removing the pod API object.

/assign @vishh
This commit is contained in:
Kubernetes Submit Queue 2017-09-05 17:30:03 -07:00 committed by GitHub
commit 78c820803c
10 changed files with 83 additions and 75 deletions

View File

@ -629,7 +629,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
kubeCfg.SeccompProfileRoot,
containerRefManager,
machineInfo,
klet.podManager,
klet,
kubeDeps.OSInterface,
klet,
httpClient,
@ -667,7 +667,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet,
kubeDeps.Recorder,
containerRefManager,
klet.podManager,
klet,
klet.livenessManager,
httpClient,
klet.networkPlugin,

View File

@ -55,6 +55,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/envvars"
"k8s.io/kubernetes/pkg/kubelet/eviction"
"k8s.io/kubernetes/pkg/kubelet/images"
"k8s.io/kubernetes/pkg/kubelet/server/portforward"
remotecommandserver "k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
@ -819,6 +820,22 @@ func (kl *Kubelet) podIsTerminated(pod *v1.Pod) bool {
return status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(status.ContainerStatuses))
}
// IsPodDeleted returns true if the pod is deleted. For the pod to be deleted, either:
// 1. The pod object is deleted
// 2. The pod's status is evicted
// 3. The pod's deletion timestamp is set, and containers are not running
func (kl *Kubelet) IsPodDeleted(uid types.UID) bool {
pod, podFound := kl.podManager.GetPodByUID(uid)
if !podFound {
return true
}
status, statusFound := kl.statusManager.GetPodStatus(pod.UID)
if !statusFound {
status = pod.Status
}
return eviction.PodIsEvicted(status) || (pod.DeletionTimestamp != nil && notRunning(status.ContainerStatuses))
}
// PodResourcesAreReclaimed returns true if all required node-level resources that a pod was consuming have
// been reclaimed by the kubelet. Reclaiming resources is a prerequisite to deleting a pod from the API server.
func (kl *Kubelet) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool {
@ -827,6 +844,16 @@ func (kl *Kubelet) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bo
glog.V(3).Infof("Pod %q is terminated, but some containers are still running", format.Pod(pod))
return false
}
// pod's containers should be deleted
runtimeStatus, err := kl.podCache.Get(pod.UID)
if err != nil {
glog.V(3).Infof("Pod %q is terminated, Error getting runtimeStatus from the podCache: %s", format.Pod(pod), err)
return false
}
if len(runtimeStatus.ContainerStatuses) > 0 {
glog.V(3).Infof("Pod %q is terminated, but some containers have not been cleaned up: %+v", format.Pod(pod), runtimeStatus.ContainerStatuses)
return false
}
if kl.podVolumesExist(pod.UID) && !kl.kubeletConfiguration.KeepTerminatedPodVolumes {
// We shouldnt delete pods whose volumes have not been cleaned up if we are not keeping terminated pod volumes
glog.V(3).Infof("Pod %q is terminated, but some volumes have not been cleaned up", format.Pod(pod))

View File

@ -21,7 +21,6 @@ import (
"time"
cadvisorapi "github.com/google/cadvisor/info/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
@ -44,17 +43,19 @@ func (f *fakeHTTP) Get(url string) (*http.Response, error) {
return nil, f.err
}
type fakePodGetter struct {
pods map[types.UID]*v1.Pod
type fakePodDeletionProvider struct {
pods map[types.UID]struct{}
}
func newFakePodGetter() *fakePodGetter {
return &fakePodGetter{make(map[types.UID]*v1.Pod)}
func newFakePodDeletionProvider() *fakePodDeletionProvider {
return &fakePodDeletionProvider{
pods: make(map[types.UID]struct{}),
}
}
func (f *fakePodGetter) GetPodByUID(uid types.UID) (*v1.Pod, bool) {
pod, found := f.pods[uid]
return pod, found
func (f *fakePodDeletionProvider) IsPodDeleted(uid types.UID) bool {
_, found := f.pods[uid]
return !found
}
func NewFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageService internalapi.ImageManagerService, machineInfo *cadvisorapi.MachineInfo, osInterface kubecontainer.OSInterface, runtimeHelper kubecontainer.RuntimeHelper, keyring credentialprovider.DockerKeyring) (*kubeGenericRuntimeManager, error) {
@ -78,7 +79,7 @@ func NewFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageS
return nil, err
}
kubeRuntimeManager.containerGC = NewContainerGC(runtimeService, newFakePodGetter(), kubeRuntimeManager)
kubeRuntimeManager.containerGC = NewContainerGC(runtimeService, newFakePodDeletionProvider(), kubeRuntimeManager)
kubeRuntimeManager.runtimeName = typedVersion.RuntimeName
kubeRuntimeManager.imagePuller = images.NewImageManager(
kubecontainer.FilterEventRecorder(recorder),

View File

@ -346,13 +346,12 @@ func (m *kubeGenericRuntimeManager) makeMounts(opts *kubecontainer.RunContainerO
func (m *kubeGenericRuntimeManager) getKubeletContainers(allContainers bool) ([]*runtimeapi.Container, error) {
filter := &runtimeapi.ContainerFilter{}
if !allContainers {
runningState := runtimeapi.ContainerState_CONTAINER_RUNNING
filter.State = &runtimeapi.ContainerStateValue{
State: runningState,
State: runtimeapi.ContainerState_CONTAINER_RUNNING,
}
}
containers, err := m.getContainersHelper(filter)
containers, err := m.runtimeService.ListContainers(filter)
if err != nil {
glog.Errorf("getKubeletContainers failed: %v", err)
return nil, err
@ -361,16 +360,6 @@ func (m *kubeGenericRuntimeManager) getKubeletContainers(allContainers bool) ([]
return containers, nil
}
// getContainers lists containers by filter.
func (m *kubeGenericRuntimeManager) getContainersHelper(filter *runtimeapi.ContainerFilter) ([]*runtimeapi.Container, error) {
resp, err := m.runtimeService.ListContainers(filter)
if err != nil {
return nil, err
}
return resp, err
}
// makeUID returns a randomly generated string.
func makeUID() string {
return fmt.Sprintf("%08x", rand.Uint32())

View File

@ -32,17 +32,17 @@ import (
// containerGC is the manager of garbage collection.
type containerGC struct {
client internalapi.RuntimeService
manager *kubeGenericRuntimeManager
podGetter podGetter
client internalapi.RuntimeService
manager *kubeGenericRuntimeManager
podDeletionProvider podDeletionProvider
}
// NewContainerGC creates a new containerGC.
func NewContainerGC(client internalapi.RuntimeService, podGetter podGetter, manager *kubeGenericRuntimeManager) *containerGC {
func NewContainerGC(client internalapi.RuntimeService, podDeletionProvider podDeletionProvider, manager *kubeGenericRuntimeManager) *containerGC {
return &containerGC{
client: client,
manager: manager,
podGetter: podGetter,
client: client,
manager: manager,
podDeletionProvider: podDeletionProvider,
}
}
@ -52,8 +52,6 @@ type containerGCInfo struct {
id string
// The name of the container.
name string
// The sandbox ID which this container belongs to
sandboxID string
// Creation time for the container.
createTime time.Time
}
@ -159,12 +157,6 @@ func (cgc *containerGC) removeSandbox(sandboxID string) {
}
}
// isPodDeleted returns true if the pod is already deleted.
func (cgc *containerGC) isPodDeleted(podUID types.UID) bool {
_, found := cgc.podGetter.GetPodByUID(podUID)
return !found
}
// evictableContainers gets all containers that are evictable. Evictable containers are: not running
// and created more than MinAge ago.
func (cgc *containerGC) evictableContainers(minAge time.Duration) (containersByEvictUnit, error) {
@ -191,7 +183,6 @@ func (cgc *containerGC) evictableContainers(minAge time.Duration) (containersByE
id: container.Id,
name: container.Metadata.Name,
createTime: createdAt,
sandboxID: container.PodSandboxId,
}
key := evictUnit{
uid: labeledInfo.PodUID,
@ -219,7 +210,7 @@ func (cgc *containerGC) evictContainers(gcPolicy kubecontainer.ContainerGCPolicy
// Remove deleted pod containers if all sources are ready.
if allSourcesReady {
for key, unit := range evictUnits {
if cgc.isPodDeleted(key.uid) || evictNonDeletedPods {
if cgc.podDeletionProvider.IsPodDeleted(key.uid) || evictNonDeletedPods {
cgc.removeOldestN(unit, len(unit)) // Remove all.
delete(evictUnits, key)
}
@ -307,7 +298,7 @@ func (cgc *containerGC) evictSandboxes(evictNonDeletedPods bool) error {
}
for podUID, sandboxes := range sandboxesByPod {
if cgc.isPodDeleted(podUID) || evictNonDeletedPods {
if cgc.podDeletionProvider.IsPodDeleted(podUID) || evictNonDeletedPods {
// Remove all evictable sandboxes if the pod has been removed.
// Note that the latest dead sandbox is also removed if there is
// already an active one.
@ -333,7 +324,7 @@ func (cgc *containerGC) evictPodLogsDirectories(allSourcesReady bool) error {
for _, dir := range dirs {
name := dir.Name()
podUID := types.UID(name)
if !cgc.isPodDeleted(podUID) {
if !cgc.podDeletionProvider.IsPodDeleted(podUID) {
continue
}
err := osInterface.RemoveAll(filepath.Join(podLogsRootDirectory, name))

View File

@ -34,11 +34,11 @@ func TestSandboxGC(t *testing.T) {
fakeRuntime, _, m, err := createTestRuntimeManager()
assert.NoError(t, err)
fakePodGetter := m.containerGC.podGetter.(*fakePodGetter)
podDeletionProvider := m.containerGC.podDeletionProvider.(*fakePodDeletionProvider)
makeGCSandbox := func(pod *v1.Pod, attempt uint32, state runtimeapi.PodSandboxState, withPodGetter bool, createdAt int64) sandboxTemplate {
if withPodGetter {
// initialize the pod getter
fakePodGetter.pods[pod.UID] = pod
podDeletionProvider.pods[pod.UID] = struct{}{}
}
return sandboxTemplate{
pod: pod,
@ -162,13 +162,13 @@ func TestContainerGC(t *testing.T) {
fakeRuntime, _, m, err := createTestRuntimeManager()
assert.NoError(t, err)
fakePodGetter := m.containerGC.podGetter.(*fakePodGetter)
podDeletionProvider := m.containerGC.podDeletionProvider.(*fakePodDeletionProvider)
makeGCContainer := func(podName, containerName string, attempt int, createdAt int64, state runtimeapi.ContainerState) containerTemplate {
container := makeTestContainer(containerName, "test-image")
pod := makeTestPod(podName, "test-ns", podName, []v1.Container{container})
if podName != "deleted" {
// initialize the pod getter, explicitly exclude deleted pod
fakePodGetter.pods[pod.UID] = pod
podDeletionProvider.pods[pod.UID] = struct{}{}
}
return containerTemplate{
pod: pod,
@ -361,11 +361,11 @@ func TestPodLogDirectoryGC(t *testing.T) {
_, _, m, err := createTestRuntimeManager()
assert.NoError(t, err)
fakeOS := m.osInterface.(*containertest.FakeOS)
fakePodGetter := m.containerGC.podGetter.(*fakePodGetter)
podDeletionProvider := m.containerGC.podDeletionProvider.(*fakePodDeletionProvider)
// pod log directories without corresponding pods should be removed.
fakePodGetter.pods["123"] = makeTestPod("foo1", "new", "123", nil)
fakePodGetter.pods["456"] = makeTestPod("foo2", "new", "456", nil)
podDeletionProvider.pods["123"] = struct{}{}
podDeletionProvider.pods["456"] = struct{}{}
files := []string{"123", "456", "789", "012"}
removed := []string{filepath.Join(podLogsRootDirectory, "789"), filepath.Join(podLogsRootDirectory, "012")}

View File

@ -65,9 +65,9 @@ var (
ErrVersionNotSupported = errors.New("Runtime api version is not supported")
)
// A subset of the pod.Manager interface extracted for garbage collection purposes.
type podGetter interface {
GetPodByUID(kubetypes.UID) (*v1.Pod, bool)
// podDeletionProvider can determine if a pod is deleted
type podDeletionProvider interface {
IsPodDeleted(kubetypes.UID) bool
}
type kubeGenericRuntimeManager struct {
@ -127,7 +127,7 @@ func NewKubeGenericRuntimeManager(
seccompProfileRoot string,
containerRefManager *kubecontainer.RefManager,
machineInfo *cadvisorapi.MachineInfo,
podGetter podGetter,
podDeletionProvider podDeletionProvider,
osInterface kubecontainer.OSInterface,
runtimeHelper kubecontainer.RuntimeHelper,
httpClient types.HttpGetter,
@ -193,7 +193,7 @@ func NewKubeGenericRuntimeManager(
imagePullQPS,
imagePullBurst)
kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(httpClient, kubeRuntimeManager, kubeRuntimeManager)
kubeRuntimeManager.containerGC = NewContainerGC(runtimeService, podGetter, kubeRuntimeManager)
kubeRuntimeManager.containerGC = NewContainerGC(runtimeService, podDeletionProvider, kubeRuntimeManager)
kubeRuntimeManager.versionCache = cache.NewObjectCache(
func() (interface{}, error) {

View File

@ -26,7 +26,6 @@ import (
rktapi "github.com/coreos/rkt/api/v1alpha"
"golang.org/x/net/context"
"google.golang.org/grpc"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
kubetypes "k8s.io/apimachinery/pkg/types"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@ -179,17 +178,19 @@ func (f *fakeRktCli) Reset() {
f.err = nil
}
type fakePodGetter struct {
pods map[types.UID]*v1.Pod
type fakePodDeletionProvider struct {
pods map[types.UID]struct{}
}
func newFakePodGetter() *fakePodGetter {
return &fakePodGetter{pods: make(map[types.UID]*v1.Pod)}
func newFakePodDeletionProvider() *fakePodDeletionProvider {
return &fakePodDeletionProvider{
pods: make(map[types.UID]struct{}),
}
}
func (f fakePodGetter) GetPodByUID(uid types.UID) (*v1.Pod, bool) {
p, found := f.pods[uid]
return p, found
func (f *fakePodDeletionProvider) IsPodDeleted(uid types.UID) bool {
_, found := f.pods[uid]
return !found
}
type fakeUnitGetter struct {

View File

@ -160,7 +160,7 @@ type Runtime struct {
dockerKeyring credentialprovider.DockerKeyring
containerRefManager *kubecontainer.RefManager
podGetter podGetter
podDeletionProvider podDeletionProvider
runtimeHelper kubecontainer.RuntimeHelper
recorder record.EventRecorder
livenessManager proberesults.Manager
@ -201,9 +201,9 @@ type podServiceDirective struct {
var _ kubecontainer.Runtime = &Runtime{}
var _ kubecontainer.DirectStreamingRuntime = &Runtime{}
// TODO(yifan): This duplicates the podGetter in dockertools.
type podGetter interface {
GetPodByUID(kubetypes.UID) (*v1.Pod, bool)
// podDeletionProvider can determine if a pod is deleted
type podDeletionProvider interface {
IsPodDeleted(kubetypes.UID) bool
}
// cliInterface wrapps the command line calls for testing purpose.
@ -228,7 +228,7 @@ func New(
runtimeHelper kubecontainer.RuntimeHelper,
recorder record.EventRecorder,
containerRefManager *kubecontainer.RefManager,
podGetter podGetter,
podDeletionProvider podDeletionProvider,
livenessManager proberesults.Manager,
httpClient types.HttpGetter,
networkPlugin network.NetworkPlugin,
@ -285,7 +285,7 @@ func New(
config: config,
dockerKeyring: credentialprovider.NewDockerKeyring(),
containerRefManager: containerRefManager,
podGetter: podGetter,
podDeletionProvider: podDeletionProvider,
runtimeHelper: runtimeHelper,
recorder: recorder,
livenessManager: livenessManager,
@ -2020,8 +2020,7 @@ func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSo
removeCandidates = append(removeCandidates, pod)
continue
}
_, found := r.podGetter.GetPodByUID(uid)
if !found && allSourcesReady {
if r.podDeletionProvider.IsPodDeleted(uid) && allSourcesReady {
removeCandidates = append(removeCandidates, pod)
continue
}

View File

@ -1636,7 +1636,7 @@ func TestGarbageCollect(t *testing.T) {
fs := newFakeSystemd()
cli := newFakeRktCli()
fakeOS := kubetesting.NewFakeOS()
getter := newFakePodGetter()
deletionProvider := newFakePodDeletionProvider()
fug := newfakeUnitGetter()
frh := &containertesting.FakeRuntimeHelper{}
@ -1644,7 +1644,7 @@ func TestGarbageCollect(t *testing.T) {
os: fakeOS,
cli: cli,
apisvc: fr,
podGetter: getter,
podDeletionProvider: deletionProvider,
systemd: fs,
containerRefManager: kubecontainer.NewRefManager(),
unitGetter: fug,
@ -1830,7 +1830,7 @@ func TestGarbageCollect(t *testing.T) {
fr.pods = tt.pods
for _, p := range tt.apiPods {
getter.pods[p.UID] = p
deletionProvider.pods[p.UID] = struct{}{}
}
allSourcesReady := true
@ -1862,7 +1862,7 @@ func TestGarbageCollect(t *testing.T) {
ctrl.Finish()
fakeOS.Removes = []string{}
fs.resetFailedUnits = []string{}
getter.pods = make(map[kubetypes.UID]*v1.Pod)
deletionProvider.pods = make(map[kubetypes.UID]struct{})
}
}