Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-23 11:50:44 +00:00)

Merge pull request #21438 from yujuhong/terminated_pods

Auto commit by PR queue bot

Commit 6716344d24
@@ -1894,31 +1894,6 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubeco
 	return nil
 }
 
-// Delete any pods that are no longer running and are marked for deletion.
-func (kl *Kubelet) cleanupTerminatedPods(pods []*api.Pod, runningPods []*kubecontainer.Pod) error {
-	var terminating []*api.Pod
-	for _, pod := range pods {
-		if pod.DeletionTimestamp != nil {
-			found := false
-			for _, runningPod := range runningPods {
-				if runningPod.ID == pod.UID {
-					found = true
-					break
-				}
-			}
-			if found {
-				glog.V(5).Infof("Keeping terminated pod %q, still running", format.Pod(pod))
-				continue
-			}
-			terminating = append(terminating, pod)
-		}
-	}
-	if !kl.statusManager.TerminatePods(terminating) {
-		return errors.New("not all pods were successfully terminated")
-	}
-	return nil
-}
-
 // pastActiveDeadline returns true if the pod has been active for more than
 // ActiveDeadlineSeconds.
 func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
@@ -2110,10 +2085,6 @@ func (kl *Kubelet) HandlePodCleanups() error {
 	// Remove any orphaned mirror pods.
 	kl.podManager.DeleteOrphanedMirrorPods()
 
-	if err := kl.cleanupTerminatedPods(allPods, runningPods); err != nil {
-		glog.Errorf("Failed to cleanup terminated pods: %v", err)
-	}
-
 	// Clear out any old bandwidth rules
 	if err = kl.cleanupBandwidthLimits(allPods); err != nil {
 		return err
@@ -2374,6 +2345,13 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
 
 func (kl *Kubelet) dispatchWork(pod *api.Pod, syncType kubetypes.SyncPodType, mirrorPod *api.Pod, start time.Time) {
 	if kl.podIsTerminated(pod) {
+		if pod.DeletionTimestamp != nil {
+			// If the pod is in a termianted state, there is no pod worker to
+			// handle the work item. Check if the DeletionTimestamp has been
+			// set, and force a status update to trigger a pod deletion request
+			// to the apiserver.
+			kl.statusManager.TerminatePod(pod)
+		}
 		return
 	}
 	// Run the sync in an async worker.
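
For readers skimming the hunk above: terminated pods no longer reach a pod worker; if the pod also carries a DeletionTimestamp, the kubelet now asks the status manager to force a terminal status so the apiserver can complete the deletion. Below is a minimal, self-contained sketch of that control flow; Pod, statusManager, and podIsTerminated here are simplified stand-ins, not the real kubelet types.

package main

import (
	"fmt"
	"time"
)

// Minimal stand-ins for the kubelet types referenced in the hunk above; these
// are illustrative placeholders, not the real Kubernetes structures.
type Pod struct {
	Name              string
	DeletionTimestamp *time.Time // nil until the apiserver marks the pod for deletion
	Terminated        bool
}

type statusManager struct{}

// TerminatePod stands in for the status manager call in the diff: it would
// force a terminal status update so the apiserver can finish deleting the pod.
func (s *statusManager) TerminatePod(pod *Pod) {
	fmt.Printf("forcing terminal status update for %s\n", pod.Name)
}

type kubelet struct {
	statusManager *statusManager
}

func (kl *kubelet) podIsTerminated(pod *Pod) bool { return pod.Terminated }

// dispatchWork mirrors the control flow added above: a terminated pod has no
// pod worker, so the only action is a deletion-triggering status update.
func (kl *kubelet) dispatchWork(pod *Pod) {
	if kl.podIsTerminated(pod) {
		if pod.DeletionTimestamp != nil {
			kl.statusManager.TerminatePod(pod)
		}
		return
	}
	fmt.Printf("handing %s to an async pod worker\n", pod.Name)
}

func main() {
	kl := &kubelet{statusManager: &statusManager{}}
	now := time.Now()
	kl.dispatchWork(&Pod{Name: "running-pod"})
	kl.dispatchWork(&Pod{Name: "deleted-pod", Terminated: true, DeletionTimestamp: &now})
}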
@@ -83,10 +83,9 @@ type Manager interface {
 	// triggers a status update.
 	SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)
 
-	// TerminatePods resets the container status for the provided pods to terminated and triggers
-	// a status update. This function may not enqueue all the provided pods, in which case it will
-	// return false
-	TerminatePods(pods []*api.Pod) bool
+	// TerminatePod resets the container status for the provided pod to terminated and triggers
+	// a status update.
+	TerminatePod(pod *api.Pod)
 
 	// RemoveOrphanedStatuses scans the status cache and removes any entries for pods not included in
 	// the provided podUIDs.
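
The interface change above replaces the batch TerminatePods call (which returned false when it could not enqueue every pod) with a per-pod TerminatePod that has no return value. A rough sketch of how the two shapes read from a caller's side, using toy types rather than the real status package:

package main

import "fmt"

type Pod struct{ Name string }

// Old shape (removed above): batch termination whose bool result signalled
// that some updates were dropped and the whole batch needed to be retried.
type batchManager interface {
	TerminatePods(pods []*Pod) bool
}

// New shape (added above): one call per pod, no retry bookkeeping for callers.
type Manager interface {
	TerminatePod(pod *Pod)
}

type fakeManager struct{}

func (fakeManager) TerminatePod(pod *Pod) { fmt.Println("terminated", pod.Name) }

func main() {
	var m Manager = fakeManager{}
	for _, p := range []*Pod{{Name: "pod-a"}, {Name: "pod-b"}} {
		m.TerminatePod(p)
	}
}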
@@ -147,7 +146,7 @@ func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
 	m.podStatusesLock.Lock()
 	defer m.podStatusesLock.Unlock()
 
-	m.updateStatusInternal(pod, status)
+	m.updateStatusInternal(pod, status, false)
 }
 
 func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
@@ -187,13 +186,10 @@ func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontai
 		return
 	}
 
-	// Make sure we're not updating the cached version.
-	clone, err := api.Scheme.DeepCopy(&oldStatus.status)
+	status, err := copyStatus(&oldStatus.status)
 	if err != nil {
-		glog.Errorf("Failed to clone status %+v: %v", oldStatus.status, err)
 		return
 	}
-	status := *clone.(*api.PodStatus)
 	status.ContainerStatuses[containerIndex].Ready = ready
 
 	// Update pod condition.
@@ -212,31 +208,32 @@ func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontai
 		status.Conditions = append(status.Conditions, readyCondition)
 	}
 
-	m.updateStatusInternal(pod, status)
+	m.updateStatusInternal(pod, status, false)
 }
 
-func (m *manager) TerminatePods(pods []*api.Pod) bool {
-	allSent := true
+func (m *manager) TerminatePod(pod *api.Pod) {
 	m.podStatusesLock.Lock()
 	defer m.podStatusesLock.Unlock()
-	for _, pod := range pods {
-		for i := range pod.Status.ContainerStatuses {
-			pod.Status.ContainerStatuses[i].State = api.ContainerState{
-				Terminated: &api.ContainerStateTerminated{},
-			}
-		}
-		if sent := m.updateStatusInternal(pod, pod.Status); !sent {
-			glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", format.Pod(pod))
-			allSent = false
-		}
-	}
-	return allSent
+	oldStatus := &pod.Status
+	if cachedStatus, ok := m.podStatuses[pod.UID]; ok {
+		oldStatus = &cachedStatus.status
+	}
+	status, err := copyStatus(oldStatus)
+	if err != nil {
+		return
+	}
+	for i := range status.ContainerStatuses {
+		status.ContainerStatuses[i].State = api.ContainerState{
+			Terminated: &api.ContainerStateTerminated{},
+		}
+	}
+	m.updateStatusInternal(pod, pod.Status, true)
 }
 
 // updateStatusInternal updates the internal status cache, and queues an update to the api server if
 // necessary. Returns whether an update was triggered.
 // This method IS NOT THREAD SAFE and must be called from a locked function.
-func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) bool {
+func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus, forceUpdate bool) bool {
 	var oldStatus api.PodStatus
 	cachedStatus, isCached := m.podStatuses[pod.UID]
 	if isCached {
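
The new TerminatePod body above follows a copy-before-mutate pattern: prefer the cached status over pod.Status, deep-copy it, mark every container terminated, then push a forced update. The stand-alone sketch below reproduces that shape with toy types; the cache map, the slice copy, and the update call are simplified placeholders for the real manager internals, not the actual implementation.

package main

import "fmt"

type ContainerState struct{ Terminated bool }
type ContainerStatus struct {
	Name  string
	State ContainerState
}
type PodStatus struct{ ContainerStatuses []ContainerStatus }
type Pod struct {
	UID    string
	Status PodStatus
}

type manager struct {
	podStatuses map[string]PodStatus // last status the manager has seen per pod
}

func (m *manager) updateStatusInternal(pod *Pod, status PodStatus, forceUpdate bool) {
	fmt.Printf("update %s force=%v containers=%+v\n", pod.UID, forceUpdate, status.ContainerStatuses)
}

// TerminatePod: start from the cached status when available, copy it, flip
// every container to Terminated, then force an update past the usual
// "status unchanged" short-circuit.
func (m *manager) TerminatePod(pod *Pod) {
	oldStatus := pod.Status
	if cached, ok := m.podStatuses[pod.UID]; ok {
		oldStatus = cached
	}
	status := oldStatus
	status.ContainerStatuses = append([]ContainerStatus(nil), oldStatus.ContainerStatuses...)
	for i := range status.ContainerStatuses {
		status.ContainerStatuses[i].State = ContainerState{Terminated: true}
	}
	m.updateStatusInternal(pod, status, true)
}

func main() {
	m := &manager{podStatuses: map[string]PodStatus{}}
	m.TerminatePod(&Pod{UID: "pod-1", Status: PodStatus{ContainerStatuses: []ContainerStatus{{Name: "c1"}}}})
}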
@@ -269,7 +266,7 @@ func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) bool
 
 	// The intent here is to prevent concurrent updates to a pod's status from
 	// clobbering each other so the phase of a pod progresses monotonically.
-	if isCached && isStatusEqual(&cachedStatus.status, &status) && pod.DeletionTimestamp == nil {
+	if isCached && isStatusEqual(&cachedStatus.status, &status) && !forceUpdate {
 		glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status)
 		return false // No new status.
 	}
@@ -288,6 +285,8 @@ func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) bool
 	default:
 		// Let the periodic syncBatch handle the update if the channel is full.
 		// We can't block, since we hold the mutex lock.
+		glog.V(4).Infof("Skpping the status update for pod %q for now because the channel is full; status: %+v",
+			format.Pod(pod), status)
 		return false
 	}
 }
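
The added log lines sit in the default branch of a non-blocking channel send: the manager holds its mutex here, so it must not block waiting on the status channel, and a full channel simply defers the work to the periodic batch sync. A minimal, generic sketch of that pattern (syncRequest and the buffer size are illustrative, not the real manager's):

package main

import "fmt"

type syncRequest struct{ podUID string }

func main() {
	statusChannel := make(chan syncRequest, 1) // tiny buffer to make the overflow visible

	trySend := func(req syncRequest) bool {
		select {
		case statusChannel <- req:
			return true
		default:
			// Channel full: return immediately instead of blocking (the real
			// caller holds a lock); a periodic sync picks the update up later.
			return false
		}
	}

	fmt.Println(trySend(syncRequest{podUID: "pod-a"})) // true: fits in the buffer
	fmt.Println(trySend(syncRequest{podUID: "pod-b"})) // false: buffer already full
}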
@@ -495,3 +494,13 @@ func notRunning(statuses []api.ContainerStatus) bool {
 	}
 	return true
 }
+
+func copyStatus(source *api.PodStatus) (api.PodStatus, error) {
+	clone, err := api.Scheme.DeepCopy(source)
+	if err != nil {
+		glog.Errorf("Failed to clone status %+v: %v", source, err)
+		return api.PodStatus{}, err
+	}
+	status := *clone.(*api.PodStatus)
+	return status, nil
+}
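
The new copyStatus helper wraps api.Scheme.DeepCopy plus a type assertion so callers always mutate a private copy rather than the cached status. The toy version below illustrates the same copy-before-mutate idea without the Kubernetes scheme machinery; the struct fields and the JSON round-trip are assumptions made only to keep the example self-contained.

package main

import (
	"encoding/json"
	"fmt"
)

// Toy status types; the real api.PodStatus carries many more fields.
type ContainerStatus struct{ Name, State string }
type PodStatus struct{ ContainerStatuses []ContainerStatus }

// copyStatus returns a deep copy so the caller can modify it freely without
// touching the cached original.
func copyStatus(source *PodStatus) (PodStatus, error) {
	data, err := json.Marshal(source)
	if err != nil {
		return PodStatus{}, err
	}
	var out PodStatus
	if err := json.Unmarshal(data, &out); err != nil {
		return PodStatus{}, err
	}
	return out, nil
}

func main() {
	cached := &PodStatus{ContainerStatuses: []ContainerStatus{{Name: "c1", State: "Running"}}}
	status, err := copyStatus(cached)
	if err != nil {
		panic(err)
	}
	status.ContainerStatuses[0].State = "Terminated" // only the copy changes
	fmt.Println(cached.ContainerStatuses[0].State, status.ContainerStatuses[0].State)
}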