Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-08-02 16:29:21 +00:00

commit ae2a8da4cc
Merge pull request #22155 from Random-Liu/terminated-pods
Auto commit by PR queue bot
@@ -297,6 +297,8 @@ func (f *FakeDockerClient) StopContainer(id string, timeout uint) error {
 		return err
 	}
 	f.Stopped = append(f.Stopped, id)
+	// Container status should be updated before the container is moved to ExitedContainerList.
+	f.updateContainerStatus(id, statusExitedPrefix)
 	var newList []docker.APIContainers
 	for _, container := range f.ContainerList {
 		if container.ID == id {
@@ -323,7 +325,6 @@ func (f *FakeDockerClient) StopContainer(id string, timeout uint) error {
 		container.State.Running = false
 	}
 	f.ContainerMap[id] = container
-	f.updateContainerStatus(id, statusExitedPrefix)
 	f.normalSleep(200, 50, 50)
 	return nil
 }
@@ -333,11 +334,20 @@ func (f *FakeDockerClient) RemoveContainer(opts docker.RemoveContainerOptions) e
 	defer f.Unlock()
 	f.called = append(f.called, "remove")
 	err := f.popError("remove")
-	if err == nil {
-		f.Removed = append(f.Removed, opts.ID)
+	if err != nil {
+		return err
 	}
-	delete(f.ContainerMap, opts.ID)
-	return err
+	for i := range f.ExitedContainerList {
+		if f.ExitedContainerList[i].ID == opts.ID {
+			delete(f.ContainerMap, opts.ID)
+			f.ExitedContainerList = append(f.ExitedContainerList[:i], f.ExitedContainerList[i+1:]...)
+			f.Removed = append(f.Removed, opts.ID)
+			return nil
+		}
+
+	}
+	// To be a good fake, report error if container is not stopped.
+	return fmt.Errorf("container not stopped")
 }

 // Logs is a test-spy implementation of DockerInterface.Logs.
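For context, the reworked fake now only removes containers that have already exited. Below is a minimal standalone sketch of that behavior, assuming a simplified stand-in type rather than the real FakeDockerClient and its fields:

```go
package main

import (
	"errors"
	"fmt"
)

// fakeRuntime is a simplified stand-in for the FakeDockerClient in the hunk
// above: it only allows removal of containers that have already exited.
type fakeRuntime struct {
	exited  []string // IDs of stopped containers
	removed []string // IDs that were successfully removed
}

func (f *fakeRuntime) removeContainer(id string) error {
	for i, exitedID := range f.exited {
		if exitedID == id {
			// Drop the container from the exited list and record the removal.
			f.exited = append(f.exited[:i], f.exited[i+1:]...)
			f.removed = append(f.removed, id)
			return nil
		}
	}
	// Like the fake above, report an error for containers that were never stopped.
	return errors.New("container not stopped")
}

func main() {
	f := &fakeRuntime{exited: []string{"abc"}}
	fmt.Println(f.removeContainer("abc")) // <nil>
	fmt.Println(f.removeContainer("def")) // container not stopped
}
```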
@@ -1950,31 +1950,6 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubeco
 	return nil
 }

-// Delete any pods that are no longer running and are marked for deletion.
-func (kl *Kubelet) cleanupTerminatedPods(pods []*api.Pod, runningPods []*kubecontainer.Pod) error {
-	var terminating []*api.Pod
-	for _, pod := range pods {
-		if pod.DeletionTimestamp != nil {
-			found := false
-			for _, runningPod := range runningPods {
-				if runningPod.ID == pod.UID {
-					found = true
-					break
-				}
-			}
-			if found {
-				glog.V(5).Infof("Keeping terminated pod %q, still running", format.Pod(pod))
-				continue
-			}
-			terminating = append(terminating, pod)
-		}
-	}
-	if !kl.statusManager.TerminatePods(terminating) {
-		return errors.New("not all pods were successfully terminated")
-	}
-	return nil
-}
-
 // pastActiveDeadline returns true if the pod has been active for more than
 // ActiveDeadlineSeconds.
 func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
@@ -2166,10 +2141,6 @@ func (kl *Kubelet) HandlePodCleanups() error {
 	// Remove any orphaned mirror pods.
 	kl.podManager.DeleteOrphanedMirrorPods()

-	if err := kl.cleanupTerminatedPods(allPods, runningPods); err != nil {
-		glog.Errorf("Failed to cleanup terminated pods: %v", err)
-	}
-
 	// Clear out any old bandwidth rules
 	if err = kl.cleanupBandwidthLimits(allPods); err != nil {
 		return err
@@ -2430,6 +2401,13 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler

 func (kl *Kubelet) dispatchWork(pod *api.Pod, syncType kubetypes.SyncPodType, mirrorPod *api.Pod, start time.Time) {
 	if kl.podIsTerminated(pod) {
+		if pod.DeletionTimestamp != nil {
+			// If the pod is in a terminated state, there is no pod worker to
+			// handle the work item. Check if the DeletionTimestamp has been
+			// set, and force a status update to trigger a pod deletion request
+			// to the apiserver.
+			kl.statusManager.TerminatePod(pod)
+		}
 		return
 	}
 	// Run the sync in an async worker.
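The dispatchWork change above is the consumer side of the new TerminatePod call: a pod that is already terminated gets no sync worker, so a forced status update is the only way a pending deletion reaches the apiserver. A rough standalone sketch of that decision, with simplified types and a stand-in statusManager interface rather than the kubelet's real signatures:

```go
package main

import (
	"fmt"
	"time"
)

// statusManager is a pared-down stand-in for the kubelet's status manager.
type statusManager interface {
	TerminatePod(p *pod)
}

type pod struct {
	Name              string
	DeletionTimestamp *time.Time // nil until the apiserver marks the pod for deletion
}

type printManager struct{}

func (printManager) TerminatePod(p *pod) {
	fmt.Printf("forcing final status update for %s\n", p.Name)
}

// dispatchWork mirrors the control flow of the hunk above: terminated pods get
// no sync worker, but a pending deletion still needs one last status update.
func dispatchWork(sm statusManager, p *pod, terminated bool) {
	if terminated {
		if p.DeletionTimestamp != nil {
			sm.TerminatePod(p)
		}
		return
	}
	fmt.Printf("dispatching sync for %s\n", p.Name)
}

func main() {
	now := time.Now()
	dispatchWork(printManager{}, &pod{Name: "web-1"}, false)
	dispatchWork(printManager{}, &pod{Name: "web-2", DeletionTimestamp: &now}, true)
}
```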
@@ -124,17 +124,17 @@ func generateEvent(podID types.UID, cid string, oldState, newState plegContainer
 	case plegContainerExited:
 		return &PodLifecycleEvent{ID: podID, Type: ContainerDied, Data: cid}
 	case plegContainerUnknown:
-		// Don't generate any event if the status is unknown.
-		return nil
+		return &PodLifecycleEvent{ID: podID, Type: ContainerChanged, Data: cid}
 	case plegContainerNonExistent:
 		// We report "ContainerDied" when container was stopped OR removed. We
 		// may want to distinguish the two cases in the future.
 		switch oldState {
 		case plegContainerExited:
-			// We already reported that the container died before. There is no
-			// need to do it again.
-			return nil
+			// We already reported that the container died before.
+			return &PodLifecycleEvent{ID: podID, Type: ContainerRemoved, Data: cid}
 		default:
+			// TODO: We may want to generate a ContainerRemoved event as well.
+			// It's ok now because no one relies on the ContainerRemoved event.
 			return &PodLifecycleEvent{ID: podID, Type: ContainerDied, Data: cid}
 		}
 	default:
@@ -165,9 +165,7 @@ func (g *GenericPLEG) relist() {
 		return
 	}
 	pods := kubecontainer.Pods(podList)
-	for _, pod := range pods {
-		g.podRecords.setCurrent(pod)
-	}
+	g.podRecords.setCurrent(pods)

 	// Compare the old and the current pods, and generate events.
 	eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
@@ -204,6 +202,10 @@ func (g *GenericPLEG) relist() {
 		// Update the internal storage and send out the events.
 		g.podRecords.update(pid)
 		for i := range events {
+			// Filter out events that are not reliable and that no other components use yet.
+			if events[i].Type == ContainerChanged || events[i].Type == ContainerRemoved {
+				continue
+			}
 			g.eventChannel <- events[i]
 		}
 	}
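Taken together, the three hunks above make generateEvent produce an event for every observed transition (ContainerChanged for unknown states, ContainerRemoved when an already-exited container disappears) and then filter those two new event types out before they reach the channel. A condensed standalone sketch of that mapping and filter follows; the event and state names come from the hunks, while the types and helper names here are simplified assumptions:

```go
package main

import "fmt"

type containerState string
type eventType string

const (
	stateExited      containerState = "exited"
	stateUnknown     containerState = "unknown"
	stateNonExistent containerState = "non-existent"

	containerDied    eventType = "ContainerDied"
	containerChanged eventType = "ContainerChanged"
	containerRemoved eventType = "ContainerRemoved"
)

// generateEvent mirrors the switch in the hunk above for the states it shows.
func generateEvent(oldState, newState containerState) eventType {
	switch newState {
	case stateExited:
		return containerDied
	case stateUnknown:
		return containerChanged
	case stateNonExistent:
		if oldState == stateExited {
			// The death was already reported; only report the removal.
			return containerRemoved
		}
		return containerDied
	}
	return ""
}

// reliable mirrors the relist filter: Changed/Removed events are dropped for now.
func reliable(t eventType) bool {
	return t != containerChanged && t != containerRemoved
}

func main() {
	e := generateEvent(stateExited, stateNonExistent)
	fmt.Println(e, "sent:", reliable(e)) // ContainerRemoved sent: false
}
```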
@@ -304,12 +306,17 @@ func (pr podRecords) getCurrent(id types.UID) *kubecontainer.Pod {
 	return r.current
 }

-func (pr podRecords) setCurrent(pod *kubecontainer.Pod) {
-	if r, ok := pr[pod.ID]; ok {
-		r.current = pod
-		return
+func (pr podRecords) setCurrent(pods []*kubecontainer.Pod) {
+	for i := range pr {
+		pr[i].current = nil
+	}
+	for _, pod := range pods {
+		if r, ok := pr[pod.ID]; ok {
+			r.current = pod
+		} else {
+			pr[pod.ID] = &podRecord{current: pod}
+		}
 	}
-	pr[pod.ID] = &podRecord{current: pod}
 }

 func (pr podRecords) update(id types.UID) {
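The rewritten setCurrent first clears every record's current pointer and then repopulates from the full pod list, so a pod that vanished between relists ends up with current == nil instead of silently keeping a stale entry. A minimal standalone sketch of that pattern; podRecord here is a simplified assumption, not the PLEG's real type:

```go
package main

import "fmt"

type podRecord struct {
	old, current string // simplified: just the pod name per snapshot
}

type podRecords map[string]*podRecord

// setCurrent mirrors the rewritten method: reset, then refill from the new list.
func (pr podRecords) setCurrent(pods []string) {
	for id := range pr {
		pr[id].current = "" // clear stale entries first
	}
	for _, p := range pods {
		if r, ok := pr[p]; ok {
			r.current = p
		} else {
			pr[p] = &podRecord{current: p}
		}
	}
}

func main() {
	pr := podRecords{"a": {old: "a", current: "a"}, "b": {old: "b", current: "b"}}
	pr.setCurrent([]string{"a", "c"})  // "b" disappeared between relists
	fmt.Println(pr["b"].current == "") // true: "b" can now generate a died/removed event
}
```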
@@ -23,13 +23,14 @@ import (
 type PodLifeCycleEventType string

 const (
 	ContainerStarted PodLifeCycleEventType = "ContainerStarted"
 	ContainerDied    PodLifeCycleEventType = "ContainerDied"
-	NetworkSetupCompleted PodLifeCycleEventType = "NetworkSetupCompleted"
-	NetworkFailed         PodLifeCycleEventType = "NetworkFailed"
 	// PodSync is used to trigger syncing of a pod when the observed change of
 	// the state of the pod cannot be captured by any single event above.
 	PodSync PodLifeCycleEventType = "PodSync"
+	// Do not use the events below because they are disabled in GenericPLEG.
+	ContainerRemoved PodLifeCycleEventType = "ContainerRemoved"
+	ContainerChanged PodLifeCycleEventType = "ContainerChanged"
 )

 // PodLifecycleEvent is an event that reflects the change of the pod state.
@@ -83,10 +83,9 @@ type Manager interface {
 	// triggers a status update.
 	SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)

-	// TerminatePods resets the container status for the provided pods to terminated and triggers
-	// a status update. This function may not enqueue all the provided pods, in which case it will
-	// return false
-	TerminatePods(pods []*api.Pod) bool
+	// TerminatePod resets the container status for the provided pod to terminated and triggers
+	// a status update.
+	TerminatePod(pod *api.Pod)

 	// RemoveOrphanedStatuses scans the status cache and removes any entries for pods not included in
 	// the provided podUIDs.
@@ -149,7 +148,7 @@ func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
 	if err != nil {
 		return
 	}
-	m.updateStatusInternal(pod, status)
+	m.updateStatusInternal(pod, status, false)
 }

 func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
@@ -212,31 +211,32 @@ func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontai
 	status.Conditions = append(status.Conditions, readyCondition)
 	}

-	m.updateStatusInternal(pod, status)
+	m.updateStatusInternal(pod, status, false)
 }

-func (m *manager) TerminatePods(pods []*api.Pod) bool {
-	allSent := true
+func (m *manager) TerminatePod(pod *api.Pod) {
 	m.podStatusesLock.Lock()
 	defer m.podStatusesLock.Unlock()
-	for _, pod := range pods {
-		for i := range pod.Status.ContainerStatuses {
-			pod.Status.ContainerStatuses[i].State = api.ContainerState{
-				Terminated: &api.ContainerStateTerminated{},
-			}
-		}
-		if sent := m.updateStatusInternal(pod, pod.Status); !sent {
-			glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", format.Pod(pod))
-			allSent = false
+	oldStatus := &pod.Status
+	if cachedStatus, ok := m.podStatuses[pod.UID]; ok {
+		oldStatus = &cachedStatus.status
+	}
+	status, err := copyStatus(oldStatus)
+	if err != nil {
+		return
+	}
+	for i := range status.ContainerStatuses {
+		status.ContainerStatuses[i].State = api.ContainerState{
+			Terminated: &api.ContainerStateTerminated{},
 		}
 	}
-	return allSent
+	m.updateStatusInternal(pod, pod.Status, true)
 }

 // updateStatusInternal updates the internal status cache, and queues an update to the api server if
 // necessary. Returns whether an update was triggered.
 // This method IS NOT THREAD SAFE and must be called from a locked function.
-func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) bool {
+func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus, forceUpdate bool) bool {
 	var oldStatus api.PodStatus
 	cachedStatus, isCached := m.podStatuses[pod.UID]
 	if isCached {
@@ -270,7 +270,7 @@ func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) bool
 	normalizeStatus(&status)
 	// The intent here is to prevent concurrent updates to a pod's status from
 	// clobbering each other so the phase of a pod progresses monotonically.
-	if isCached && isStatusEqual(&cachedStatus.status, &status) && pod.DeletionTimestamp == nil {
+	if isCached && isStatusEqual(&cachedStatus.status, &status) && !forceUpdate {
 		glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status)
 		return false // No new status.
 	}
@@ -289,6 +289,8 @@ func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) bool
 	default:
 		// Let the periodic syncBatch handle the update if the channel is full.
 		// We can't block, since we hold the mutex lock.
+		glog.V(4).Infof("Skipping the status update for pod %q for now because the channel is full; status: %+v",
+			format.Pod(pod), status)
 		return false
 	}
 }
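The forceUpdate flag threaded through updateStatusInternal is what lets TerminatePod push a status through even when it is identical to the cached one; previously the "same status" short-circuit could swallow the final update that a pod deletion depends on. A minimal sketch of that guard, with the cache and equality test simplified rather than the manager's real ones:

```go
package main

import "fmt"

type status struct{ phase string }

type manager struct {
	cache map[string]status
}

// updateStatus mirrors the guard in the hunk above: identical statuses are
// normally skipped, unless the caller forces the update (as TerminatePod does).
func (m *manager) updateStatus(podUID string, s status, forceUpdate bool) bool {
	cached, isCached := m.cache[podUID]
	if isCached && cached == s && !forceUpdate {
		return false // no new status, nothing to send
	}
	m.cache[podUID] = s
	return true // the real manager would queue an apiserver update here
}

func main() {
	m := &manager{cache: map[string]status{}}
	s := status{phase: "Failed"}
	fmt.Println(m.updateStatus("uid-1", s, false)) // true: first update
	fmt.Println(m.updateStatus("uid-1", s, false)) // false: identical, skipped
	fmt.Println(m.updateStatus("uid-1", s, true))  // true: forced (termination path)
}
```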