diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD
index e8658366a48..c5013e9017b 100644
--- a/pkg/kubelet/BUILD
+++ b/pkg/kubelet/BUILD
@@ -180,6 +180,7 @@ go_test(
         "//pkg/kubelet/server/remotecommand:go_default_library",
         "//pkg/kubelet/server/stats:go_default_library",
         "//pkg/kubelet/status:go_default_library",
+        "//pkg/kubelet/status/testing:go_default_library",
         "//pkg/kubelet/types:go_default_library",
         "//pkg/kubelet/util/queue:go_default_library",
         "//pkg/kubelet/util/sliceutils:go_default_library",
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 06a990754c3..ffdc20de456 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -719,7 +719,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
     }
     klet.imageManager = imageManager

-    klet.statusManager = status.NewManager(kubeClient, klet.podManager)
+    klet.statusManager = status.NewManager(kubeClient, klet.podManager, klet)

     klet.probeManager = prober.NewManager(
         klet.statusManager,
diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go
index 34fdd8d6787..9923961ea98 100644
--- a/pkg/kubelet/kubelet_pods.go
+++ b/pkg/kubelet/kubelet_pods.go
@@ -727,6 +727,37 @@ func (kl *Kubelet) podIsTerminated(pod *v1.Pod) bool {
     return false
 }

+// OkToDeletePod returns true if all required node-level resources that a pod was consuming have
+// been reclaimed by the kubelet. Reclaiming resources is a prerequisite to deleting a pod from
+// the API server.
+func (kl *Kubelet) OkToDeletePod(pod *v1.Pod) bool {
+    if pod.DeletionTimestamp == nil {
+        // We shouldn't delete pods whose DeletionTimestamp is not set
+        return false
+    }
+    if !notRunning(pod.Status.ContainerStatuses) {
+        // We shouldn't delete pods that still have running containers
+        glog.V(3).Infof("Pod %q is terminated, but some containers are still running", format.Pod(pod))
+        return false
+    }
+    if kl.podVolumesExist(pod.UID) && !kl.kubeletConfiguration.KeepTerminatedPodVolumes {
+        // We shouldn't delete pods whose volumes have not been cleaned up if we are not keeping terminated pod volumes
+        glog.V(3).Infof("Pod %q is terminated, but some volumes have not been cleaned up", format.Pod(pod))
+        return false
+    }
+    return true
+}
+
+// notRunning returns true if every status is terminated or waiting, or the status list
+// is empty.
+func notRunning(statuses []v1.ContainerStatus) bool {
+    for _, status := range statuses {
+        if status.State.Terminated == nil && status.State.Waiting == nil {
+            return false
+        }
+    }
+    return true
+}
+
 // filterOutTerminatedPods returns the given pods which the status manager
 // does not consider failed or succeeded.
 func (kl *Kubelet) filterOutTerminatedPods(pods []*v1.Pod) []*v1.Pod {
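The notRunning helper above is the heart of the new gate: a pod only counts as reclaimable once every container status is Waiting or Terminated, and an empty status list also qualifies. Below is a minimal standalone sketch of that behavior; the main wrapper and the sample statuses are our own illustration, not part of the patch.

package main

import (
    "fmt"

    "k8s.io/kubernetes/pkg/api/v1"
)

// notRunning mirrors the helper added to kubelet_pods.go: true when every
// container status is Terminated or Waiting, or when there are no statuses.
func notRunning(statuses []v1.ContainerStatus) bool {
    for _, status := range statuses {
        if status.State.Terminated == nil && status.State.Waiting == nil {
            return false
        }
    }
    return true
}

func main() {
    // No container statuses at all: treated as not running.
    fmt.Println(notRunning(nil)) // true

    // One container is still Running, so the pod is still running.
    mixed := []v1.ContainerStatus{
        {State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{}}},
        {State: v1.ContainerState{Running: &v1.ContainerStateRunning{}}},
    }
    fmt.Println(notRunning(mixed)) // false
}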
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index fcbb6588576..4bd2ab69f17 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -59,6 +59,7 @@ import (
     "k8s.io/kubernetes/pkg/kubelet/secret"
     "k8s.io/kubernetes/pkg/kubelet/server/stats"
     "k8s.io/kubernetes/pkg/kubelet/status"
+    statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
     kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
     "k8s.io/kubernetes/pkg/kubelet/util/queue"
     kubeletvolume "k8s.io/kubernetes/pkg/kubelet/volumemanager"
@@ -179,7 +180,7 @@ func newTestKubeletWithImageList(
     }
     kubelet.secretManager = secretManager
     kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient, kubelet.secretManager)
-    kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager)
+    kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{})
     kubelet.containerRefManager = kubecontainer.NewRefManager()
     diskSpaceManager, err := newDiskSpaceManager(mockCadvisor, DiskSpacePolicy{})
     if err != nil {
diff --git a/pkg/kubelet/prober/BUILD b/pkg/kubelet/prober/BUILD
index 2f01e8c9ec1..2e70f6c73cb 100644
--- a/pkg/kubelet/prober/BUILD
+++ b/pkg/kubelet/prober/BUILD
@@ -57,6 +57,7 @@ go_test(
         "//pkg/kubelet/pod:go_default_library",
         "//pkg/kubelet/prober/results:go_default_library",
         "//pkg/kubelet/status:go_default_library",
+        "//pkg/kubelet/status/testing:go_default_library",
         "//pkg/probe:go_default_library",
         "//pkg/util/exec:go_default_library",
         "//pkg/util/intstr:go_default_library",
diff --git a/pkg/kubelet/prober/common_test.go b/pkg/kubelet/prober/common_test.go
index 2dcc972c6ef..09c53fce757 100644
--- a/pkg/kubelet/prober/common_test.go
+++ b/pkg/kubelet/prober/common_test.go
@@ -28,6 +28,7 @@ import (
     kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
     "k8s.io/kubernetes/pkg/kubelet/prober/results"
     "k8s.io/kubernetes/pkg/kubelet/status"
+    statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
     "k8s.io/kubernetes/pkg/probe"
     "k8s.io/kubernetes/pkg/util/exec"
 )
@@ -102,7 +103,7 @@ func newTestManager() *manager {
     // Add test pod to pod manager, so that status manager can get the pod from pod manager if needed.
     podManager.AddPod(getTestPod())
     m := NewManager(
-        status.NewManager(&fake.Clientset{}, podManager),
+        status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}),
         results.NewManager(),
         nil, // runner
         refManager,
diff --git a/pkg/kubelet/prober/worker_test.go b/pkg/kubelet/prober/worker_test.go
index 590756a3249..85558bad2d6 100644
--- a/pkg/kubelet/prober/worker_test.go
+++ b/pkg/kubelet/prober/worker_test.go
@@ -31,6 +31,7 @@ import (
     kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
     "k8s.io/kubernetes/pkg/kubelet/prober/results"
     "k8s.io/kubernetes/pkg/kubelet/status"
+    statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
     "k8s.io/kubernetes/pkg/probe"
     "k8s.io/kubernetes/pkg/util/exec"
 )
@@ -117,7 +118,7 @@ func TestDoProbe(t *testing.T) {
         }

         // Clean up.
-        m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil, nil))
+        m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil, nil), &statustest.FakePodDeletionSafetyProvider{})
         resultsManager(m, probeType).Remove(testContainerID)
     }
 }
diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go
index 1a1e39e90ab..e4ab3d38f12 100644
--- a/pkg/kubelet/runonce_test.go
+++ b/pkg/kubelet/runonce_test.go
@@ -43,6 +43,7 @@ import (
     "k8s.io/kubernetes/pkg/kubelet/secret"
     "k8s.io/kubernetes/pkg/kubelet/server/stats"
     "k8s.io/kubernetes/pkg/kubelet/status"
+    statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
     "k8s.io/kubernetes/pkg/kubelet/volumemanager"
     "k8s.io/kubernetes/pkg/volume"
     volumetest "k8s.io/kubernetes/pkg/volume/testing"
@@ -76,7 +77,7 @@ func TestRunOnce(t *testing.T) {
         cadvisor:            cadvisor,
         nodeLister:          testNodeLister{},
         nodeInfo:            testNodeInfo{},
-        statusManager:       status.NewManager(nil, podManager),
+        statusManager:       status.NewManager(nil, podManager, &statustest.FakePodDeletionSafetyProvider{}),
         containerRefManager: kubecontainer.NewRefManager(),
         podManager:          podManager,
         os:                  &containertest.FakeOS{},
diff --git a/pkg/kubelet/status/BUILD b/pkg/kubelet/status/BUILD
index b9df40b46b0..062564d4ee1 100644
--- a/pkg/kubelet/status/BUILD
+++ b/pkg/kubelet/status/BUILD
@@ -51,6 +51,7 @@ go_test(
         "//pkg/kubelet/pod:go_default_library",
         "//pkg/kubelet/pod/testing:go_default_library",
         "//pkg/kubelet/secret:go_default_library",
+        "//pkg/kubelet/status/testing:go_default_library",
         "//pkg/kubelet/types:go_default_library",
         "//vendor:github.com/stretchr/testify/assert",
         "//vendor:k8s.io/apimachinery/pkg/api/errors",
@@ -69,6 +70,9 @@ filegroup(

 filegroup(
     name = "all-srcs",
-    srcs = [":package-srcs"],
+    srcs = [
+        ":package-srcs",
+        "//pkg/kubelet/status/testing:all-srcs",
+    ],
     tags = ["automanaged"],
 )
diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go
index f4a9aaabc55..b9cbc25681e 100644
--- a/pkg/kubelet/status/status_manager.go
+++ b/pkg/kubelet/status/status_manager.go
@@ -66,6 +66,7 @@ type manager struct {
     // Map from (mirror) pod UID to latest status version successfully sent to the API server.
     // apiStatusVersions must only be accessed from the sync thread.
     apiStatusVersions map[types.UID]uint64
+    podDeletionSafety PodDeletionSafetyProvider
 }

 // PodStatusProvider knows how to provide status for a pod. It's intended to be used by other components
@@ -76,6 +77,12 @@ type PodStatusProvider interface {
     GetPodStatus(uid types.UID) (v1.PodStatus, bool)
 }

+// PodDeletionSafetyProvider provides guarantees that a pod can be safely deleted.
+type PodDeletionSafetyProvider interface {
+    // OkToDeletePod returns true if the pod can safely be deleted.
+    OkToDeletePod(pod *v1.Pod) bool
+}
+
 // Manager is the Source of truth for kubelet pod status, and should be kept up-to-date with
 // the latest v1.PodStatus. It also syncs updates back to the API server.
 type Manager interface {
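In this patch the PodDeletionSafetyProvider interface is satisfied by the Kubelet itself (hence the new klet argument in kubelet.go), but any single-method type will do. A hypothetical provider sketch, not part of the patch, showing the minimal contract; a real provider would also verify container state and volume cleanup:

package example

import "k8s.io/kubernetes/pkg/api/v1"

// deletionRequestedOnly is a hypothetical provider that clears a pod for
// deletion as soon as deletion has been requested, with no resource checks.
type deletionRequestedOnly struct{}

func (deletionRequestedOnly) OkToDeletePod(pod *v1.Pod) bool {
    return pod.DeletionTimestamp != nil
}

It would be wired in through the new third parameter: status.NewManager(kubeClient, podManager, deletionRequestedOnly{}).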
@@ -102,13 +109,14 @@
 const syncPeriod = 10 * time.Second

-func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager) Manager {
+func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podDeletionSafety PodDeletionSafetyProvider) Manager {
     return &manager{
         kubeClient:        kubeClient,
         podManager:        podManager,
         podStatuses:       make(map[types.UID]versionedPodStatus),
         podStatusChannel:  make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
         apiStatusVersions: make(map[types.UID]uint64),
+        podDeletionSafety: podDeletionSafety,
     }
 }

@@ -380,7 +388,7 @@ func (m *manager) syncBatch() {
             }
             syncedUID = mirrorUID
         }
-        if m.needsUpdate(syncedUID, status) {
+        if m.needsUpdate(syncedUID, status) || m.couldBeDeleted(uid, status.status) {
             updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
         } else if m.needsReconcile(uid, status.status) {
             // Delete the apiStatusVersions here to force an update on the pod status
@@ -433,11 +441,7 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
         // We don't handle graceful deletion of mirror pods.
         return
     }
-    if pod.DeletionTimestamp == nil {
-        return
-    }
-    if !notRunning(pod.Status.ContainerStatuses) {
-        glog.V(3).Infof("Pod %q is terminated, but some containers are still running", format.Pod(pod))
+    if !m.podDeletionSafety.OkToDeletePod(pod) {
         return
     }
     deleteOptions := metav1.NewDeleteOptions(0)
@@ -462,6 +466,15 @@ func (m *manager) needsUpdate(uid types.UID, status versionedPodStatus) bool {
     return !ok || latest < status.version
 }

+func (m *manager) couldBeDeleted(uid types.UID, status v1.PodStatus) bool {
+    // The pod could be a static pod, so we should translate first.
+    pod, ok := m.podManager.GetPodByUID(uid)
+    if !ok {
+        return false
+    }
+    return !kubepod.IsMirrorPod(pod) && m.podDeletionSafety.OkToDeletePod(pod)
+}
+
 // needsReconcile compares the given status with the status in the pod manager (which
 // in fact comes from apiserver), returns whether the status needs to be reconciled with
 // the apiserver. Now when pod status is inconsistent between apiserver and kubelet,
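The net effect of couldBeDeleted on syncBatch: a pod is queued for an API sync not only when its status version advanced, but also when it has just become safe to delete; without this, a pod whose status stopped changing would never reach the deletion path in syncPod. A simplified model of the combined predicate (the boolean parameters are our own abstraction of the real lookups):

package example

// needsSync models the decision in syncBatch after this patch: sync when the
// status version advanced, or when a non-mirror pod became safe to delete.
func needsSync(versionAdvanced, isMirrorPod, okToDelete bool) bool {
    return versionAdvanced || (!isMirrorPod && okToDelete)
}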
@@ -562,17 +575,6 @@ func normalizeStatus(pod *v1.Pod, status *v1.PodStatus) *v1.PodStatus {
     return status
 }

-// notRunning returns true if every status is terminated or waiting, or the status list
-// is empty.
-func notRunning(statuses []v1.ContainerStatus) bool {
-    for _, status := range statuses {
-        if status.State.Terminated == nil && status.State.Waiting == nil {
-            return false
-        }
-    }
-    return true
-}
-
 func copyStatus(source *v1.PodStatus) (v1.PodStatus, error) {
     clone, err := api.Scheme.DeepCopy(source)
     if err != nil {
diff --git a/pkg/kubelet/status/status_manager_test.go b/pkg/kubelet/status/status_manager_test.go
index 9540a3a9d86..52210aef25a 100644
--- a/pkg/kubelet/status/status_manager_test.go
+++ b/pkg/kubelet/status/status_manager_test.go
@@ -39,6 +39,7 @@ import (
     kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
     podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
     kubesecret "k8s.io/kubernetes/pkg/kubelet/secret"
+    statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
     kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 )

@@ -74,7 +75,7 @@ func (m *manager) testSyncBatch() {
 func newTestManager(kubeClient clientset.Interface) *manager {
     podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), kubesecret.NewFakeManager())
     podManager.AddPod(getTestPod())
-    return NewManager(kubeClient, podManager).(*manager)
+    return NewManager(kubeClient, podManager, &statustest.FakePodDeletionSafetyProvider{}).(*manager)
 }

 func generateRandomMessage() string {
diff --git a/pkg/kubelet/status/testing/BUILD b/pkg/kubelet/status/testing/BUILD
new file mode 100644
index 00000000000..93a4a942261
--- /dev/null
+++ b/pkg/kubelet/status/testing/BUILD
@@ -0,0 +1,31 @@
+package(default_visibility = ["//visibility:public"])
+
+licenses(["notice"])
+
+load(
+    "@io_bazel_rules_go//go:def.bzl",
+    "go_library",
+)
+
+go_library(
+    name = "go_default_library",
+    srcs = ["fake_pod_deletion_safety.go"],
+    tags = ["automanaged"],
+    deps = [
+        "//pkg/api/v1:go_default_library",
+        "//pkg/kubelet/pod:go_default_library",
+    ],
+)
+
+filegroup(
+    name = "package-srcs",
+    srcs = glob(["**"]),
+    tags = ["automanaged"],
+    visibility = ["//visibility:private"],
+)
+
+filegroup(
+    name = "all-srcs",
+    srcs = [":package-srcs"],
+    tags = ["automanaged"],
+)
diff --git a/pkg/kubelet/status/testing/fake_pod_deletion_safety.go b/pkg/kubelet/status/testing/fake_pod_deletion_safety.go
new file mode 100644
index 00000000000..c05382907f3
--- /dev/null
+++ b/pkg/kubelet/status/testing/fake_pod_deletion_safety.go
@@ -0,0 +1,28 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package testing
+
+import (
+    "k8s.io/kubernetes/pkg/api/v1"
+    kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
+)
+
+type FakePodDeletionSafetyProvider struct{}
+
+func (f *FakePodDeletionSafetyProvider) OkToDeletePod(pod *v1.Pod) bool {
+    return !kubepod.IsMirrorPod(pod) && pod.DeletionTimestamp != nil
+}
diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
index 5e8d8d4137a..e63e4cfbafd 100644
--- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
+++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
@@ -135,7 +135,18 @@ func (dswp *desiredStateOfWorldPopulator) populatorLoopFunc() func() {
 }

 func isPodTerminated(pod *v1.Pod) bool {
-    return pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded
+    return pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(pod.Status.ContainerStatuses))
+}
+
+// notRunning returns true if every status is terminated or waiting, or the status list
+// is empty.
+func notRunning(statuses []v1.ContainerStatus) bool {
+    for _, status := range statuses {
+        if status.State.Terminated == nil && status.State.Waiting == nil {
+            return false
+        }
+    }
+    return true
 }

 // Iterate through all pods and add to desired state of world if they don't
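Finally, the populator change means a pod whose phase is still Running can nevertheless be treated as terminated once deletion was requested and no container is running, so its volumes get removed from the desired state of the world. A standalone sketch of the updated check; the main wrapper and the sample pod are our own illustration, not part of the patch:

package main

import (
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/kubernetes/pkg/api/v1"
)

// isPodTerminated mirrors the updated populator check: terminal phase, or
// deletion requested with no running containers.
func isPodTerminated(pod *v1.Pod) bool {
    return pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded ||
        (pod.DeletionTimestamp != nil && notRunning(pod.Status.ContainerStatuses))
}

// notRunning is the same helper the patch duplicates into the populator.
func notRunning(statuses []v1.ContainerStatus) bool {
    for _, status := range statuses {
        if status.State.Terminated == nil && status.State.Waiting == nil {
            return false
        }
    }
    return true
}

func main() {
    now := metav1.Now()
    pod := &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{DeletionTimestamp: &now},
        Status: v1.PodStatus{
            Phase: v1.PodRunning, // phase has not flipped yet
            ContainerStatuses: []v1.ContainerStatus{
                {State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{}}},
            },
        },
    }
    fmt.Println(isPodTerminated(pod)) // true: deletion requested, nothing running
}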