diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index aa5e76eaf1d..5e64acc9e31 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -79,6 +79,7 @@ import ( utilerrors "k8s.io/kubernetes/pkg/util/errors" utilexec "k8s.io/kubernetes/pkg/util/exec" "k8s.io/kubernetes/pkg/util/flowcontrol" + "k8s.io/kubernetes/pkg/util/integer" kubeio "k8s.io/kubernetes/pkg/util/io" utilipt "k8s.io/kubernetes/pkg/util/iptables" "k8s.io/kubernetes/pkg/util/mount" @@ -153,6 +154,9 @@ const ( // maxImagesInStatus is the number of max images we store in image status. maxImagesInNodeStatus = 50 + + // Minimum number of dead containers to keep in a pod + minDeadContainerInPod = 1 ) // SyncHandler is an interface implemented by Kubelet, for testability @@ -479,6 +483,7 @@ func NewMainKubelet( return nil, err } klet.containerGC = containerGC + klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod)) // setup imageManager imageManager, err := newImageManager(klet.containerRuntime, cadvisorInterface, recorder, nodeRef, imageGCPolicy) @@ -826,6 +831,9 @@ type Kubelet struct { // should manage attachment/detachment of volumes scheduled to this node, // and disable kubelet from executing any attach/detach operations enableControllerAttachDetach bool + + // trigger deleting containers in a pod + containerDeletor *podContainerDeletor } // setupDataDirs creates: @@ -2161,7 +2169,6 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle case kubetypes.SET: // TODO: Do we want to support this? glog.Errorf("Kubelet does not support snapshot update") - } case e := <-plegCh: if isSyncPodWorthy(e) { @@ -2175,6 +2182,13 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle glog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e) handler.HandlePodSyncs([]*api.Pod{pod}) } + if e.Type == pleg.ContainerDied { + if podStatus, err := kl.podCache.Get(e.ID); err == nil { + if containerID, ok := e.Data.(string); ok { + kl.containerDeletor.deleteContainersInPod(containerID, podStatus) + } + } + } case <-syncCh: // Sync pods waiting for sync podsToSync := kl.getPodsToSync() @@ -2902,7 +2916,7 @@ func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) { server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port, kl.containerRuntime) } -// Filter out events that are not worthy of pod syncing +// isSyncPodWorthy filters out events that are not worthy of pod syncing func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool { // ContatnerRemoved doesn't affect pod state return event.Type != pleg.ContainerRemoved diff --git a/pkg/kubelet/pod_container_deletor.go b/pkg/kubelet/pod_container_deletor.go new file mode 100644 index 00000000000..d61228947d3 --- /dev/null +++ b/pkg/kubelet/pod_container_deletor.go @@ -0,0 +1,104 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "sort" + + "github.com/golang/glog" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/util/wait" +) + +const ( + // The limit on the number of buffered container deletion requests + // This number is a bit arbitrary and may be adjusted in the future. + containerDeletorBufferLimit = 50 +) + +type containerStatusbyCreatedList []*kubecontainer.ContainerStatus + +type podContainerDeletor struct { + worker chan<- kubecontainer.ContainerID + containersToKeep int +} + +func (a containerStatusbyCreatedList) Len() int { return len(a) } +func (a containerStatusbyCreatedList) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a containerStatusbyCreatedList) Less(i, j int) bool { return a[i].CreatedAt.After(a[j].CreatedAt) } + +func newPodContainerDeletor(runtime kubecontainer.Runtime, containersToKeep int) *podContainerDeletor { + buffer := make(chan kubecontainer.ContainerID, containerDeletorBufferLimit) + go wait.Until(func() { + for { + select { + case id := <-buffer: + runtime.DeleteContainer(id) + } + } + }, 0, wait.NeverStop) + + return &podContainerDeletor{ + worker: buffer, + containersToKeep: containersToKeep, + } +} + +// getContainersToDeleteInPod returns the exited containers in a pod whose name matches the name inferred from exitedContainerID, ordered by the creation time from the latest to the earliest. +func (p *podContainerDeletor) getContainersToDeleteInPod(exitedContainerID string, podStatus *kubecontainer.PodStatus) containerStatusbyCreatedList { + var matchedContainer *kubecontainer.ContainerStatus + var exitedContainers []*kubecontainer.ContainerStatus + // Find all exited containers in the pod + for _, containerStatus := range podStatus.ContainerStatuses { + if containerStatus.State != kubecontainer.ContainerStateExited { + continue + } + if containerStatus.ID.ID == exitedContainerID { + matchedContainer = containerStatus + } + exitedContainers = append(exitedContainers, containerStatus) + } + if matchedContainer == nil { + glog.Warningf("Container %q not found in pod's exited containers", exitedContainerID) + return containerStatusbyCreatedList{} + } + + // Find the exited containers whose name matches the name of the container with id being exitedContainerID + var candidates containerStatusbyCreatedList + for _, containerStatus := range exitedContainers { + if matchedContainer.Name == containerStatus.Name { + candidates = append(candidates, containerStatus) + } + } + if len(candidates) <= p.containersToKeep { + return containerStatusbyCreatedList{} + } + + sort.Sort(candidates) + return candidates[p.containersToKeep:] +} + +// deleteContainersInPod issues container deletion requests for containers selected by getContainersToDeleteInPod. +func (p *podContainerDeletor) deleteContainersInPod(exitedContainerID string, podStatus *kubecontainer.PodStatus) { + for _, candidate := range p.getContainersToDeleteInPod(exitedContainerID, podStatus) { + select { + case p.worker <- candidate.ID: + default: + glog.Warningf("Failed to issue the request to remove container %v", candidate.ID) + } + } +} diff --git a/pkg/kubelet/pod_container_deletor_test.go b/pkg/kubelet/pod_container_deletor_test.go new file mode 100644 index 00000000000..52a1b8f730c --- /dev/null +++ b/pkg/kubelet/pod_container_deletor_test.go @@ -0,0 +1,69 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "reflect" + "testing" + "time" + + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" +) + +func testGetContainersToDeleteInPod(t *testing.T) { + pod := kubecontainer.PodStatus{ + ContainerStatuses: []*kubecontainer.ContainerStatus{ + { + ID: kubecontainer.ContainerID{Type: "test", ID: "1"}, + Name: "foo", + CreatedAt: time.Now(), + State: kubecontainer.ContainerStateExited, + }, + { + ID: kubecontainer.ContainerID{Type: "test", ID: "2"}, + Name: "bar", + CreatedAt: time.Now().Add(time.Second), + State: kubecontainer.ContainerStateExited, + }, + { + ID: kubecontainer.ContainerID{Type: "test", ID: "3"}, + Name: "bar", + CreatedAt: time.Now().Add(2 * time.Second), + State: kubecontainer.ContainerStateExited, + }, + { + ID: kubecontainer.ContainerID{Type: "test", ID: "4"}, + Name: "bar", + CreatedAt: time.Now().Add(3 * time.Second), + State: kubecontainer.ContainerStateExited, + }, + { + ID: kubecontainer.ContainerID{Type: "test", ID: "5"}, + Name: "bar", + CreatedAt: time.Now().Add(4 * time.Second), + State: kubecontainer.ContainerStateRunning, + }, + }, + } + + expectedCandidates := []*kubecontainer.ContainerStatus{pod.ContainerStatuses[2], pod.ContainerStatuses[1]} + candidates := newPodContainerDeletor(&containertest.FakeRuntime{}, 1).getContainersToDeleteInPod("2", &pod) + if !reflect.DeepEqual(candidates, expectedCandidates) { + t.Errorf("expected %v got %v", expectedCandidates, candidates) + } +}