Mirror of https://github.com/k3s-io/kubernetes.git
Merge pull request #104847 from smarterclayton/worker_uid_reuse
kubelet: Handle UID reuse in pod worker
Commit 51384aa77e
@@ -2227,6 +2227,8 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
        }
        mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
        kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
        // TODO: move inside syncPod and make reentrant
        // https://github.com/kubernetes/kubernetes/issues/105014
        kl.probeManager.AddPod(pod)
    }
}
@@ -2261,6 +2263,9 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
        if err := kl.deletePod(pod); err != nil {
            klog.V(2).InfoS("Failed to delete pod", "pod", klog.KObj(pod), "err", err)
        }
        // TODO: move inside syncTerminatingPod|syncTerminatedPod (we should stop probing
        // once the pod kill is acknowledged and during eviction)
        // https://github.com/kubernetes/kubernetes/issues/105014
        kl.probeManager.RemovePod(pod)
    }
}
@@ -983,15 +983,7 @@ func (kl *Kubelet) filterOutInactivePods(pods []*v1.Pod) []*v1.Pod {
        }

        // terminal pods are considered inactive UNLESS they are actively terminating
        isTerminal := p.Status.Phase == v1.PodSucceeded || p.Status.Phase == v1.PodFailed
        if !isTerminal {
            // a pod that has been marked terminal within the Kubelet is considered
            // inactive (may have been rejected by Kubelet admision)
            if status, ok := kl.statusManager.GetPodStatus(p.UID); ok {
                isTerminal = status.Phase == v1.PodSucceeded || status.Phase == v1.PodFailed
            }
        }
        if isTerminal && !kl.podWorkers.IsPodTerminationRequested(p.UID) {
        if kl.isAdmittedPodTerminal(p) && !kl.podWorkers.IsPodTerminationRequested(p.UID) {
            continue
        }

@@ -1000,6 +992,28 @@ func (kl *Kubelet) filterOutInactivePods(pods []*v1.Pod) []*v1.Pod {
    return filteredPods
}

// isAdmittedPodTerminal returns true if the provided config source pod is in
// a terminal phase, or if the Kubelet has already indicated the pod has reached
// a terminal phase but the config source has not accepted it yet. This method
// should only be used within the pod configuration loops that notify the pod
// worker, other components should treat the pod worker as authoritative.
func (kl *Kubelet) isAdmittedPodTerminal(pod *v1.Pod) bool {
    // pods are considered inactive if the config source has observed a
    // terminal phase (if the Kubelet recorded that the pod reached a terminal
    // phase the pod should never be restarted)
    if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
        return true
    }
    // a pod that has been marked terminal within the Kubelet is considered
    // inactive (may have been rejected by Kubelet admision)
    if status, ok := kl.statusManager.GetPodStatus(pod.UID); ok {
        if status.Phase == v1.PodSucceeded || status.Phase == v1.PodFailed {
            return true
        }
    }
    return false
}

// removeOrphanedPodStatuses removes obsolete entries in podStatus where
// the pod is no longer considered bound to this node.
func (kl *Kubelet) removeOrphanedPodStatuses(pods []*v1.Pod, mirrorPods []*v1.Pod) {
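The new isAdmittedPodTerminal helper treats a pod as terminal if either the config source's copy of the pod or the kubelet's own recorded status has reached Succeeded or Failed. A minimal, self-contained sketch of that check, with a hypothetical statusLookup callback standing in for the kubelet's statusManager.GetPodStatus; it is illustrative only, not the kubelet's actual wiring:

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/types"
)

// isPhaseTerminal reports whether a pod phase can never return to running.
func isPhaseTerminal(phase v1.PodPhase) bool {
    return phase == v1.PodSucceeded || phase == v1.PodFailed
}

// isAdmittedPodTerminalSketch mirrors the helper's logic: terminal if the config
// source observed a terminal phase, or if the kubelet's own status record
// (statusLookup is a stand-in for statusManager.GetPodStatus) already reached one.
func isAdmittedPodTerminalSketch(pod *v1.Pod, statusLookup func(types.UID) (v1.PodStatus, bool)) bool {
    if isPhaseTerminal(pod.Status.Phase) {
        return true
    }
    if status, ok := statusLookup(pod.UID); ok && isPhaseTerminal(status.Phase) {
        return true
    }
    return false
}

func main() {
    pod := &v1.Pod{Status: v1.PodStatus{Phase: v1.PodRunning}}
    pod.UID = types.UID("example-uid")
    // Pretend the kubelet already recorded a Failed phase for this UID.
    lookup := func(uid types.UID) (v1.PodStatus, bool) {
        return v1.PodStatus{Phase: v1.PodFailed}, uid == pod.UID
    }
    fmt.Println(isAdmittedPodTerminalSketch(pod, lookup)) // true
}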
@@ -1081,13 +1095,16 @@ func (kl *Kubelet) HandlePodCleanups() error {
    // cleanup of pod cgroups.
    runningPods := make(map[types.UID]sets.Empty)
    possiblyRunningPods := make(map[types.UID]sets.Empty)
    restartablePods := make(map[types.UID]sets.Empty)
    for uid, sync := range workingPods {
        switch sync {
        case SyncPodWork:
        case SyncPod:
            runningPods[uid] = struct{}{}
            possiblyRunningPods[uid] = struct{}{}
        case TerminatingPodWork:
        case TerminatingPod:
            possiblyRunningPods[uid] = struct{}{}
        case TerminatedAndRecreatedPod:
            restartablePods[uid] = struct{}{}
        }
    }

@@ -1103,8 +1120,8 @@ func (kl *Kubelet) HandlePodCleanups() error {
        return err
    }
    for _, runningPod := range runningRuntimePods {
        switch workType, ok := workingPods[runningPod.ID]; {
        case ok && workType == SyncPodWork, ok && workType == TerminatingPodWork:
        switch workerState, ok := workingPods[runningPod.ID]; {
        case ok && workerState == SyncPod, ok && workerState == TerminatingPod:
            // if the pod worker is already in charge of this pod, we don't need to do anything
            continue
        default:
@@ -1171,6 +1188,32 @@ func (kl *Kubelet) HandlePodCleanups() error {
    }

    kl.backOff.GC()

    // If two pods with the same UID are observed in rapid succession, we need to
    // resynchronize the pod worker after the first pod completes and decide whether
    // to restart the pod. This happens last to avoid confusing the desired state
    // in other components and to increase the likelihood transient OS failures during
    // container start are mitigated. In general only static pods will ever reuse UIDs
    // since the apiserver uses randomly generated UUIDv4 UIDs with a very low
    // probability of collision.
    for uid := range restartablePods {
        pod, ok := allPodsByUID[uid]
        if !ok {
            continue
        }
        if kl.isAdmittedPodTerminal(pod) {
            klog.V(3).InfoS("Pod is restartable after termination due to UID reuse, but pod phase is terminal", "pod", klog.KObj(pod), "podUID", pod.UID)
            continue
        }
        start := kl.clock.Now()
        mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
        klog.V(3).InfoS("Pod is restartable after termination due to UID reuse", "pod", klog.KObj(pod), "podUID", pod.UID)
        kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
        // TODO: move inside syncPod and make reentrant
        // https://github.com/kubernetes/kubernetes/issues/105014
        kl.probeManager.AddPod(pod)
    }

    return nil
}
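Condensed, the HandlePodCleanups changes take the following shape: classify worker states, let the existing cleanup consult the possibly-running set, and only at the very end re-dispatch pods whose UID was reused. The sketch below is a hypothetical, heavily simplified illustration of that ordering; the uid and workerState types, isTerminal, and dispatchCreate are local stand-ins, not kubelet symbols.

package main

import "fmt"

// uid and workerState are local stand-ins for types.UID and the kubelet's PodWorkerState.
type uid string

type workerState int

const (
    syncPod workerState = iota
    terminatingPod
    terminatedPod
    terminatedAndRecreatedPod
)

// cleanupSketch mirrors the ordering in HandlePodCleanups: classify worker states,
// let the usual cleanup consult the possibly-running set, and only at the very end
// re-dispatch pods whose UID was reused (terminated, then recreated).
func cleanupSketch(working map[uid]workerState, isTerminal func(uid) bool, dispatchCreate func(uid)) {
    possiblyRunning := map[uid]struct{}{}
    restartable := map[uid]struct{}{}
    for id, state := range working {
        switch state {
        case syncPod, terminatingPod:
            possiblyRunning[id] = struct{}{}
        case terminatedAndRecreatedPod:
            restartable[id] = struct{}{}
        }
    }

    // ... orphaned status, volume, and cgroup cleanup would consult possiblyRunning here ...

    // Restart last, and only if the pod has not already reached a terminal phase.
    for id := range restartable {
        if isTerminal(id) {
            continue
        }
        dispatchCreate(id)
    }
}

func main() {
    working := map[uid]workerState{"a": syncPod, "b": terminatedAndRecreatedPod}
    cleanupSketch(working,
        func(uid) bool { return false },
        func(id uid) { fmt.Println("re-dispatching pod with reused UID:", id) })
}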
@@ -91,7 +91,7 @@ type UpdatePodOptions struct {
type PodWorkType int

const (
    // SyncPodSync is when the pod is expected to be started and running.
    // SyncPodWork is when the pod is expected to be started and running.
    SyncPodWork PodWorkType = iota
    // TerminatingPodWork is when the pod is no longer being set up, but some
    // containers may be running and are being torn down.
@@ -101,6 +101,26 @@ const (
    TerminatedPodWork
)

// PodWorkType classifies the status of pod as seen by the pod worker - setup (sync),
// teardown of containers (terminating), cleanup (terminated), or recreated with the
// same UID (kill -> create while terminating)
type PodWorkerState int

const (
    // SyncPod is when the pod is expected to be started and running.
    SyncPod PodWorkerState = iota
    // TerminatingPod is when the pod is no longer being set up, but some
    // containers may be running and are being torn down.
    TerminatingPod
    // TerminatedPod indicates the pod is stopped, can have no more running
    // containers, and any foreground cleanup can be executed.
    TerminatedPod
    // TerminatedAndRecreatedPod indicates that after the pod was terminating a
    // request to recreate the pod was received. The pod is terminated and can
    // now be restarted by sending a create event to the pod worker.
    TerminatedAndRecreatedPod
)

// podWork is the internal changes
type podWork struct {
    // WorkType is the type of sync to perform - sync (create), terminating (stop
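For readability in logs it can help to pair an enum like this with a String method. The commit itself does not add one, so the following is only an illustrative sketch using a local stand-in type rather than the kubelet's real PodWorkerState:

package main

import "fmt"

// podWorkerState is a local stand-in for the kubelet's new PodWorkerState enum;
// the real constants are the ones added in the diff above.
type podWorkerState int

const (
    syncPod podWorkerState = iota
    terminatingPod
    terminatedPod
    terminatedAndRecreatedPod
)

// String is purely illustrative; the commit does not add a String method.
func (s podWorkerState) String() string {
    switch s {
    case syncPod:
        return "SyncPod"
    case terminatingPod:
        return "TerminatingPod"
    case terminatedPod:
        return "TerminatedPod"
    case terminatedAndRecreatedPod:
        return "TerminatedAndRecreatedPod"
    default:
        return fmt.Sprintf("PodWorkerState(%d)", int(s))
    }
}

func main() {
    fmt.Println(terminatedAndRecreatedPod) // TerminatedAndRecreatedPod
}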
@@ -127,8 +147,8 @@ type PodWorkers interface {
    // and have been terminated for a significant period of time. Once this method
    // has been called once, the workers are assumed to be fully initialized and
    // subsequent calls to ShouldPodContentBeRemoved on unknown pods will return
    // true.
    SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkType
    // true. It returns a map describing the state of each known pod worker.
    SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkerState

    // IsPodKnownTerminated returns true if the provided pod UID is known by the pod
    // worker to be terminated. If the pod has been force deleted and the pod worker
@@ -254,6 +274,11 @@ type podSyncStatus struct {
    // to remove the pod. A terminal pod (Succeeded/Failed) will have
    // termination status until the pod is deleted.
    finished bool
    // restartRequested is true if the pod worker was informed the pod is
    // expected to exist (update type of create, update, or sync) after
    // it has been killed. When known pods are synced, any pod that is
    // terminated and has restartRequested will have its history cleared.
    restartRequested bool
    // notifyPostTerminating will be closed once the pod transitions to
    // terminated. After the pod is in terminated state, nothing should be
    // added to this list.
@@ -514,6 +539,19 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
        p.podSyncStatuses[uid] = status
    }

    // if an update is received that implies the pod should be running, but we are already terminating a pod by
    // that UID, assume that two pods with the same UID were created in close temporal proximity (usually static
    // pod but it's possible for an apiserver to extremely rarely do something similar) - flag the sync status
    // to indicate that after the pod terminates it should be reset to "not running" to allow a subsequent add/update
    // to start the pod worker again
    if status.IsTerminationRequested() {
        if options.UpdateType == kubetypes.SyncPodCreate {
            status.restartRequested = true
            klog.V(4).InfoS("Pod is terminating but has been requested to restart with same UID, will be reconciled later", "pod", klog.KObj(pod), "podUID", pod.UID)
            return
        }
    }

    // once a pod is terminated by UID, it cannot reenter the pod worker (until the UID is purged by housekeeping)
    if status.IsFinished() {
        klog.V(4).InfoS("Pod is finished processing, no further updates", "pod", klog.KObj(pod), "podUID", pod.UID)
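The comment in UpdatePod captures the central decision for UID reuse: a create that arrives while the worker is still terminating is not acted on immediately, it only flags restartRequested so the pod can be re-dispatched after the old pod finishes terminating. A tiny, hypothetical distillation of that branch, with stand-in names for the real options and status fields:

package main

import "fmt"

// updateType is a local stand-in for the kubelet's kubetypes.SyncPodType values.
type updateType int

const (
    createUpdate updateType = iota
    killUpdate
)

// shouldDeferRestart captures the branch added to UpdatePod: a create for a UID
// whose worker is still terminating is not started now; the caller only flags
// restartRequested so the pod can be re-dispatched after termination completes.
func shouldDeferRestart(terminationRequested bool, update updateType) bool {
    return terminationRequested && update == createUpdate
}

func main() {
    fmt.Println(shouldDeferRestart(true, createUpdate)) // true: set restartRequested and return early
    fmt.Println(shouldDeferRestart(true, killUpdate))   // false: a kill proceeds as usual
}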
@@ -965,8 +1003,8 @@ func (p *podWorkers) contextForWorker(uid types.UID) context.Context {
// to UpdatePods for new pods. It returns a map of known workers that are not finished
// with a value of SyncPodTerminated, SyncPodKill, or SyncPodSync depending on whether
// the pod is terminated, terminating, or syncing.
func (p *podWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkType {
    workers := make(map[types.UID]PodWorkType)
func (p *podWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkerState {
    workers := make(map[types.UID]PodWorkerState)
    known := make(map[types.UID]struct{})
    for _, pod := range desiredPods {
        known[pod.UID] = struct{}{}
@@ -977,16 +1015,20 @@ func (p *podWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkT

    p.podsSynced = true
    for uid, status := range p.podSyncStatuses {
        if _, exists := known[uid]; !exists {
        if _, exists := known[uid]; !exists || status.restartRequested {
            p.removeTerminatedWorker(uid)
        }
        switch {
        case !status.terminatedAt.IsZero():
            workers[uid] = TerminatedPodWork
            if status.restartRequested {
                workers[uid] = TerminatedAndRecreatedPod
            } else {
                workers[uid] = TerminatedPod
            }
        case !status.terminatingAt.IsZero():
            workers[uid] = TerminatingPodWork
            workers[uid] = TerminatingPod
        default:
            workers[uid] = SyncPodWork
            workers[uid] = SyncPod
        }
    }
    return workers
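The map returned by SyncKnownPods is derived per UID from the worker's sync status. A stand-alone sketch of that derivation, with parameters shadowing the real podSyncStatus fields terminatingAt, terminatedAt, and restartRequested:

package main

import (
    "fmt"
    "time"
)

// workerState is a local stand-in for the kubelet's PodWorkerState.
type workerState string

// stateFor mirrors the switch in SyncKnownPods: terminated (optionally flagged for
// recreation) takes precedence over terminating, which takes precedence over syncing.
func stateFor(terminatingAt, terminatedAt time.Time, restartRequested bool) workerState {
    switch {
    case !terminatedAt.IsZero():
        if restartRequested {
            return "TerminatedAndRecreatedPod"
        }
        return "TerminatedPod"
    case !terminatingAt.IsZero():
        return "TerminatingPod"
    default:
        return "SyncPod"
    }
}

func main() {
    now := time.Now()
    fmt.Println(stateFor(now, now, true))                  // TerminatedAndRecreatedPod
    fmt.Println(stateFor(now, time.Time{}, false))         // TerminatingPod
    fmt.Println(stateFor(time.Time{}, time.Time{}, false)) // SyncPod
}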
@@ -1009,7 +1051,11 @@ func (p *podWorkers) removeTerminatedWorker(uid types.UID) {
        return
    }

    klog.V(4).InfoS("Pod has been terminated and is no longer known to the kubelet, remove all history", "podUID", uid)
    if status.restartRequested {
        klog.V(4).InfoS("Pod has been terminated but another pod with the same UID was created, remove history to allow restart", "podUID", uid)
    } else {
        klog.V(4).InfoS("Pod has been terminated and is no longer known to the kubelet, remove all history", "podUID", uid)
    }
    delete(p.podSyncStatuses, uid)
    delete(p.podUpdates, uid)
    delete(p.lastUndeliveredWorkUpdate, uid)

@@ -82,7 +82,7 @@ func (f *fakePodWorkers) UpdatePod(options UpdatePodOptions) {
    }
}

func (f *fakePodWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkType {
func (f *fakePodWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkerState {
    return nil
}

@@ -30,6 +30,7 @@ import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/apimachinery/pkg/util/uuid"
    "k8s.io/apimachinery/pkg/util/wait"
    clientset "k8s.io/client-go/kubernetes"
    kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
    "k8s.io/kubernetes/test/e2e/framework"
@@ -133,6 +134,60 @@ var _ = SIGDescribe("MirrorPod", func() {
            err := deleteStaticPod(podPath, staticPodName, ns)
            framework.ExpectNoError(err)

            ginkgo.By("wait for the mirror pod to disappear")
            gomega.Eventually(func() error {
                return checkMirrorPodDisappear(f.ClientSet, mirrorPodName, ns)
            }, 2*time.Minute, time.Second*4).Should(gomega.BeNil())
        })
    })
    ginkgo.Context("when create a mirror pod without changes ", func() {
        var ns, podPath, staticPodName, mirrorPodName string
        ginkgo.BeforeEach(func() {
        })
        /*
            Release: v1.23
            Testname: Mirror Pod, recreate
            Description: When a static pod's manifest is removed and readded, the mirror pod MUST successfully recreate. Create the static pod, verify it is running, remove its manifest and then add it back, and verify the static pod runs again.
        */
        ginkgo.It("should successfully recreate when file is removed and recreated [NodeConformance]", func() {
            ns = f.Namespace.Name
            staticPodName = "static-pod-" + string(uuid.NewUUID())
            mirrorPodName = staticPodName + "-" + framework.TestContext.NodeName

            podPath = framework.TestContext.KubeletConfig.StaticPodPath
            ginkgo.By("create the static pod")
            err := createStaticPod(podPath, staticPodName, ns,
                imageutils.GetE2EImage(imageutils.Nginx), v1.RestartPolicyAlways)
            framework.ExpectNoError(err)

            ginkgo.By("wait for the mirror pod to be running")
            gomega.Eventually(func() error {
                return checkMirrorPodRunning(f.ClientSet, mirrorPodName, ns)
            }, 2*time.Minute, time.Second*4).Should(gomega.BeNil())

            ginkgo.By("delete the pod manifest from disk")
            err = deleteStaticPod(podPath, staticPodName, ns)
            framework.ExpectNoError(err)

            ginkgo.By("recreate the file")
            err = createStaticPod(podPath, staticPodName, ns,
                imageutils.GetE2EImage(imageutils.Nginx), v1.RestartPolicyAlways)
            framework.ExpectNoError(err)

            ginkgo.By("mirror pod should restart with count 1")
            gomega.Eventually(func() error {
                return checkMirrorPodRunningWithRestartCount(2*time.Second, 2*time.Minute, f.ClientSet, mirrorPodName, ns, 1)
            }, 2*time.Minute, time.Second*4).Should(gomega.BeNil())

            ginkgo.By("mirror pod should stay running")
            gomega.Consistently(func() error {
                return checkMirrorPodRunning(f.ClientSet, mirrorPodName, ns)
            }, time.Second*30, time.Second*4).Should(gomega.BeNil())

            ginkgo.By("delete the static pod")
            err = deleteStaticPod(podPath, staticPodName, ns)
            framework.ExpectNoError(err)

            ginkgo.By("wait for the mirror pod to disappear")
            gomega.Eventually(func() error {
                return checkMirrorPodDisappear(f.ClientSet, mirrorPodName, ns)
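createStaticPod and deleteStaticPod are helpers in this e2e test file that write and remove a manifest file under the kubelet's static pod path (podPath above). As a rough illustration of the kind of manifest involved, here is a hypothetical sketch that marshals a minimal static pod; the real helper's output may differ:

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "sigs.k8s.io/yaml"
)

func main() {
    // A minimal pod of the kind a static pod manifest describes; the e2e helper's
    // actual output may differ.
    pod := v1.Pod{
        TypeMeta:   metav1.TypeMeta{APIVersion: "v1", Kind: "Pod"},
        ObjectMeta: metav1.ObjectMeta{Name: "static-pod-example", Namespace: "default"},
        Spec: v1.PodSpec{
            RestartPolicy: v1.RestartPolicyAlways,
            Containers:    []v1.Container{{Name: "test", Image: "nginx"}},
        },
    }
    out, err := yaml.Marshal(pod)
    if err != nil {
        panic(err)
    }
    // Written to <static-pod-path>/<name>.yaml; the kubelet then creates a mirror
    // pod named <name>-<node-name>, which is what the assertions above poll for.
    fmt.Println(string(out))
}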
@@ -194,12 +249,44 @@ func checkMirrorPodRunning(cl clientset.Interface, name, namespace string) error
    }
    for i := range pod.Status.ContainerStatuses {
        if pod.Status.ContainerStatuses[i].State.Running == nil {
            return fmt.Errorf("expected the mirror pod %q with container %q to be running", name, pod.Status.ContainerStatuses[i].Name)
            return fmt.Errorf("expected the mirror pod %q with container %q to be running (got containers=%v)", name, pod.Status.ContainerStatuses[i].Name, pod.Status.ContainerStatuses[i].State)
        }
    }
    return validateMirrorPod(cl, pod)
}

func checkMirrorPodRunningWithRestartCount(interval time.Duration, timeout time.Duration, cl clientset.Interface, name, namespace string, count int32) error {
    var pod *v1.Pod
    var err error
    err = wait.PollImmediate(interval, timeout, func() (bool, error) {
        pod, err = cl.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
        if err != nil {
            return false, fmt.Errorf("expected the mirror pod %q to appear: %v", name, err)
        }
        if pod.Status.Phase != v1.PodRunning {
            return false, fmt.Errorf("expected the mirror pod %q to be running, got %q", name, pod.Status.Phase)
        }
        for i := range pod.Status.ContainerStatuses {
            if pod.Status.ContainerStatuses[i].State.Waiting != nil {
                // retry if pod is in waiting state
                return false, nil
            }
            if pod.Status.ContainerStatuses[i].State.Running == nil {
                return false, fmt.Errorf("expected the mirror pod %q with container %q to be running (got containers=%v)", name, pod.Status.ContainerStatuses[i].Name, pod.Status.ContainerStatuses[i].State)
            }
            if pod.Status.ContainerStatuses[i].RestartCount == count {
                // found the restart count
                return true, nil
            }
        }
        return false, nil
    })
    if err != nil {
        return err
    }
    return validateMirrorPod(cl, pod)
}

func checkMirrorPodRecreatedAndRunning(cl clientset.Interface, name, namespace string, oUID types.UID) error {
    pod, err := cl.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
    if err != nil {