Merge pull request #8274 from yujuhong/filter_terminated

kubelet: filter out terminated pods before rejecting pods
This commit is contained in:
Victor Marmol 2015-05-15 08:02:44 -07:00
commit e7750fa0c7


@@ -1181,23 +1181,8 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]metri
}
kl.statusManager.RemoveOrphanedStatuses(podFullNames)
// Reject pods that we cannot run.
kl.handleNotFittingPods(allPods)
// Reject new creation requests if disk space is running low.
kl.handleOutOfDisk(allPods, podSyncTypes)
// Pod phase progresses monotonically. Once a pod has reached a final state,
// it should never leave regardless of the restart policy. The statuses
// of such pods should not be changed, and there is no need to sync them.
// TODO: the logic here does not handle two cases:
// 1. If the containers were removed immediately after they died, kubelet
// may fail to generate correct statuses, let alone filtering correctly.
// 2. If kubelet restarted before writing the terminated status for a pod
// to the apiserver, it could still restart the terminated pod (even
// though the pod was not considered terminated by the apiserver).
// These two conditions could be alleviated by checkpointing kubelet.
pods := kl.filterOutTerminatedPods(allPods)
// Handles pod admission.
pods := kl.admitPods(allPods, podSyncTypes)
glog.V(4).Infof("Desired: %#v", pods)
var err error
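filterOutTerminatedPods is called here but defined outside this hunk. As a hedged sketch only, assuming the kubelet treats a pod as terminated once its cached status reaches a final phase (the statusOf accessor below is an illustrative stand-in, not the actual implementation):

```go
// Illustrative sketch; the real filterOutTerminatedPods lives elsewhere in
// kubelet.go and may differ. Assumes a pod whose status has reached a final
// phase (Failed or Succeeded) is terminated and needs no further sync.
func filterOutTerminatedPods(allPods []*api.Pod, statusOf func(*api.Pod) api.PodStatus) []*api.Pod {
	var pods []*api.Pod
	for _, pod := range allPods {
		phase := statusOf(pod).Phase
		if phase == api.PodFailed || phase == api.PodSucceeded {
			continue // Final phases are never left; nothing to sync.
		}
		pods = append(pods, pod)
	}
	return pods
}
```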
@@ -1358,10 +1343,10 @@ func (kl *Kubelet) checkCapacityExceeded(pods []*api.Pod) (fitting []*api.Pod, n
}
// handleOutOfDisk detects if pods can't fit due to lack of disk space.
func (kl *Kubelet) handleOutOfDisk(pods []*api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType) {
func (kl *Kubelet) handleOutOfDisk(pods []*api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType) []*api.Pod {
if len(podSyncTypes) == 0 {
// regular sync. no new pods
return
return pods
}
outOfDockerDisk := false
outOfRootDisk := false
@@ -1381,18 +1366,26 @@ func (kl *Kubelet) handleOutOfDisk(pods []*api.Pod, podSyncTypes map[types.UID]m
// We ignore the first disk check to ensure that running pods are not killed.
// The disk manager will only declare out-of-disk problems if Unfreeze has been called.
kl.diskSpaceManager.Unfreeze()
if outOfDockerDisk || outOfRootDisk {
for _, pod := range pods {
// Only reject pods that haven't started yet.
if podSyncTypes[pod.UID] == metrics.SyncPodCreate {
kl.recorder.Eventf(pod, "OutOfDisk", "Cannot start the pod due to lack of disk space.")
kl.statusManager.SetPodStatus(pod, api.PodStatus{
Phase: api.PodFailed,
Message: "Pod cannot be started due to lack of disk space."})
continue
}
}
if !outOfDockerDisk && !outOfRootDisk {
// Disk space is fine.
return pods
}
var fitting []*api.Pod
for i := range pods {
pod := pods[i]
// Only reject pods that haven't started yet.
if podSyncTypes[pod.UID] == metrics.SyncPodCreate {
kl.recorder.Eventf(pod, "OutOfDisk", "Cannot start the pod due to lack of disk space.")
kl.statusManager.SetPodStatus(pod, api.PodStatus{
Phase: api.PodFailed,
Message: "Pod cannot be started due to lack of disk space."})
continue
}
fitting = append(fitting, pod)
}
return fitting
}
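The diskSpaceManager consulted above is defined elsewhere in the package. A hypothetical sketch of its shape, inferred only from the Unfreeze call and the outOfDockerDisk/outOfRootDisk flags in this hunk; the two availability method names below are assumptions:

```go
// Hypothetical interface shape, inferred from this hunk; the real
// diskSpaceManager may expose different methods.
type diskSpaceManager interface {
	// Unfreeze allows out-of-disk conditions to be reported. The first
	// check after startup is ignored so that running pods are not killed.
	Unfreeze()
	// IsRootDiskSpaceAvailable reports whether the root filesystem has
	// enough free space to admit new pods. (Assumed name.)
	IsRootDiskSpaceAvailable() (bool, error)
	// IsDockerDiskSpaceAvailable reports the same for the Docker storage
	// partition. (Assumed name.)
	IsDockerDiskSpaceAvailable() (bool, error)
}
```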
// checkNodeSelectorMatching detects pods that do not match node's labels.
@@ -1412,9 +1405,10 @@ func (kl *Kubelet) checkNodeSelectorMatching(pods []*api.Pod) (fitting []*api.Po
return
}
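checkNodeSelectorMatching's body sits mostly outside this hunk. The core test is plain map containment: every key/value pair the pod's NodeSelector demands must be present in the node's labels. A minimal standalone sketch, with an invented helper name:

```go
// Minimal sketch of the node selector test; the real code fetches the
// node's labels from the apiserver. The helper name is invented.
func podMatchesNodeLabels(pod *api.Pod, nodeLabels map[string]string) bool {
	for key, value := range pod.Spec.NodeSelector {
		if nodeLabels[key] != value {
			return false // The node lacks a label the pod requires.
		}
	}
	return true
}
```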
// handleNotFittingPods handles pods that do not fit on the node.
// Currently, conflicts on Port.HostPort values, mismatches against the node's labels, and exceeding the node's capacity are handled.
func (kl *Kubelet) handleNotFittingPods(pods []*api.Pod) {
// handleNotFittingPods handles pods that do not fit on the node and returns
// the pods that fit. It currently checks host port conflicts, node selector
// mismatches, and exceeded node capacity.
func (kl *Kubelet) handleNotFittingPods(pods []*api.Pod) []*api.Pod {
fitting, notFitting := checkHostPortConflicts(pods)
for _, pod := range notFitting {
kl.recorder.Eventf(pod, "hostPortConflict", "Cannot start the pod due to host port conflict.")
@@ -1436,6 +1430,38 @@ func (kl *Kubelet) handleNotFittingPods(pods []*api.Pod) {
Phase: api.PodFailed,
Message: "Pod cannot be started due to exceeded capacity"})
}
return fitting
}
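handleNotFittingPods starts with checkHostPortConflicts, which is not shown in this diff. A hedged sketch of first-claimant-wins conflict detection; because admitPods sorts by creation time before calling this, older pods keep their ports and newer conflicting pods are the ones rejected:

```go
// Illustrative sketch only; the real checkHostPortConflicts may differ.
// The first pod to claim a HostPort keeps it; later claimants do not fit.
func checkHostPortConflicts(pods []*api.Pod) (fitting, notFitting []*api.Pod) {
	claimed := map[int]bool{}
	for _, pod := range pods {
		conflict := false
		for _, container := range pod.Spec.Containers {
			for _, port := range container.Ports {
				if port.HostPort != 0 && claimed[port.HostPort] {
					conflict = true
				}
			}
		}
		if conflict {
			notFitting = append(notFitting, pod)
			continue
		}
		for _, container := range pod.Spec.Containers {
			for _, port := range container.Ports {
				if port.HostPort != 0 {
					claimed[port.HostPort] = true
				}
			}
		}
		fitting = append(fitting, pod)
	}
	return fitting, notFitting
}
```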
// admitPods handles pod admission. It filters out terminated pods and pods
// that don't fit on the node, and may reject pods if the node is overcommitted.
func (kl *Kubelet) admitPods(allPods []*api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType) []*api.Pod {
// Pod phase progresses monotonically. Once a pod has reached a final state,
// it should never leave regardless of the restart policy. The statuses
// of such pods should not be changed, and there is no need to sync them.
// TODO: the logic here does not handle two cases:
// 1. If the containers were removed immediately after they died, kubelet
// may fail to generate correct statuses, let alone filtering correctly.
// 2. If kubelet restarted before writing the terminated status for a pod
// to the apiserver, it could still restart the terminated pod (even
// though the pod was not considered terminated by the apiserver).
// These two conditions could be alleviated by checkpointing kubelet.
pods := kl.filterOutTerminatedPods(allPods)
// Respect the pod creation order when resolving conflicts.
sort.Sort(podsByCreationTime(pods))
// Reject pods that we cannot run.
// handleNotFittingPods relies on static information (e.g. immutable fields
// in the pod specs or machine information that doesn't change without
// rebooting), and the pods are sorted by immutable creation time. Hence it
// should only reject new pods, without needing to check the pod sync types.
fitting := kl.handleNotFittingPods(pods)
// Reject new creation requests if disk space is running low.
admittedPods := kl.handleOutOfDisk(fitting, podSyncTypes)
return admittedPods
}
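admitPods sorts with podsByCreationTime, whose definition is outside this diff. A minimal sketch of such a sort.Interface, assuming it orders by the pod's CreationTimestamp so that older pods win conflicts:

```go
// Hypothetical sketch; the real podsByCreationTime is defined elsewhere in
// the kubelet package. Older pods sort first, so the conflict checks above
// reject the newer of two conflicting pods.
type podsByCreationTime []*api.Pod

func (s podsByCreationTime) Len() int      { return len(s) }
func (s podsByCreationTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s podsByCreationTime) Less(i, j int) bool {
	return s[i].CreationTimestamp.Before(s[j].CreationTimestamp)
}
```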
// syncLoop is the main loop for processing changes. It watches for changes from