kubelet: Handle UID reuse in pod worker

If a pod is killed (no longer wanted) and a subsequent create/add/update
event is then seen in the pod worker, assume that the pod UID was reused
(as it can be for static pods). In that case, have the next SyncKnownPods
after the pod terminates remove the worker history so that the config loop
can restart the static pod, and report to the caller that this termination
was not final.

The housekeeping loop then reconciles the desired state of the Kubelet
(pods in the pod manager that are not in a terminal state, i.e. admitted
pods) with the pod worker by resubmitting those pods. This adds a small
amount of latency (up to 2s) when a pod UID is reused and the pod is
terminated and restarted.
commit d5719800bf (parent 47086a6623)
Author: Clayton Coleman
Date:   2021-09-08 14:38:09 -04:00
4 changed files with 118 additions and 24 deletions
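The behavior described above can be modeled as a minimal, self-contained Go sketch. The names below (modelWorkers, modelStatus, and the simplified UpdatePod/SyncKnownPods signatures) are illustrative stand-ins, not the kubelet's actual podWorkers implementation: a create for a UID that arrives while a pod with that UID is still terminating only records restartRequested, and the next SyncKnownPods reports the worker as terminated-and-recreated and clears its history so housekeeping can resubmit the pod.

// Minimal, self-contained sketch of the state machine this commit introduces.
// All names here are illustrative only; they model the new behavior rather than
// reproduce the kubelet's real podWorkers types.
package main

import "fmt"

type UID string

type PodWorkerState int

const (
    SyncPod PodWorkerState = iota
    TerminatingPod
    TerminatedPod
    TerminatedAndRecreatedPod
)

type modelStatus struct {
    terminating      bool
    terminated       bool
    restartRequested bool
}

type modelWorkers struct {
    statuses map[UID]*modelStatus
}

// UpdatePod mirrors the new UpdatePod logic: a create observed while the same
// UID is already terminating only flags restartRequested instead of starting work.
func (w *modelWorkers) UpdatePod(uid UID, create bool) {
    s, ok := w.statuses[uid]
    if !ok {
        s = &modelStatus{}
        w.statuses[uid] = s
    }
    if create && s.terminating {
        s.restartRequested = true
        return
    }
    if !create {
        s.terminating = true
    }
}

// SyncKnownPods mirrors the new return value: a terminated worker whose UID was
// requested again is reported as TerminatedAndRecreatedPod and its history is
// cleared, so housekeeping can dispatch a fresh create for the reused UID.
func (w *modelWorkers) SyncKnownPods() map[UID]PodWorkerState {
    out := map[UID]PodWorkerState{}
    for uid, s := range w.statuses {
        switch {
        case s.terminated && s.restartRequested:
            out[uid] = TerminatedAndRecreatedPod
            delete(w.statuses, uid) // remove history so the pod can be restarted
        case s.terminated:
            out[uid] = TerminatedPod
        case s.terminating:
            out[uid] = TerminatingPod
        default:
            out[uid] = SyncPod
        }
    }
    return out
}

func main() {
    w := &modelWorkers{statuses: map[UID]*modelStatus{}}
    w.UpdatePod("static-pod-uid", true)            // create
    w.UpdatePod("static-pod-uid", false)           // kill: pod no longer wanted
    w.UpdatePod("static-pod-uid", true)            // create again with the same UID
    w.statuses["static-pod-uid"].terminated = true // first instance finishes terminating
    fmt.Println(w.SyncKnownPods()["static-pod-uid"] == TerminatedAndRecreatedPod) // prints true
}

Running the sketch prints true: the second create is deferred until the first instance has terminated, at which point the worker is reported as recreated rather than simply terminated.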


@@ -2227,6 +2227,8 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
         }
         mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
         kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
+        // TODO: move inside syncPod and make reentrant
+        // https://github.com/kubernetes/kubernetes/issues/105014
         kl.probeManager.AddPod(pod)
     }
 }
@@ -2261,6 +2263,9 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
         if err := kl.deletePod(pod); err != nil {
             klog.V(2).InfoS("Failed to delete pod", "pod", klog.KObj(pod), "err", err)
         }
+        // TODO: move inside syncTerminatingPod|syncTerminatedPod (we should stop probing
+        // once the pod kill is acknowledged and during eviction)
+        // https://github.com/kubernetes/kubernetes/issues/105014
         kl.probeManager.RemovePod(pod)
     }
 }


@@ -983,15 +983,7 @@ func (kl *Kubelet) filterOutInactivePods(pods []*v1.Pod) []*v1.Pod {
         }
         // terminal pods are considered inactive UNLESS they are actively terminating
-        isTerminal := p.Status.Phase == v1.PodSucceeded || p.Status.Phase == v1.PodFailed
-        if !isTerminal {
-            // a pod that has been marked terminal within the Kubelet is considered
-            // inactive (may have been rejected by Kubelet admision)
-            if status, ok := kl.statusManager.GetPodStatus(p.UID); ok {
-                isTerminal = status.Phase == v1.PodSucceeded || status.Phase == v1.PodFailed
-            }
-        }
-        if isTerminal && !kl.podWorkers.IsPodTerminationRequested(p.UID) {
+        if kl.isAdmittedPodTerminal(p) && !kl.podWorkers.IsPodTerminationRequested(p.UID) {
             continue
         }
@@ -1000,6 +992,28 @@
     return filteredPods
 }
 
+// isAdmittedPodTerminal returns true if the provided config source pod is in
+// a terminal phase, or if the Kubelet has already indicated the pod has reached
+// a terminal phase but the config source has not accepted it yet. This method
+// should only be used within the pod configuration loops that notify the pod
+// worker, other components should treat the pod worker as authoritative.
+func (kl *Kubelet) isAdmittedPodTerminal(pod *v1.Pod) bool {
+    // pods are considered inactive if the config source has observed a
+    // terminal phase (if the Kubelet recorded that the pod reached a terminal
+    // phase the pod should never be restarted)
+    if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
+        return true
+    }
+    // a pod that has been marked terminal within the Kubelet is considered
+    // inactive (may have been rejected by Kubelet admision)
+    if status, ok := kl.statusManager.GetPodStatus(pod.UID); ok {
+        if status.Phase == v1.PodSucceeded || status.Phase == v1.PodFailed {
+            return true
+        }
+    }
+    return false
+}
+
 // removeOrphanedPodStatuses removes obsolete entries in podStatus where
 // the pod is no longer considered bound to this node.
 func (kl *Kubelet) removeOrphanedPodStatuses(pods []*v1.Pod, mirrorPods []*v1.Pod) {
@@ -1081,13 +1095,16 @@ func (kl *Kubelet) HandlePodCleanups() error {
     // cleanup of pod cgroups.
     runningPods := make(map[types.UID]sets.Empty)
     possiblyRunningPods := make(map[types.UID]sets.Empty)
+    restartablePods := make(map[types.UID]sets.Empty)
     for uid, sync := range workingPods {
         switch sync {
-        case SyncPodWork:
+        case SyncPod:
             runningPods[uid] = struct{}{}
             possiblyRunningPods[uid] = struct{}{}
-        case TerminatingPodWork:
+        case TerminatingPod:
             possiblyRunningPods[uid] = struct{}{}
+        case TerminatedAndRecreatedPod:
+            restartablePods[uid] = struct{}{}
         }
     }
@@ -1103,8 +1120,8 @@ func (kl *Kubelet) HandlePodCleanups() error {
         return err
     }
     for _, runningPod := range runningRuntimePods {
-        switch workType, ok := workingPods[runningPod.ID]; {
-        case ok && workType == SyncPodWork, ok && workType == TerminatingPodWork:
+        switch workerState, ok := workingPods[runningPod.ID]; {
+        case ok && workerState == SyncPod, ok && workerState == TerminatingPod:
             // if the pod worker is already in charge of this pod, we don't need to do anything
             continue
         default:
@@ -1171,6 +1188,32 @@ func (kl *Kubelet) HandlePodCleanups() error {
     }
     kl.backOff.GC()
 
+    // If two pods with the same UID are observed in rapid succession, we need to
+    // resynchronize the pod worker after the first pod completes and decide whether
+    // to restart the pod. This happens last to avoid confusing the desired state
+    // in other components and to increase the likelihood transient OS failures during
+    // container start are mitigated. In general only static pods will ever reuse UIDs
+    // since the apiserver uses randomly generated UUIDv4 UIDs with a very low
+    // probability of collision.
+    for uid := range restartablePods {
+        pod, ok := allPodsByUID[uid]
+        if !ok {
+            continue
+        }
+        if kl.isAdmittedPodTerminal(pod) {
+            klog.V(3).InfoS("Pod is restartable after termination due to UID reuse, but pod phase is terminal", "pod", klog.KObj(pod), "podUID", pod.UID)
+            continue
+        }
+        start := kl.clock.Now()
+        mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
+        klog.V(3).InfoS("Pod is restartable after termination due to UID reuse", "pod", klog.KObj(pod), "podUID", pod.UID)
+        kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
+        // TODO: move inside syncPod and make reentrant
+        // https://github.com/kubernetes/kubernetes/issues/105014
+        kl.probeManager.AddPod(pod)
+    }
+
     return nil
 }
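The "up to 2s" figure in the commit message comes from this hunk: the recreated pod is not redispatched by the create event itself but by the next housekeeping pass that calls HandlePodCleanups. The sketch below illustrates that timing relationship only; the 2-second period constant and the handlePodCleanups stand-in are assumptions for illustration, not the kubelet's actual wiring.

// Sketch of why a restart after UID reuse can cost up to one housekeeping
// interval: the create that arrived during termination is acted on only when
// the next housekeeping pass observes the worker in the recreated state.
package main

import (
    "fmt"
    "time"
)

// handlePodCleanups stands in for Kubelet.HandlePodCleanups; in the real kubelet
// it calls podWorkers.SyncKnownPods and then dispatchWork(SyncPodCreate) for
// every pod reported as terminated-and-recreated.
func handlePodCleanups(restartable []string) {
    for _, uid := range restartable {
        fmt.Println("housekeeping: resubmitting pod with reused UID", uid)
    }
}

func main() {
    const housekeepingPeriod = 2 * time.Second // assumed period, matching the commit message
    restartable := []string{"static-pod-uid"}  // would be discovered via SyncKnownPods

    ticker := time.NewTicker(housekeepingPeriod)
    defer ticker.Stop()

    start := time.Now()
    <-ticker.C // the pod waits for the next tick before it can be restarted
    handlePodCleanups(restartable)
    fmt.Printf("restart dispatched after %v\n", time.Since(start).Round(time.Second))
}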


@@ -91,7 +91,7 @@ type UpdatePodOptions struct {
 type PodWorkType int
 
 const (
-    // SyncPodSync is when the pod is expected to be started and running.
+    // SyncPodWork is when the pod is expected to be started and running.
     SyncPodWork PodWorkType = iota
     // TerminatingPodWork is when the pod is no longer being set up, but some
     // containers may be running and are being torn down.
@@ -101,6 +101,26 @@ const (
     TerminatedPodWork
 )
 
+// PodWorkerState classifies the status of a pod as seen by the pod worker - setup (sync),
+// teardown of containers (terminating), cleanup (terminated), or recreated with the
+// same UID (kill -> create while terminating)
+type PodWorkerState int
+
+const (
+    // SyncPod is when the pod is expected to be started and running.
+    SyncPod PodWorkerState = iota
+    // TerminatingPod is when the pod is no longer being set up, but some
+    // containers may be running and are being torn down.
+    TerminatingPod
+    // TerminatedPod indicates the pod is stopped, can have no more running
+    // containers, and any foreground cleanup can be executed.
+    TerminatedPod
+    // TerminatedAndRecreatedPod indicates that after the pod was terminating a
+    // request to recreate the pod was received. The pod is terminated and can
+    // now be restarted by sending a create event to the pod worker.
+    TerminatedAndRecreatedPod
+)
+
 // podWork is the internal changes
 type podWork struct {
     // WorkType is the type of sync to perform - sync (create), terminating (stop
@@ -127,8 +147,8 @@ type PodWorkers interface {
     // and have been terminated for a significant period of time. Once this method
     // has been called once, the workers are assumed to be fully initialized and
     // subsequent calls to ShouldPodContentBeRemoved on unknown pods will return
-    // true.
-    SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkType
+    // true. It returns a map describing the state of each known pod worker.
+    SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkerState
 
     // IsPodKnownTerminated returns true if the provided pod UID is known by the pod
     // worker to be terminated. If the pod has been force deleted and the pod worker
@@ -254,6 +274,11 @@ type podSyncStatus struct {
     // to remove the pod. A terminal pod (Succeeded/Failed) will have
     // termination status until the pod is deleted.
     finished bool
+    // restartRequested is true if the pod worker was informed the pod is
+    // expected to exist (update type of create, update, or sync) after
+    // it has been killed. When known pods are synced, any pod that is
+    // terminated and has restartRequested will have its history cleared.
+    restartRequested bool
 
     // notifyPostTerminating will be closed once the pod transitions to
     // terminated. After the pod is in terminated state, nothing should be
     // added to this list.
@@ -514,6 +539,19 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
         p.podSyncStatuses[uid] = status
     }
 
+    // if an update is received that implies the pod should be running, but we are already terminating a pod by
+    // that UID, assume that two pods with the same UID were created in close temporal proximity (usually static
+    // pod but it's possible for an apiserver to extremely rarely do something similar) - flag the sync status
+    // to indicate that after the pod terminates it should be reset to "not running" to allow a subsequent add/update
+    // to start the pod worker again
+    if status.IsTerminationRequested() {
+        if options.UpdateType == kubetypes.SyncPodCreate {
+            status.restartRequested = true
+            klog.V(4).InfoS("Pod is terminating but has been requested to restart with same UID, will be reconciled later", "pod", klog.KObj(pod), "podUID", pod.UID)
+            return
+        }
+    }
+
     // once a pod is terminated by UID, it cannot reenter the pod worker (until the UID is purged by housekeeping)
     if status.IsFinished() {
         klog.V(4).InfoS("Pod is finished processing, no further updates", "pod", klog.KObj(pod), "podUID", pod.UID)
@@ -965,8 +1003,8 @@ func (p *podWorkers) contextForWorker(uid types.UID) context.Context {
 // to UpdatePods for new pods. It returns a map of known workers that are not finished
 // with a value of SyncPodTerminated, SyncPodKill, or SyncPodSync depending on whether
 // the pod is terminated, terminating, or syncing.
-func (p *podWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkType {
-    workers := make(map[types.UID]PodWorkType)
+func (p *podWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkerState {
+    workers := make(map[types.UID]PodWorkerState)
     known := make(map[types.UID]struct{})
     for _, pod := range desiredPods {
         known[pod.UID] = struct{}{}
@@ -977,16 +1015,20 @@ func (p *podWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkType {
     p.podsSynced = true
     for uid, status := range p.podSyncStatuses {
-        if _, exists := known[uid]; !exists {
+        if _, exists := known[uid]; !exists || status.restartRequested {
             p.removeTerminatedWorker(uid)
         }
         switch {
         case !status.terminatedAt.IsZero():
-            workers[uid] = TerminatedPodWork
+            if status.restartRequested {
+                workers[uid] = TerminatedAndRecreatedPod
+            } else {
+                workers[uid] = TerminatedPod
+            }
         case !status.terminatingAt.IsZero():
-            workers[uid] = TerminatingPodWork
+            workers[uid] = TerminatingPod
         default:
-            workers[uid] = SyncPodWork
+            workers[uid] = SyncPod
         }
     }
     return workers
@@ -1009,7 +1051,11 @@ func (p *podWorkers) removeTerminatedWorker(uid types.UID) {
         return
     }
 
+    if status.restartRequested {
+        klog.V(4).InfoS("Pod has been terminated but another pod with the same UID was created, remove history to allow restart", "podUID", uid)
+    } else {
         klog.V(4).InfoS("Pod has been terminated and is no longer known to the kubelet, remove all history", "podUID", uid)
+    }
 
     delete(p.podSyncStatuses, uid)
     delete(p.podUpdates, uid)
     delete(p.lastUndeliveredWorkUpdate, uid)


@@ -82,7 +82,7 @@ func (f *fakePodWorkers) UpdatePod(options UpdatePodOptions) {
     }
 }
 
-func (f *fakePodWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkType {
+func (f *fakePodWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkerState {
     return nil
 }