From 69a3820214e0fb7bbdf6574ff2fbc86b14ca4d53 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sat, 26 Feb 2022 14:36:51 -0500 Subject: [PATCH 1/4] kubelet: Delay writing a terminal phase until the pod is terminated Other components must know when the Kubelet has released critical resources for terminal pods. Do not set the phase in the apiserver to terminal until all containers are stopped and cannot restart. As a consequence of this change, the Kubelet must explicitly transition a terminal pod to the terminating state in the pod worker which is handled by returning a new isTerminal boolean from syncPod. Finally, if a pod with init containers hasn't been initialized yet, don't default container statuses or not yet attempted init containers to the unknown failure state. --- pkg/kubelet/kubelet.go | 60 ++- pkg/kubelet/kubelet_pods.go | 8 +- pkg/kubelet/kubelet_test.go | 28 +- pkg/kubelet/pod_workers.go | 48 +- pkg/kubelet/pod_workers_test.go | 102 ++++- pkg/kubelet/runonce.go | 6 +- pkg/kubelet/status/status_manager.go | 117 ++++- pkg/kubelet/status/status_manager_test.go | 421 +++++++++++++++++- .../testing/fake_pod_deletion_safety.go | 19 +- .../testing/mock_pod_status_provider.go | 14 + 10 files changed, 734 insertions(+), 89 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 2b96020d722..ab255e5216f 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1452,24 +1452,36 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { } // syncPod is the transaction script for the sync of a single pod (setting up) -// a pod. The reverse (teardown) is handled in syncTerminatingPod and -// syncTerminatedPod. If syncPod exits without error, then the pod runtime -// state is in sync with the desired configuration state (pod is running). -// If syncPod exits with a transient error, the next invocation of syncPod -// is expected to make progress towards reaching the runtime state. +// a pod. This method is reentrant and expected to converge a pod towards the +// desired state of the spec. The reverse (teardown) is handled in +// syncTerminatingPod and syncTerminatedPod. If syncPod exits without error, +// then the pod runtime state is in sync with the desired configuration state +// (pod is running). If syncPod exits with a transient error, the next +// invocation of syncPod is expected to make progress towards reaching the +// runtime state. syncPod exits with isTerminal when the pod was detected to +// have reached a terminal lifecycle phase due to container exits (for +// RestartNever or RestartOnFailure) and the next method invoked will by +// syncTerminatingPod. // // Arguments: // -// o - the SyncPodOptions for this invocation +// updateType - whether this is a create (first time) or an update, should +// only be used for metrics since this method must be reentrant +// pod - the pod that is being set up +// mirrorPod - the mirror pod known to the kubelet for this pod, if any +// podStatus - the most recent pod status observed for this pod which can +// be used to determine the set of actions that should be taken during +// this loop of syncPod // // The workflow is: -// * Kill the pod immediately if update type is SyncPodKill // * If the pod is being created, record pod worker start latency // * Call generateAPIPodStatus to prepare an v1.PodStatus for the pod // * If the pod is being seen as running for the first time, record pod // start latency // * Update the status of the pod in the status manager -// * Kill the pod if it should not be running due to soft admission +// * Stop the pod's containers if it should not be running due to soft +// admission +// * Ensure any background tracking for a runnable pod is started // * Create a mirror pod if the pod is a static pod, and does not // already have a mirror pod // * Create the data directories for the pod if they do not exist @@ -1483,10 +1495,12 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { // // This operation writes all events that are dispatched in order to provide // the most accurate information possible about an error situation to aid debugging. -// Callers should not throw an event if this operation returns an error. -func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error { +// Callers should not write an event if this operation returns an error. +func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) { klog.V(4).InfoS("syncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID) - defer klog.V(4).InfoS("syncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID) + defer func() { + klog.V(4).InfoS("syncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID, "isTerminal", isTerminal) + }() // Latency measurements for the main workflow are relative to the // first time the pod was seen by kubelet. @@ -1518,11 +1532,17 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType for _, ipInfo := range apiPodStatus.PodIPs { podStatus.IPs = append(podStatus.IPs, ipInfo.IP) } - if len(podStatus.IPs) == 0 && len(apiPodStatus.PodIP) > 0 { podStatus.IPs = []string{apiPodStatus.PodIP} } + // If the pod is terminal, we don't need to continue to setup the pod + if apiPodStatus.Phase == v1.PodSucceeded || apiPodStatus.Phase == v1.PodFailed { + kl.statusManager.SetPodStatus(pod, apiPodStatus) + isTerminal = true + return isTerminal, nil + } + // If the pod should not be running, we request the pod's containers be stopped. This is not the same // as termination (we want to stop the pod, but potentially restart it later if soft admission allows // it later). Set the status and phase appropriately @@ -1572,13 +1592,13 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType // Return an error to signal that the sync loop should back off. syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message) } - return syncErr + return false, syncErr } // If the network plugin is not ready, only start the pod if it uses the host network if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) { kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, err) - return fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err) + return false, fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err) } // ensure the kubelet knows about referenced secrets or configmaps used by the pod @@ -1635,7 +1655,7 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType } if err := pcm.EnsureExists(pod); err != nil { kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err) - return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err) + return false, fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err) } } } @@ -1676,7 +1696,7 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType if err := kl.makePodDataDirs(pod); err != nil { kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err) klog.ErrorS(err, "Unable to make pod data directories for pod", "pod", klog.KObj(pod)) - return err + return false, err } // Volume manager will not mount volumes for terminating pods @@ -1686,7 +1706,7 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil { kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err) klog.ErrorS(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod)) - return err + return false, err } } @@ -1702,14 +1722,14 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff { // Do not record an event here, as we keep all event logging for sync pod failures // local to container runtime, so we get better errors. - return err + return false, err } } - return nil + return false, nil } - return nil + return false, nil } // syncTerminatingPod is expected to terminate all running containers in a pod. Once this method diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index 2808ad02b2c..432db4add9f 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -915,6 +915,12 @@ func countRunningContainerStatus(status v1.PodStatus) int { return runningContainers } +// PodCouldHaveRunningContainers returns true if the pod with the given UID could still have running +// containers. This returns false if the pod has not yet been started or the pod is unknown. +func (kl *Kubelet) PodCouldHaveRunningContainers(pod *v1.Pod) bool { + return kl.podWorkers.CouldHaveRunningContainers(pod.UID) +} + // PodResourcesAreReclaimed returns true if all required node-level resources that a pod was consuming have // been reclaimed by the kubelet. Reclaiming resources is a prerequisite to deleting a pod from the API server. func (kl *Kubelet) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool { @@ -1424,7 +1430,7 @@ func getPhase(spec *v1.PodSpec, info []v1.ContainerStatus) v1.PodPhase { } // generateAPIPodStatus creates the final API pod status for a pod, given the -// internal pod status. +// internal pod status. This method should only be called from within sync*Pod methods. func (kl *Kubelet) generateAPIPodStatus(pod *v1.Pod, podStatus *kubecontainer.PodStatus) v1.PodStatus { klog.V(3).InfoS("Generating pod status", "pod", klog.KObj(pod)) diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index d7fe88c446d..e8c2d06f1df 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -505,9 +505,9 @@ func TestDispatchWorkOfCompletedPod(t *testing.T) { kubelet := testKubelet.kubelet var got bool kubelet.podWorkers = &fakePodWorkers{ - syncPodFn: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error { + syncPodFn: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) { got = true - return nil + return false, nil }, cache: kubelet.podCache, t: t, @@ -584,9 +584,9 @@ func TestDispatchWorkOfActivePod(t *testing.T) { kubelet := testKubelet.kubelet var got bool kubelet.podWorkers = &fakePodWorkers{ - syncPodFn: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error { + syncPodFn: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) { got = true - return nil + return false, nil }, cache: kubelet.podCache, t: t, @@ -1300,8 +1300,11 @@ func TestCreateMirrorPod(t *testing.T) { pod.Annotations[kubetypes.ConfigSourceAnnotationKey] = "file" pods := []*v1.Pod{pod} kl.podManager.SetPods(pods) - err := kl.syncPod(context.Background(), updateType, pod, nil, &kubecontainer.PodStatus{}) + isTerminal, err := kl.syncPod(context.Background(), updateType, pod, nil, &kubecontainer.PodStatus{}) assert.NoError(t, err) + if isTerminal { + t.Fatalf("pod should not be terminal: %#v", pod) + } podFullName := kubecontainer.GetPodFullName(pod) assert.True(t, manager.HasPod(podFullName), "Expected mirror pod %q to be created", podFullName) assert.Equal(t, 1, manager.NumOfPods(), "Expected only 1 mirror pod %q, got %+v", podFullName, manager.GetPods()) @@ -1332,8 +1335,11 @@ func TestDeleteOutdatedMirrorPod(t *testing.T) { pods := []*v1.Pod{pod, mirrorPod} kl.podManager.SetPods(pods) - err := kl.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, mirrorPod, &kubecontainer.PodStatus{}) + isTerminal, err := kl.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, mirrorPod, &kubecontainer.PodStatus{}) assert.NoError(t, err) + if isTerminal { + t.Fatalf("pod should not be terminal: %#v", pod) + } name := kubecontainer.GetPodFullName(pod) creates, deletes := manager.GetCounts(name) if creates != 1 || deletes != 1 { @@ -1489,13 +1495,19 @@ func TestNetworkErrorsWithoutHostNetwork(t *testing.T) { }) kubelet.podManager.SetPods([]*v1.Pod{pod}) - err := kubelet.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{}) + isTerminal, err := kubelet.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{}) assert.Error(t, err, "expected pod with hostNetwork=false to fail when network in error") + if isTerminal { + t.Fatalf("pod should not be terminal: %#v", pod) + } pod.Annotations[kubetypes.ConfigSourceAnnotationKey] = kubetypes.FileSource pod.Spec.HostNetwork = true - err = kubelet.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{}) + isTerminal, err = kubelet.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{}) assert.NoError(t, err, "expected pod with hostNetwork=true to succeed when network in error") + if isTerminal { + t.Fatalf("pod should not be terminal: %#v", pod) + } } func TestFilterOutInactivePods(t *testing.T) { diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index d8dfaa4034f..5632745e060 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -218,7 +218,7 @@ type PodWorkers interface { } // the function to invoke to perform a sync (reconcile the kubelet state to the desired shape of the pod) -type syncPodFnType func(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error +type syncPodFnType func(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) // the function to invoke to terminate a pod (ensure no running processes are present) type syncTerminatingPodFnType func(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, runningPod *kubecontainer.Pod, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error @@ -886,6 +886,7 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) { } klog.V(4).InfoS("Processing pod event", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType) + var isTerminal bool err := func() error { // The worker is responsible for ensuring the sync method sees the appropriate // status updates on resyncs (the result of the last sync), transitions to @@ -932,13 +933,14 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) { err = p.syncTerminatingPodFn(ctx, pod, status, update.Options.RunningPod, gracePeriod, podStatusFn) default: - err = p.syncPodFn(ctx, update.Options.UpdateType, pod, update.Options.MirrorPod, status) + isTerminal, err = p.syncPodFn(ctx, update.Options.UpdateType, pod, update.Options.MirrorPod, status) } lastSyncTime = time.Now() return err }() + var phaseTransition bool switch { case err == context.Canceled: // when the context is cancelled we expect an update to already be queued @@ -969,10 +971,17 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) { } // otherwise we move to the terminating phase p.completeTerminating(pod) + phaseTransition = true + + case isTerminal: + // if syncPod indicated we are now terminal, set the appropriate pod status to move to terminating + klog.V(4).InfoS("Pod is terminal", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType) + p.completeSync(pod) + phaseTransition = true } - // queue a retry for errors if necessary, then put the next event in the channel if any - p.completeWork(pod, err) + // queue a retry if necessary, then put the next event in the channel if any + p.completeWork(pod, phaseTransition, err) if start := update.Options.StartTime; !start.IsZero() { metrics.PodWorkerDuration.WithLabelValues(update.Options.UpdateType.String()).Observe(metrics.SinceInSeconds(start)) } @@ -1003,6 +1012,33 @@ func (p *podWorkers) acknowledgeTerminating(pod *v1.Pod) PodStatusFunc { return nil } +// completeSync is invoked when syncPod completes successfully and indicates the pod is now terminal and should +// be terminated. This happens when the natural pod lifecycle completes - any pod which is not RestartAlways +// exits. Unnatural completions, such as evictions, API driven deletion or phase transition, are handled by +// UpdatePod. +func (p *podWorkers) completeSync(pod *v1.Pod) { + p.podLock.Lock() + defer p.podLock.Unlock() + + klog.V(4).InfoS("Pod indicated lifecycle completed naturally and should now terminate", "pod", klog.KObj(pod), "podUID", pod.UID) + + if status, ok := p.podSyncStatuses[pod.UID]; ok { + if status.terminatingAt.IsZero() { + status.terminatingAt = time.Now() + } else { + klog.V(4).InfoS("Pod worker attempted to set terminatingAt twice, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID) + } + status.startedTerminating = true + } + + p.lastUndeliveredWorkUpdate[pod.UID] = podWork{ + WorkType: TerminatingPodWork, + Options: UpdatePodOptions{ + Pod: pod, + }, + } +} + // completeTerminating is invoked when syncTerminatingPod completes successfully, which means // no container is running, no container will be started in the future, and we are ready for // cleanup. This updates the termination state which prevents future syncs and will ensure @@ -1115,9 +1151,11 @@ func (p *podWorkers) completeUnstartedTerminated(pod *v1.Pod) { // completeWork requeues on error or the next sync interval and then immediately executes any pending // work. -func (p *podWorkers) completeWork(pod *v1.Pod, syncErr error) { +func (p *podWorkers) completeWork(pod *v1.Pod, phaseTransition bool, syncErr error) { // Requeue the last update if the last sync returned error. switch { + case phaseTransition: + p.workQueue.Enqueue(pod.UID, 0) case syncErr == nil: // No error; requeue at the regular resync interval. p.workQueue.Enqueue(pod.UID, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor)) diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go index afde2b0d789..c47947fa579 100644 --- a/pkg/kubelet/pod_workers_test.go +++ b/pkg/kubelet/pod_workers_test.go @@ -18,7 +18,6 @@ package kubelet import ( "context" - "flag" "reflect" "strconv" "sync" @@ -31,7 +30,6 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" - "k8s.io/klog/v2" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" @@ -48,6 +46,7 @@ type fakePodWorkers struct { t TestingInterface triggeredDeletion []types.UID + triggeredTerminal []types.UID statusLock sync.Mutex running map[types.UID]bool @@ -79,9 +78,13 @@ func (f *fakePodWorkers) UpdatePod(options UpdatePodOptions) { case kubetypes.SyncPodKill: f.triggeredDeletion = append(f.triggeredDeletion, uid) default: - if err := f.syncPodFn(context.Background(), options.UpdateType, options.Pod, options.MirrorPod, status); err != nil { + isTerminal, err := f.syncPodFn(context.Background(), options.UpdateType, options.Pod, options.MirrorPod, status) + if err != nil { f.t.Errorf("Unexpected error: %v", err) } + if isTerminal { + f.triggeredTerminal = append(f.triggeredTerminal, uid) + } } } @@ -249,7 +252,7 @@ func createPodWorkers() (*podWorkers, map[types.UID][]syncPodRecord) { fakeCache := containertest.NewFakeCache(fakeRuntime) fakeQueue := &fakeQueue{} w := newPodWorkers( - func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error { + func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) { func() { lock.Lock() defer lock.Unlock() @@ -259,7 +262,7 @@ func createPodWorkers() (*podWorkers, map[types.UID][]syncPodRecord) { updateType: updateType, }) }() - return nil + return false, nil }, func(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, runningPod *kubecontainer.Pod, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error { func() { @@ -530,9 +533,84 @@ func newUIDSet(uids ...types.UID) sets.String { return set } -func init() { - klog.InitFlags(nil) - flag.Lookup("v").Value.Set("5") +type terminalPhaseSync struct { + lock sync.Mutex + fn syncPodFnType + terminal sets.String +} + +func (s *terminalPhaseSync) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) { + isTerminal, err := s.fn(ctx, updateType, pod, mirrorPod, podStatus) + if err != nil { + return false, err + } + if !isTerminal { + s.lock.Lock() + defer s.lock.Unlock() + isTerminal = s.terminal.Has(string(pod.UID)) + } + return isTerminal, nil +} + +func (s *terminalPhaseSync) SetTerminal(uid types.UID) { + s.lock.Lock() + defer s.lock.Unlock() + s.terminal.Insert(string(uid)) +} + +func newTerminalPhaseSync(fn syncPodFnType) *terminalPhaseSync { + return &terminalPhaseSync{ + fn: fn, + terminal: sets.NewString(), + } +} + +func TestTerminalPhaseTransition(t *testing.T) { + podWorkers, _ := createPodWorkers() + var channels WorkChannel + podWorkers.workerChannelFn = channels.Intercept + terminalPhaseSyncer := newTerminalPhaseSync(podWorkers.syncPodFn) + podWorkers.syncPodFn = terminalPhaseSyncer.SyncPod + + // start pod + podWorkers.UpdatePod(UpdatePodOptions{ + Pod: newNamedPod("1", "test1", "pod1", false), + UpdateType: kubetypes.SyncPodUpdate, + }) + drainAllWorkers(podWorkers) + + // should observe pod running + pod1 := podWorkers.podSyncStatuses[types.UID("1")] + if pod1.IsTerminated() { + t.Fatalf("unexpected pod state: %#v", pod1) + } + + // send another update to the pod + podWorkers.UpdatePod(UpdatePodOptions{ + Pod: newNamedPod("1", "test1", "pod1", false), + UpdateType: kubetypes.SyncPodUpdate, + }) + drainAllWorkers(podWorkers) + + // should observe pod still running + pod1 = podWorkers.podSyncStatuses[types.UID("1")] + if pod1.IsTerminated() { + t.Fatalf("unexpected pod state: %#v", pod1) + } + + // the next sync should result in a transition to terminal + terminalPhaseSyncer.SetTerminal(types.UID("1")) + podWorkers.UpdatePod(UpdatePodOptions{ + Pod: newNamedPod("1", "test1", "pod1", false), + UpdateType: kubetypes.SyncPodUpdate, + }) + drainAllWorkers(podWorkers) + + // should observe pod terminating + pod1 = podWorkers.podSyncStatuses[types.UID("1")] + if !pod1.IsTerminationRequested() || !pod1.IsTerminated() { + t.Fatalf("unexpected pod state: %#v", pod1) + } } func TestStaticPodExclusion(t *testing.T) { @@ -1203,15 +1281,15 @@ type simpleFakeKubelet struct { wg sync.WaitGroup } -func (kl *simpleFakeKubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error { +func (kl *simpleFakeKubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) { kl.pod, kl.mirrorPod, kl.podStatus = pod, mirrorPod, podStatus - return nil + return false, nil } -func (kl *simpleFakeKubelet) syncPodWithWaitGroup(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error { +func (kl *simpleFakeKubelet) syncPodWithWaitGroup(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) { kl.pod, kl.mirrorPod, kl.podStatus = pod, mirrorPod, podStatus kl.wg.Done() - return nil + return false, nil } func (kl *simpleFakeKubelet) syncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, runningPod *kubecontainer.Pod, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error { diff --git a/pkg/kubelet/runonce.go b/pkg/kubelet/runonce.go index 19b8a4f6a7b..00f3022af5a 100644 --- a/pkg/kubelet/runonce.go +++ b/pkg/kubelet/runonce.go @@ -112,9 +112,10 @@ func (kl *Kubelet) runOnce(pods []*v1.Pod, retryDelay time.Duration) (results [] // runPod runs a single pod and wait until all containers are running. func (kl *Kubelet) runPod(pod *v1.Pod, retryDelay time.Duration) error { + var isTerminal bool delay := retryDelay retry := 0 - for { + for !isTerminal { status, err := kl.containerRuntime.GetPodStatus(pod.UID, pod.Name, pod.Namespace) if err != nil { return fmt.Errorf("unable to get status for pod %q: %v", format.Pod(pod), err) @@ -131,7 +132,7 @@ func (kl *Kubelet) runPod(pod *v1.Pod, retryDelay time.Duration) error { klog.ErrorS(err, "Failed creating a mirror pod", "pod", klog.KObj(pod)) } mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) - if err = kl.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, mirrorPod, status); err != nil { + if isTerminal, err = kl.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, mirrorPod, status); err != nil { return fmt.Errorf("error syncing pod %q: %v", format.Pod(pod), err) } if retry >= runOnceMaxRetries { @@ -143,6 +144,7 @@ func (kl *Kubelet) runPod(pod *v1.Pod, retryDelay time.Duration) error { retry++ delay *= runOnceRetryDelayBackoff } + return nil } // isPodRunning returns true if all containers of a manifest are running. diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index 84c8c6f92cf..8e956350882 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -83,8 +83,10 @@ type PodStatusProvider interface { // PodDeletionSafetyProvider provides guarantees that a pod can be safely deleted. type PodDeletionSafetyProvider interface { - // A function which returns true if the pod can safely be deleted + // PodResourcesAreReclaimed returns true if the pod can safely be deleted. PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool + // PodCouldHaveRunningContainers returns true if the pod could have running containers. + PodCouldHaveRunningContainers(pod *v1.Pod) bool } // Manager is the Source of truth for kubelet pod status, and should be kept up-to-date with @@ -335,19 +337,26 @@ func (m *manager) TerminatePod(pod *v1.Pod) { oldStatus = &cachedStatus.status } status := *oldStatus.DeepCopy() - for i := range status.ContainerStatuses { - if status.ContainerStatuses[i].State.Terminated != nil { - continue - } - status.ContainerStatuses[i].State = v1.ContainerState{ - Terminated: &v1.ContainerStateTerminated{ - Reason: "ContainerStatusUnknown", - Message: "The container could not be located when the pod was terminated", - ExitCode: 137, - }, + + // once a pod has initialized, any missing status is treated as a failure + if hasPodInitialized(pod) { + for i := range status.ContainerStatuses { + if status.ContainerStatuses[i].State.Terminated != nil { + continue + } + status.ContainerStatuses[i].State = v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{ + Reason: "ContainerStatusUnknown", + Message: "The container could not be located when the pod was terminated", + ExitCode: 137, + }, + } } } - for i := range status.InitContainerStatuses { + + // all but the final suffix of init containers which have no evidence of a container start are + // marked as failed containers + for i := range initializedContainers(status.InitContainerStatuses) { if status.InitContainerStatuses[i].State.Terminated != nil { continue } @@ -364,6 +373,49 @@ func (m *manager) TerminatePod(pod *v1.Pod) { m.updateStatusInternal(pod, status, true) } +// hasPodInitialized returns true if the pod has no evidence of ever starting a regular container, which +// implies those containers should not be transitioned to terminated status. +func hasPodInitialized(pod *v1.Pod) bool { + // a pod without init containers is always initialized + if len(pod.Spec.InitContainers) == 0 { + return true + } + // if any container has ever moved out of waiting state, the pod has initialized + for _, status := range pod.Status.ContainerStatuses { + if status.LastTerminationState.Terminated != nil || status.State.Waiting == nil { + return true + } + } + // if the last init container has ever completed with a zero exit code, the pod is initialized + if l := len(pod.Status.InitContainerStatuses); l > 0 { + container := pod.Status.InitContainerStatuses[l-1] + if state := container.LastTerminationState; state.Terminated != nil && state.Terminated.ExitCode == 0 { + return true + } + if state := container.State; state.Terminated != nil && state.Terminated.ExitCode == 0 { + return true + } + } + // otherwise the pod has no record of being initialized + return false +} + +// initializedContainers returns all status except for suffix of containers that are in Waiting +// state, which is the set of containers that have attempted to start at least once. If all containers +// are Watiing, the first container is always returned. +func initializedContainers(containers []v1.ContainerStatus) []v1.ContainerStatus { + for i := len(containers) - 1; i >= 0; i-- { + if containers[i].State.Waiting == nil || containers[i].LastTerminationState.Terminated != nil { + return containers[0 : i+1] + } + } + // always return at least one container + if len(containers) > 0 { + return containers[0:1] + } + return nil +} + // checkContainerStateTransition ensures that no container is trying to transition // from a terminated to non-terminated state, which is illegal and indicates a // logical error in the kubelet. @@ -619,8 +671,9 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) { return } - oldStatus := pod.Status.DeepCopy() - newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, *oldStatus, mergePodStatus(*oldStatus, status.status)) + mergedStatus := mergePodStatus(pod.Status, status.status, m.podDeletionSafety.PodCouldHaveRunningContainers(pod)) + + newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, mergedStatus) klog.V(3).InfoS("Patch status for pod", "pod", klog.KObj(pod), "patch", string(patchBytes)) if err != nil { @@ -630,7 +683,7 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) { if unchanged { klog.V(3).InfoS("Status for pod is up-to-date", "pod", klog.KObj(pod), "statusVersion", status.version) } else { - klog.V(3).InfoS("Status for pod updated successfully", "pod", klog.KObj(pod), "statusVersion", status.version, "status", status.status) + klog.V(3).InfoS("Status for pod updated successfully", "pod", klog.KObj(pod), "statusVersion", status.version, "status", mergedStatus) pod = newPod } @@ -771,25 +824,49 @@ func normalizeStatus(pod *v1.Pod, status *v1.PodStatus) *v1.PodStatus { return status } -// mergePodStatus merges oldPodStatus and newPodStatus where pod conditions -// not owned by kubelet is preserved from oldPodStatus -func mergePodStatus(oldPodStatus, newPodStatus v1.PodStatus) v1.PodStatus { - podConditions := []v1.PodCondition{} +// mergePodStatus merges oldPodStatus and newPodStatus to preserve where pod conditions +// not owned by kubelet and to ensure terminal phase transition only happens after all +// running containers have terminated. This method does not modify the old status. +func mergePodStatus(oldPodStatus, newPodStatus v1.PodStatus, couldHaveRunningContainers bool) v1.PodStatus { + podConditions := make([]v1.PodCondition, 0, len(oldPodStatus.Conditions)+len(newPodStatus.Conditions)) + for _, c := range oldPodStatus.Conditions { if !kubetypes.PodConditionByKubelet(c.Type) { podConditions = append(podConditions, c) } } - for _, c := range newPodStatus.Conditions { if kubetypes.PodConditionByKubelet(c.Type) { podConditions = append(podConditions, c) } } newPodStatus.Conditions = podConditions + + // Delay transitioning a pod to a terminal status unless the pod is actually terminal. + // The Kubelet should never transition a pod to terminal status that could have running + // containers and thus actively be leveraging exclusive resources. Note that resources + // like volumes are reconciled by a subsystem in the Kubelet and will converge if a new + // pod reuses an exclusive resource (unmount -> free -> mount), which means we do not + // need wait for those resources to be detached by the Kubelet. In general, resources + // the Kubelet exclusively owns must be released prior to a pod being reported terminal, + // while resources that have participanting components above the API use the pod's + // transition to a terminal phase (or full deletion) to release those resources. + if !isPhaseTerminal(oldPodStatus.Phase) && isPhaseTerminal(newPodStatus.Phase) { + if couldHaveRunningContainers { + newPodStatus.Phase = oldPodStatus.Phase + newPodStatus.Reason = oldPodStatus.Reason + newPodStatus.Message = oldPodStatus.Message + } + } + return newPodStatus } +// isPhaseTerminal returns true if the pod's phase is terminal. +func isPhaseTerminal(phase v1.PodPhase) bool { + return phase == v1.PodFailed || phase == v1.PodSucceeded +} + // NeedToReconcilePodReadiness returns if the pod "Ready" condition need to be reconcile func NeedToReconcilePodReadiness(pod *v1.Pod) bool { if len(pod.Spec.ReadinessGates) == 0 { diff --git a/pkg/kubelet/status/status_manager_test.go b/pkg/kubelet/status/status_manager_test.go index 855fac52ecd..9ecdf67b300 100644 --- a/pkg/kubelet/status/status_manager_test.go +++ b/pkg/kubelet/status/status_manager_test.go @@ -25,6 +25,7 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" @@ -648,11 +649,14 @@ func TestTerminatePodWaiting(t *testing.T) { t.Logf("we expect the container statuses to have changed to terminated") newStatus := expectPodStatus(t, syncer, testPod) - for i := range newStatus.ContainerStatuses { - assert.False(t, newStatus.ContainerStatuses[i].State.Terminated == nil, "expected containers to be terminated") + for _, container := range newStatus.ContainerStatuses { + assert.False(t, container.State.Terminated == nil, "expected containers to be terminated") } - for i := range newStatus.InitContainerStatuses { - assert.False(t, newStatus.InitContainerStatuses[i].State.Terminated == nil, "expected init containers to be terminated") + for _, container := range newStatus.InitContainerStatuses[:2] { + assert.False(t, container.State.Terminated == nil, "expected init containers to be terminated") + } + for _, container := range newStatus.InitContainerStatuses[2:] { + assert.False(t, container.State.Waiting == nil, "expected init containers to be waiting") } expectUnknownState := v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "ContainerStatusUnknown", Message: "The container could not be located when the pod was terminated", ExitCode: 137}} @@ -662,8 +666,8 @@ func TestTerminatePodWaiting(t *testing.T) { if !reflect.DeepEqual(newStatus.InitContainerStatuses[1].State, firstStatus.InitContainerStatuses[1].State) { t.Errorf("existing terminated container state not preserved: %#v", newStatus.ContainerStatuses) } - if !reflect.DeepEqual(newStatus.InitContainerStatuses[2].State, expectUnknownState) { - t.Errorf("waiting container state not defaulted: %s", diff.ObjectReflectDiff(newStatus.InitContainerStatuses[2].State, expectUnknownState)) + if !reflect.DeepEqual(newStatus.InitContainerStatuses[2].State, firstStatus.InitContainerStatuses[2].State) { + t.Errorf("waiting container state not defaulted: %s", diff.ObjectReflectDiff(newStatus.InitContainerStatuses[2].State, firstStatus.InitContainerStatuses[2].State)) } if !reflect.DeepEqual(newStatus.ContainerStatuses[0].State, expectUnknownState) { t.Errorf("terminated container state not defaulted: %s", diff.ObjectReflectDiff(newStatus.ContainerStatuses[0].State, expectUnknownState)) @@ -680,6 +684,308 @@ func TestTerminatePodWaiting(t *testing.T) { assert.Equal(t, newStatus.Message, firstStatus.Message) } +func TestTerminatePod_DefaultUnknownStatus(t *testing.T) { + newPod := func(initContainers, containers int, fns ...func(*v1.Pod)) *v1.Pod { + pod := getTestPod() + for i := 0; i < initContainers; i++ { + pod.Spec.InitContainers = append(pod.Spec.InitContainers, v1.Container{ + Name: fmt.Sprintf("init-%d", i), + }) + } + for i := 0; i < containers; i++ { + pod.Spec.Containers = append(pod.Spec.Containers, v1.Container{ + Name: fmt.Sprintf("%d", i), + }) + } + pod.Status.StartTime = &metav1.Time{Time: time.Unix(1, 0).UTC()} + for _, fn := range fns { + fn(pod) + } + return pod + } + expectTerminatedUnknown := func(t *testing.T, state v1.ContainerState) { + t.Helper() + if state.Terminated == nil || state.Running != nil || state.Waiting != nil { + t.Fatalf("unexpected state: %#v", state) + } + if state.Terminated.ExitCode != 137 || state.Terminated.Reason != "ContainerStatusUnknown" || len(state.Terminated.Message) == 0 { + t.Fatalf("unexpected terminated state: %#v", state.Terminated) + } + } + expectTerminated := func(t *testing.T, state v1.ContainerState, exitCode int32) { + t.Helper() + if state.Terminated == nil || state.Running != nil || state.Waiting != nil { + t.Fatalf("unexpected state: %#v", state) + } + if state.Terminated.ExitCode != exitCode { + t.Fatalf("unexpected terminated state: %#v", state.Terminated) + } + } + expectWaiting := func(t *testing.T, state v1.ContainerState) { + t.Helper() + if state.Terminated != nil || state.Running != nil || state.Waiting == nil { + t.Fatalf("unexpected state: %#v", state) + } + } + + testCases := []struct { + name string + pod *v1.Pod + updateFn func(*v1.Pod) + expectFn func(t *testing.T, status v1.PodStatus) + }{ + {pod: newPod(0, 1, func(pod *v1.Pod) { pod.Status.Phase = v1.PodFailed })}, + {pod: newPod(0, 1, func(pod *v1.Pod) { pod.Status.Phase = v1.PodRunning })}, + {pod: newPod(0, 1, func(pod *v1.Pod) { + pod.Status.Phase = v1.PodRunning + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "Test", ExitCode: 2}}}, + } + })}, + { + name: "last termination state set", + pod: newPod(0, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + { + Name: "0", + LastTerminationState: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "Test", ExitCode: 2}}, + State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}, + }, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + container := status.ContainerStatuses[0] + if container.LastTerminationState.Terminated.ExitCode != 2 { + t.Fatalf("unexpected last state: %#v", container.LastTerminationState) + } + expectTerminatedUnknown(t, container.State) + }, + }, + { + name: "no previous state", + pod: newPod(0, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.ContainerStatuses[0].State) + }, + }, + { + name: "uninitialized pod defaults the first init container", + pod: newPod(1, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.InitContainerStatuses = []v1.ContainerStatus{ + {Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.InitContainerStatuses[0].State) + expectWaiting(t, status.ContainerStatuses[0].State) + }, + }, + { + name: "uninitialized pod defaults only the first init container", + pod: newPod(2, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.InitContainerStatuses = []v1.ContainerStatus{ + {Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + {Name: "init-1", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.InitContainerStatuses[0].State) + expectWaiting(t, status.InitContainerStatuses[1].State) + expectWaiting(t, status.ContainerStatuses[0].State) + }, + }, + { + name: "uninitialized pod defaults gaps", + pod: newPod(4, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.InitContainerStatuses = []v1.ContainerStatus{ + {Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + {Name: "init-1", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + {Name: "init-2", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 1}}}, + {Name: "init-3", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.InitContainerStatuses[0].State) + expectTerminatedUnknown(t, status.InitContainerStatuses[1].State) + expectTerminated(t, status.InitContainerStatuses[2].State, 1) + expectWaiting(t, status.InitContainerStatuses[3].State) + expectWaiting(t, status.ContainerStatuses[0].State) + }, + }, + { + name: "failed last container is uninitialized", + pod: newPod(3, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.InitContainerStatuses = []v1.ContainerStatus{ + {Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + {Name: "init-1", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + {Name: "init-2", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 1}}}, + } + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.InitContainerStatuses[0].State) + expectTerminatedUnknown(t, status.InitContainerStatuses[1].State) + expectTerminated(t, status.InitContainerStatuses[2].State, 1) + expectWaiting(t, status.ContainerStatuses[0].State) + }, + }, + { + name: "successful last container is initialized", + pod: newPod(3, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.InitContainerStatuses = []v1.ContainerStatus{ + {Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + {Name: "init-1", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + {Name: "init-2", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 0}}}, + } + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.InitContainerStatuses[0].State) + expectTerminatedUnknown(t, status.InitContainerStatuses[1].State) + expectTerminated(t, status.InitContainerStatuses[2].State, 0) + expectTerminatedUnknown(t, status.ContainerStatuses[0].State) + }, + }, + { + name: "successful last previous container is initialized, and container state is overwritten", + pod: newPod(3, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.InitContainerStatuses = []v1.ContainerStatus{ + {Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + {Name: "init-1", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + { + Name: "init-2", + LastTerminationState: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 0}}, + State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}, + }, + } + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.InitContainerStatuses[0].State) + expectTerminatedUnknown(t, status.InitContainerStatuses[1].State) + expectTerminatedUnknown(t, status.InitContainerStatuses[2].State) + expectTerminatedUnknown(t, status.ContainerStatuses[0].State) + }, + }, + { + name: "running container proves initialization", + pod: newPod(1, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.InitContainerStatuses = []v1.ContainerStatus{ + {Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Running: &v1.ContainerStateRunning{}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.InitContainerStatuses[0].State) + expectTerminatedUnknown(t, status.ContainerStatuses[0].State) + }, + }, + { + name: "evidence of terminated container proves initialization", + pod: newPod(1, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.InitContainerStatuses = []v1.ContainerStatus{ + {Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 0}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.InitContainerStatuses[0].State) + expectTerminated(t, status.ContainerStatuses[0].State, 0) + }, + }, + { + name: "evidence of previously terminated container proves initialization", + pod: newPod(1, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.InitContainerStatuses = []v1.ContainerStatus{ + {Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", LastTerminationState: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 0}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.InitContainerStatuses[0].State) + expectTerminatedUnknown(t, status.ContainerStatuses[0].State) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), kubesecret.NewFakeManager(), kubeconfigmap.NewFakeManager()) + syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}).(*manager) + + original := tc.pod.DeepCopy() + syncer.SetPodStatus(original, original.Status) + + copied := tc.pod.DeepCopy() + if tc.updateFn != nil { + tc.updateFn(copied) + } + expected := copied.DeepCopy() + + syncer.TerminatePod(copied) + status := expectPodStatus(t, syncer, tc.pod.DeepCopy()) + if tc.expectFn != nil { + tc.expectFn(t, status) + return + } + if !reflect.DeepEqual(expected.Status, status) { + diff := cmp.Diff(expected.Status, status) + if len(diff) == 0 { + t.Fatalf("diff returned no results for failed DeepEqual: %#v != %#v", expected.Status, status) + } + t.Fatalf("unexpected status: %s", diff) + } + }) + } +} + func TestSetContainerReadiness(t *testing.T) { cID1 := kubecontainer.ContainerID{Type: "test", ID: "1"} cID2 := kubecontainer.ContainerID{Type: "test", ID: "2"} @@ -957,6 +1263,7 @@ func TestDeletePods(t *testing.T) { pod.DeletionTimestamp = &metav1.Time{Time: time.Now()} client := fake.NewSimpleClientset(pod) m := newTestManager(client) + m.podDeletionSafety.(*statustest.FakePodDeletionSafetyProvider).Reclaimed = true m.podManager.AddPod(pod) status := getRandomPodStatus() now := metav1.Now() @@ -966,6 +1273,22 @@ func TestDeletePods(t *testing.T) { verifyActions(t, m, []core.Action{getAction(), patchAction(), deleteAction()}) } +func TestDeletePodWhileReclaiming(t *testing.T) { + pod := getTestPod() + t.Logf("Set the deletion timestamp.") + pod.DeletionTimestamp = &metav1.Time{Time: time.Now()} + client := fake.NewSimpleClientset(pod) + m := newTestManager(client) + m.podDeletionSafety.(*statustest.FakePodDeletionSafetyProvider).Reclaimed = false + m.podManager.AddPod(pod) + status := getRandomPodStatus() + now := metav1.Now() + status.StartTime = &now + m.SetPodStatus(pod, status) + t.Logf("Expect to see a delete action.") + verifyActions(t, m, []core.Action{getAction(), patchAction()}) +} + func TestDoNotDeleteMirrorPods(t *testing.T) { staticPod := getTestPod() staticPod.Annotations = map[string]string{kubetypes.ConfigSourceAnnotationKey: "file"} @@ -1070,19 +1393,22 @@ func deleteAction() core.DeleteAction { func TestMergePodStatus(t *testing.T) { useCases := []struct { - desc string - oldPodStatus func(input v1.PodStatus) v1.PodStatus - newPodStatus func(input v1.PodStatus) v1.PodStatus - expectPodStatus v1.PodStatus + desc string + hasRunningContainers bool + oldPodStatus func(input v1.PodStatus) v1.PodStatus + newPodStatus func(input v1.PodStatus) v1.PodStatus + expectPodStatus v1.PodStatus }{ { "no change", + false, func(input v1.PodStatus) v1.PodStatus { return input }, func(input v1.PodStatus) v1.PodStatus { return input }, getPodStatus(), }, { "readiness changes", + false, func(input v1.PodStatus) v1.PodStatus { return input }, func(input v1.PodStatus) v1.PodStatus { input.Conditions[0].Status = v1.ConditionFalse @@ -1105,6 +1431,7 @@ func TestMergePodStatus(t *testing.T) { }, { "additional pod condition", + false, func(input v1.PodStatus) v1.PodStatus { input.Conditions = append(input.Conditions, v1.PodCondition{ Type: v1.PodConditionType("example.com/feature"), @@ -1134,6 +1461,7 @@ func TestMergePodStatus(t *testing.T) { }, { "additional pod condition and readiness changes", + false, func(input v1.PodStatus) v1.PodStatus { input.Conditions = append(input.Conditions, v1.PodCondition{ Type: v1.PodConditionType("example.com/feature"), @@ -1166,6 +1494,7 @@ func TestMergePodStatus(t *testing.T) { }, { "additional pod condition changes", + false, func(input v1.PodStatus) v1.PodStatus { input.Conditions = append(input.Conditions, v1.PodCondition{ Type: v1.PodConditionType("example.com/feature"), @@ -1199,13 +1528,77 @@ func TestMergePodStatus(t *testing.T) { Message: "Message", }, }, + { + "phase is transitioning to failed and no containers running", + false, + func(input v1.PodStatus) v1.PodStatus { + input.Phase = v1.PodRunning + input.Reason = "Unknown" + input.Message = "Message" + return input + }, + func(input v1.PodStatus) v1.PodStatus { + input.Phase = v1.PodFailed + input.Reason = "Evicted" + input.Message = "Was Evicted" + return input + }, + v1.PodStatus{ + Phase: v1.PodFailed, + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, + { + Type: v1.PodScheduled, + Status: v1.ConditionTrue, + }, + }, + Reason: "Evicted", + Message: "Was Evicted", + }, + }, + { + "phase is transitioning to failed and containers running", + true, + func(input v1.PodStatus) v1.PodStatus { + input.Phase = v1.PodRunning + input.Reason = "Unknown" + input.Message = "Message" + return input + }, + func(input v1.PodStatus) v1.PodStatus { + input.Phase = v1.PodFailed + input.Reason = "Evicted" + input.Message = "Was Evicted" + return input + }, + v1.PodStatus{ + Phase: v1.PodRunning, + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, + { + Type: v1.PodScheduled, + Status: v1.ConditionTrue, + }, + }, + Reason: "Unknown", + Message: "Message", + }, + }, } for _, tc := range useCases { - output := mergePodStatus(tc.oldPodStatus(getPodStatus()), tc.newPodStatus(getPodStatus())) - if !conditionsEqual(output.Conditions, tc.expectPodStatus.Conditions) || !statusEqual(output, tc.expectPodStatus) { - t.Errorf("test case %q failed, expect: %+v, got %+v", tc.desc, tc.expectPodStatus, output) - } + t.Run(tc.desc, func(t *testing.T) { + output := mergePodStatus(tc.oldPodStatus(getPodStatus()), tc.newPodStatus(getPodStatus()), tc.hasRunningContainers) + if !conditionsEqual(output.Conditions, tc.expectPodStatus.Conditions) || !statusEqual(output, tc.expectPodStatus) { + t.Fatalf("unexpected output: %s", cmp.Diff(tc.expectPodStatus, output)) + } + }) } } diff --git a/pkg/kubelet/status/testing/fake_pod_deletion_safety.go b/pkg/kubelet/status/testing/fake_pod_deletion_safety.go index 174c6e17515..98c3b226c0b 100644 --- a/pkg/kubelet/status/testing/fake_pod_deletion_safety.go +++ b/pkg/kubelet/status/testing/fake_pod_deletion_safety.go @@ -16,13 +16,18 @@ limitations under the License. package testing -import "k8s.io/api/core/v1" +import v1 "k8s.io/api/core/v1" // FakePodDeletionSafetyProvider is a fake PodDeletionSafetyProvider for test. -type FakePodDeletionSafetyProvider struct{} - -// PodResourcesAreReclaimed implements PodDeletionSafetyProvider. -// Always reports that all pod resources are reclaimed. -func (f *FakePodDeletionSafetyProvider) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool { - return true +type FakePodDeletionSafetyProvider struct { + Reclaimed bool + HasRunning bool +} + +func (f *FakePodDeletionSafetyProvider) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool { + return f.Reclaimed +} + +func (f *FakePodDeletionSafetyProvider) PodCouldHaveRunningContainers(pod *v1.Pod) bool { + return f.HasRunning } diff --git a/pkg/kubelet/status/testing/mock_pod_status_provider.go b/pkg/kubelet/status/testing/mock_pod_status_provider.go index 82618b28785..a58d2886b1d 100644 --- a/pkg/kubelet/status/testing/mock_pod_status_provider.go +++ b/pkg/kubelet/status/testing/mock_pod_status_provider.go @@ -103,6 +103,20 @@ func (mr *MockPodDeletionSafetyProviderMockRecorder) PodResourcesAreReclaimed(po return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PodResourcesAreReclaimed", reflect.TypeOf((*MockPodDeletionSafetyProvider)(nil).PodResourcesAreReclaimed), pod, status) } +// PodCouldHaveRunningContainers mocks base method +func (m *MockPodDeletionSafetyProvider) PodCouldHaveRunningContainers(pod *v1.Pod) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PodCouldHaveRunningContainers", pod) + ret0, _ := ret[0].(bool) + return ret0 +} + +// PodCouldHaveRunningContainers indicates an expected call of PodCouldHaveRunningContainers +func (mr *MockPodDeletionSafetyProviderMockRecorder) PodCouldHaveRunningContainers(pod interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PodCouldHaveRunningContainers", reflect.TypeOf((*MockPodDeletionSafetyProvider)(nil).PodCouldHaveRunningContainers), pod) +} + // MockManager is a mock of Manager interface type MockManager struct { ctrl *gomock.Controller From ca98714ec07c80a64552251680feb5e1f25f50df Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Mon, 14 Mar 2022 12:58:06 -0400 Subject: [PATCH 2/4] test: Add E2E for init container pod deletion Exploring termination revealed we have race conditions in certain parts of pod initialization and termination. To better catch these issues refactor the existing test so it can be reused, and then test a number of alternate scenarios. --- test/e2e/node/pods.go | 699 ++++++++++++++++++++++++++---------------- 1 file changed, 432 insertions(+), 267 deletions(-) diff --git a/test/e2e/node/pods.go b/test/e2e/node/pods.go index a4cf9f3bf5e..79a4c488647 100644 --- a/test/e2e/node/pods.go +++ b/test/e2e/node/pods.go @@ -37,6 +37,7 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/test/e2e/framework" e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet" @@ -204,274 +205,19 @@ var _ = SIGDescribe("Pods Extended", func() { ginkgo.It("should never report success for a pending container", func() { ginkgo.By("creating pods that should always exit 1 and terminating the pod after a random delay") - - var reBug88766 = regexp.MustCompile(`rootfs_linux.*kubernetes\.io~(secret|projected).*no such file or directory`) - - var ( - lock sync.Mutex - errs []error - - wg sync.WaitGroup + createAndTestPodRepeatedly( + 3, 15, + podFastDeleteScenario{client: podClient.PodInterface, delayMs: 2000}, + podClient.PodInterface, + ) + }) + ginkgo.It("should never report container start when an init container fails", func() { + ginkgo.By("creating pods with an init container that always exit 1 and terminating the pod after a random delay") + createAndTestPodRepeatedly( + 3, 15, + podFastDeleteScenario{client: podClient.PodInterface, delayMs: 2000, initContainer: true}, + podClient.PodInterface, ) - - r := prometheus.NewRegistry() - h := prometheus.NewSummaryVec(prometheus.SummaryOpts{ - Name: "start_latency", - Objectives: map[float64]float64{ - 0.5: 0.05, - 0.75: 0.025, - 0.9: 0.01, - 0.99: 0.001, - }, - }, []string{"node"}) - r.MustRegister(h) - - const delay = 2000 - const workers = 3 - const pods = 15 - var min, max time.Duration - for i := 0; i < workers; i++ { - wg.Add(1) - go func(i int) { - defer ginkgo.GinkgoRecover() - defer wg.Done() - for retries := 0; retries < pods; retries++ { - name := fmt.Sprintf("pod-submit-status-%d-%d", i, retries) - value := strconv.Itoa(time.Now().Nanosecond()) - one := int64(1) - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Labels: map[string]string{ - "name": "foo", - "time": value, - }, - }, - Spec: v1.PodSpec{ - RestartPolicy: v1.RestartPolicyNever, - TerminationGracePeriodSeconds: &one, - Containers: []v1.Container{ - { - Name: "busybox", - Image: imageutils.GetE2EImage(imageutils.BusyBox), - Command: []string{ - "/bin/false", - }, - Resources: v1.ResourceRequirements{ - Requests: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("5m"), - v1.ResourceMemory: resource.MustParse("10Mi"), - }, - }, - }, - }, - }, - } - - // create the pod, capture the change events, then delete the pod - start := time.Now() - created := podClient.Create(pod) - ch := make(chan []watch.Event) - waitForWatch := make(chan struct{}) - go func() { - defer ginkgo.GinkgoRecover() - defer close(ch) - w, err := podClient.Watch(context.TODO(), metav1.ListOptions{ - ResourceVersion: created.ResourceVersion, - FieldSelector: fmt.Sprintf("metadata.name=%s", pod.Name), - }) - if err != nil { - framework.Logf("Unable to watch pod %s: %v", pod.Name, err) - return - } - defer w.Stop() - close(waitForWatch) - events := []watch.Event{ - {Type: watch.Added, Object: created}, - } - for event := range w.ResultChan() { - events = append(events, event) - if event.Type == watch.Error { - framework.Logf("watch error seen for %s: %#v", pod.Name, event.Object) - } - if event.Type == watch.Deleted { - framework.Logf("watch delete seen for %s", pod.Name) - break - } - } - ch <- events - }() - - select { - case <-ch: // in case the goroutine above exits before establishing the watch - case <-waitForWatch: // when the watch is established - } - t := time.Duration(rand.Intn(delay)) * time.Millisecond - time.Sleep(t) - err := podClient.Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}) - framework.ExpectNoError(err, "failed to delete pod") - - var ( - events []watch.Event - ok bool - ) - select { - case events, ok = <-ch: - if !ok { - continue - } - if len(events) < 2 { - framework.Fail("only got a single event") - } - case <-time.After(5 * time.Minute): - framework.Failf("timed out waiting for watch events for %s", pod.Name) - } - - end := time.Now() - - // check the returned events for consistency - var duration, completeDuration time.Duration - var hasContainers, hasTerminated, hasTerminalPhase, hasRunningContainers bool - verifyFn := func(event watch.Event) error { - var ok bool - pod, ok = event.Object.(*v1.Pod) - if !ok { - framework.Logf("Unexpected event object: %s %#v", event.Type, event.Object) - return nil - } - - if len(pod.Status.InitContainerStatuses) != 0 { - return fmt.Errorf("pod %s on node %s had incorrect init containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.InitContainerStatuses) - } - if len(pod.Status.ContainerStatuses) == 0 { - if hasContainers { - return fmt.Errorf("pod %s on node %s had incorrect containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.ContainerStatuses) - } - return nil - } - hasContainers = true - if len(pod.Status.ContainerStatuses) != 1 { - return fmt.Errorf("pod %s on node %s had incorrect containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.ContainerStatuses) - } - status := pod.Status.ContainerStatuses[0] - t := status.State.Terminated - if hasTerminated { - if status.State.Waiting != nil || status.State.Running != nil { - return fmt.Errorf("pod %s on node %s was terminated and then changed state: %#v", pod.Name, pod.Spec.NodeName, status) - } - if t == nil { - return fmt.Errorf("pod %s on node %s was terminated and then had termination cleared: %#v", pod.Name, pod.Spec.NodeName, status) - } - } - var hasNoStartTime bool - hasRunningContainers = status.State.Waiting == nil && status.State.Terminated == nil - if t != nil { - if !t.FinishedAt.Time.IsZero() { - if t.StartedAt.IsZero() { - hasNoStartTime = true - } else { - duration = t.FinishedAt.Sub(t.StartedAt.Time) - } - completeDuration = t.FinishedAt.Sub(pod.CreationTimestamp.Time) - } - - defer func() { hasTerminated = true }() - switch { - case t.ExitCode == 1: - // expected - case t.ExitCode == 137 && (t.Reason == "ContainerStatusUnknown" || t.Reason == "Error"): - // expected, pod was force-killed after grace period - case t.ExitCode == 128 && (t.Reason == "StartError" || t.Reason == "ContainerCannotRun") && reBug88766.MatchString(t.Message): - // pod volume teardown races with container start in CRI, which reports a failure - framework.Logf("pod %s on node %s failed with the symptoms of https://github.com/kubernetes/kubernetes/issues/88766", pod.Name, pod.Spec.NodeName) - default: - data, _ := json.MarshalIndent(pod.Status, "", " ") - framework.Logf("pod %s on node %s had incorrect final status:\n%s", pod.Name, pod.Spec.NodeName, string(data)) - return fmt.Errorf("pod %s on node %s container unexpected exit code %d: start=%s end=%s reason=%s message=%s", pod.Name, pod.Spec.NodeName, t.ExitCode, t.StartedAt, t.FinishedAt, t.Reason, t.Message) - } - switch { - case duration > time.Hour: - // problem with status reporting - return fmt.Errorf("pod %s container %s on node %s had very long duration %s: start=%s end=%s", pod.Name, status.Name, pod.Spec.NodeName, duration, t.StartedAt, t.FinishedAt) - case hasNoStartTime: - // should never happen - return fmt.Errorf("pod %s container %s on node %s had finish time but not start time: end=%s", pod.Name, status.Name, pod.Spec.NodeName, t.FinishedAt) - } - } - if pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded { - hasTerminalPhase = true - } else { - if hasTerminalPhase { - return fmt.Errorf("pod %s on node %s was in a terminal phase and then reverted: %#v", pod.Name, pod.Spec.NodeName, pod.Status) - } - } - return nil - } - - var eventErr error - for _, event := range events[1:] { - if err := verifyFn(event); err != nil { - eventErr = err - break - } - } - func() { - lock.Lock() - defer lock.Unlock() - - if eventErr != nil { - errs = append(errs, eventErr) - return - } - - if !hasTerminalPhase { - var names []string - for _, status := range pod.Status.ContainerStatuses { - if status.State.Running != nil { - names = append(names, status.Name) - } - } - switch { - case len(names) > 0: - errs = append(errs, fmt.Errorf("pod %s on node %s did not reach a terminal phase before being deleted but had running containers: phase=%s, running-containers=%s", pod.Name, pod.Spec.NodeName, pod.Status.Phase, strings.Join(names, ","))) - case pod.Status.Phase != v1.PodPending: - errs = append(errs, fmt.Errorf("pod %s on node %s was not Pending but has no running containers: phase=%s", pod.Name, pod.Spec.NodeName, pod.Status.Phase)) - } - } - if hasRunningContainers { - data, _ := json.MarshalIndent(pod.Status.ContainerStatuses, "", " ") - errs = append(errs, fmt.Errorf("pod %s on node %s had running or unknown container status before being deleted:\n%s", pod.Name, pod.Spec.NodeName, string(data))) - } - }() - - if duration < min { - min = duration - } - if duration > max || max == 0 { - max = duration - } - h.WithLabelValues(pod.Spec.NodeName).Observe(end.Sub(start).Seconds()) - framework.Logf("Pod %s on node %s timings total=%s t=%s run=%s execute=%s", pod.Name, pod.Spec.NodeName, end.Sub(start), t, completeDuration, duration) - } - - }(i) - } - - wg.Wait() - - if len(errs) > 0 { - var messages []string - for _, err := range errs { - messages = append(messages, err.Error()) - } - framework.Failf("%d errors:\n%v", len(errs), strings.Join(messages, "\n")) - } - values, _ := r.Gather() - var buf bytes.Buffer - for _, m := range values { - expfmt.MetricFamilyToText(&buf, m) - } - framework.Logf("Summary of latencies:\n%s", buf.String()) }) }) @@ -552,3 +298,422 @@ var _ = SIGDescribe("Pods Extended", func() { }) }) }) + +func createAndTestPodRepeatedly(workers, iterations int, scenario podScenario, podClient v1core.PodInterface) { + var ( + lock sync.Mutex + errs []error + + wg sync.WaitGroup + ) + + r := prometheus.NewRegistry() + h := prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Name: "latency", + Objectives: map[float64]float64{ + 0.5: 0.05, + 0.75: 0.025, + 0.9: 0.01, + 0.99: 0.001, + }, + }, []string{"node"}) + r.MustRegister(h) + + for i := 0; i < workers; i++ { + wg.Add(1) + go func(i int) { + defer ginkgo.GinkgoRecover() + defer wg.Done() + for retries := 0; retries < iterations; retries++ { + pod := scenario.Pod(i, retries) + + // create the pod, capture the change events, then delete the pod + start := time.Now() + created, err := podClient.Create(context.TODO(), pod, metav1.CreateOptions{}) + framework.ExpectNoError(err, "failed to create pod") + + ch := make(chan []watch.Event) + waitForWatch := make(chan struct{}) + go func() { + defer ginkgo.GinkgoRecover() + defer close(ch) + w, err := podClient.Watch(context.TODO(), metav1.ListOptions{ + ResourceVersion: created.ResourceVersion, + FieldSelector: fmt.Sprintf("metadata.name=%s", pod.Name), + }) + if err != nil { + framework.Logf("Unable to watch pod %s: %v", pod.Name, err) + return + } + defer w.Stop() + close(waitForWatch) + events := []watch.Event{ + {Type: watch.Added, Object: created}, + } + for event := range w.ResultChan() { + events = append(events, event) + if event.Type == watch.Error { + framework.Logf("watch error seen for %s: %#v", pod.Name, event.Object) + } + if scenario.IsLastEvent(event) { + framework.Logf("watch last event seen for %s", pod.Name) + break + } + } + ch <- events + }() + + select { + case <-ch: // in case the goroutine above exits before establishing the watch + case <-waitForWatch: // when the watch is established + } + + verifier, scenario, err := scenario.Action(pod) + framework.ExpectNoError(err, "failed to take action") + + var ( + events []watch.Event + ok bool + ) + select { + case events, ok = <-ch: + if !ok { + continue + } + if len(events) < 2 { + framework.Fail("only got a single event") + } + case <-time.After(5 * time.Minute): + framework.Failf("timed out waiting for watch events for %s", pod.Name) + } + + end := time.Now() + + var eventErr error + for _, event := range events[1:] { + if err := verifier.Verify(event); err != nil { + eventErr = err + break + } + } + + total := end.Sub(start) + + var lastPod *v1.Pod = pod + func() { + lock.Lock() + defer lock.Unlock() + + if eventErr != nil { + errs = append(errs, eventErr) + return + } + pod, verifyErrs := verifier.VerifyFinal(scenario, total) + if pod != nil { + lastPod = pod + } + errs = append(errs, verifyErrs...) + }() + + h.WithLabelValues(lastPod.Spec.NodeName).Observe(total.Seconds()) + } + }(i) + } + + wg.Wait() + + if len(errs) > 0 { + var messages []string + for _, err := range errs { + messages = append(messages, err.Error()) + } + framework.Failf("%d errors:\n%v", len(errs), strings.Join(messages, "\n")) + } + values, _ := r.Gather() + var buf bytes.Buffer + for _, m := range values { + expfmt.MetricFamilyToText(&buf, m) + } + framework.Logf("Summary of latencies:\n%s", buf.String()) +} + +type podScenario interface { + Pod(worker, attempt int) *v1.Pod + Action(*v1.Pod) (podScenarioVerifier, string, error) + IsLastEvent(event watch.Event) bool +} + +type podScenarioVerifier interface { + Verify(event watch.Event) error + VerifyFinal(scenario string, duration time.Duration) (*v1.Pod, []error) +} + +type podFastDeleteScenario struct { + client v1core.PodInterface + delayMs int + + initContainer bool +} + +func (s podFastDeleteScenario) Verifier(pod *v1.Pod) podScenarioVerifier { + return &podStartVerifier{} +} + +func (s podFastDeleteScenario) IsLastEvent(event watch.Event) bool { + if event.Type == watch.Deleted { + return true + } + return false +} + +func (s podFastDeleteScenario) Action(pod *v1.Pod) (podScenarioVerifier, string, error) { + t := time.Duration(rand.Intn(s.delayMs)) * time.Millisecond + scenario := fmt.Sprintf("t=%s", t) + time.Sleep(t) + return &podStartVerifier{pod: pod}, scenario, s.client.Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}) +} + +func (s podFastDeleteScenario) Pod(worker, attempt int) *v1.Pod { + name := fmt.Sprintf("pod-terminate-status-%d-%d", worker, attempt) + value := strconv.Itoa(time.Now().Nanosecond()) + one := int64(1) + if s.initContainer { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + "name": "foo", + "time": value, + }, + }, + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyNever, + TerminationGracePeriodSeconds: &one, + InitContainers: []v1.Container{ + { + Name: "fail", + Image: imageutils.GetE2EImage(imageutils.BusyBox), + Command: []string{ + "/bin/false", + }, + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("5m"), + v1.ResourceMemory: resource.MustParse("10Mi"), + }, + }, + }, + }, + Containers: []v1.Container{ + { + Name: "blocked", + Image: imageutils.GetE2EImage(imageutils.BusyBox), + Command: []string{ + "/bin/true", + }, + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("5m"), + v1.ResourceMemory: resource.MustParse("10Mi"), + }, + }, + }, + }, + }, + } + } + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + "name": "foo", + "time": value, + }, + }, + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyNever, + TerminationGracePeriodSeconds: &one, + Containers: []v1.Container{ + { + Name: "fail", + Image: imageutils.GetE2EImage(imageutils.BusyBox), + Command: []string{ + "/bin/false", + }, + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("5m"), + v1.ResourceMemory: resource.MustParse("10Mi"), + }, + }, + }, + }, + }, + } +} + +// podStartVerifier checks events for a given pod and looks for unexpected +// transitions. It assumes one container running to completion. +type podStartVerifier struct { + pod *v1.Pod + hasInitContainers bool + hasContainers bool + hasTerminated bool + hasRunningContainers bool + hasTerminalPhase bool + duration time.Duration + completeDuration time.Duration +} + +var reBug88766 = regexp.MustCompile(`rootfs_linux.*kubernetes\.io~(secret|projected).*no such file or directory`) + +// Verify takes successive watch events for a given pod and returns an error if the status is unexpected. +// This verifier works for any pod which has 0 init containers and 1 regular container. +func (v *podStartVerifier) Verify(event watch.Event) error { + var ok bool + pod, ok := event.Object.(*v1.Pod) + if !ok { + framework.Logf("Unexpected event object: %s %#v", event.Type, event.Object) + return nil + } + v.pod = pod + + if len(pod.Spec.InitContainers) > 0 { + if len(pod.Status.InitContainerStatuses) == 0 { + if v.hasInitContainers { + return fmt.Errorf("pod %s on node %s had incorrect init containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.InitContainerStatuses) + } + return nil + } + v.hasInitContainers = true + if len(pod.Status.InitContainerStatuses) != 1 { + return fmt.Errorf("pod %s on node %s had incorrect init containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.InitContainerStatuses) + } + + } else { + if len(pod.Status.InitContainerStatuses) != 0 { + return fmt.Errorf("pod %s on node %s had incorrect init containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.InitContainerStatuses) + } + } + + if len(pod.Status.ContainerStatuses) == 0 { + if v.hasContainers { + return fmt.Errorf("pod %s on node %s had incorrect containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.ContainerStatuses) + } + return nil + } + v.hasContainers = true + if len(pod.Status.ContainerStatuses) != 1 { + return fmt.Errorf("pod %s on node %s had incorrect containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.ContainerStatuses) + } + + if status := findContainerStatusInPod(pod, "blocked"); status != nil { + if (status.Started != nil && *status.Started == true) || status.LastTerminationState.Terminated != nil || status.State.Waiting == nil { + return fmt.Errorf("pod %s on node %s should not have started the blocked container: %#v", pod.Name, pod.Spec.NodeName, status) + } + } + + status := findContainerStatusInPod(pod, "fail") + if status == nil { + return fmt.Errorf("pod %s on node %s had incorrect containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status) + } + + t := status.State.Terminated + if v.hasTerminated { + if status.State.Waiting != nil || status.State.Running != nil { + return fmt.Errorf("pod %s on node %s was terminated and then changed state: %#v", pod.Name, pod.Spec.NodeName, status) + } + if t == nil { + return fmt.Errorf("pod %s on node %s was terminated and then had termination cleared: %#v", pod.Name, pod.Spec.NodeName, status) + } + } + var hasNoStartTime bool + v.hasRunningContainers = status.State.Waiting == nil && status.State.Terminated == nil + if t != nil { + if !t.FinishedAt.Time.IsZero() { + if t.StartedAt.IsZero() { + hasNoStartTime = true + } else { + v.duration = t.FinishedAt.Sub(t.StartedAt.Time) + } + v.completeDuration = t.FinishedAt.Sub(pod.CreationTimestamp.Time) + } + + defer func() { v.hasTerminated = true }() + switch { + case t.ExitCode == 1: + // expected + case t.ExitCode == 137 && (t.Reason == "ContainerStatusUnknown" || t.Reason == "Error"): + // expected, pod was force-killed after grace period + case t.ExitCode == 128 && (t.Reason == "StartError" || t.Reason == "ContainerCannotRun") && reBug88766.MatchString(t.Message): + // pod volume teardown races with container start in CRI, which reports a failure + framework.Logf("pod %s on node %s failed with the symptoms of https://github.com/kubernetes/kubernetes/issues/88766", pod.Name, pod.Spec.NodeName) + default: + data, _ := json.MarshalIndent(pod.Status, "", " ") + framework.Logf("pod %s on node %s had incorrect final status:\n%s", pod.Name, pod.Spec.NodeName, string(data)) + return fmt.Errorf("pod %s on node %s container unexpected exit code %d: start=%s end=%s reason=%s message=%s", pod.Name, pod.Spec.NodeName, t.ExitCode, t.StartedAt, t.FinishedAt, t.Reason, t.Message) + } + switch { + case v.duration > time.Hour: + // problem with status reporting + return fmt.Errorf("pod %s container %s on node %s had very long duration %s: start=%s end=%s", pod.Name, status.Name, pod.Spec.NodeName, v.duration, t.StartedAt, t.FinishedAt) + case hasNoStartTime: + // should never happen + return fmt.Errorf("pod %s container %s on node %s had finish time but not start time: end=%s", pod.Name, status.Name, pod.Spec.NodeName, t.FinishedAt) + } + } + if pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded { + v.hasTerminalPhase = true + } else { + if v.hasTerminalPhase { + return fmt.Errorf("pod %s on node %s was in a terminal phase and then reverted: %#v", pod.Name, pod.Spec.NodeName, pod.Status) + } + } + return nil +} + +func (v *podStartVerifier) VerifyFinal(scenario string, total time.Duration) (*v1.Pod, []error) { + var errs []error + pod := v.pod + if !v.hasTerminalPhase { + var names []string + for _, status := range pod.Status.ContainerStatuses { + if status.State.Running != nil { + names = append(names, status.Name) + } + } + switch { + case len(names) > 0: + errs = append(errs, fmt.Errorf("pod %s on node %s did not reach a terminal phase before being deleted but had running containers: phase=%s, running-containers=%s", pod.Name, pod.Spec.NodeName, pod.Status.Phase, strings.Join(names, ","))) + case pod.Status.Phase != v1.PodPending: + errs = append(errs, fmt.Errorf("pod %s on node %s was not Pending but has no running containers: phase=%s", pod.Name, pod.Spec.NodeName, pod.Status.Phase)) + } + } + if v.hasRunningContainers { + data, _ := json.MarshalIndent(pod.Status.ContainerStatuses, "", " ") + errs = append(errs, fmt.Errorf("pod %s on node %s had running or unknown container status before being deleted:\n%s", pod.Name, pod.Spec.NodeName, string(data))) + } + + framework.Logf("Pod %s on node %s %s total=%s run=%s execute=%s", pod.Name, pod.Spec.NodeName, scenario, total, v.completeDuration, v.duration) + return pod, errs +} + +// findContainerStatusInPod finds a container status by its name in the provided pod +func findContainerStatusInPod(pod *v1.Pod, containerName string) *v1.ContainerStatus { + for _, container := range pod.Status.InitContainerStatuses { + if container.Name == containerName { + return &container + } + } + for _, container := range pod.Status.ContainerStatuses { + if container.Name == containerName { + return &container + } + } + for _, container := range pod.Status.EphemeralContainerStatuses { + if container.Name == containerName { + return &container + } + } + return nil +} From c70f1955c4bfd6f4b95c72c8d0391ac64c164631 Mon Sep 17 00:00:00 2001 From: David Porter Date: Fri, 25 Feb 2022 00:33:03 -0800 Subject: [PATCH 3/4] test: Add E2E for job completions with cpu reservation Create an E2E test that creates a job that spawns a pod that should succeed. The job reserves a fixed amount of CPU and has a large number of completions and parallelism. Use to repro github.com/kubernetes/kubernetes/issues/106884 Signed-off-by: David Porter --- pkg/kubelet/status/status_manager.go | 8 +++++ test/e2e/apps/job.go | 52 ++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index 8e956350882..377d44b4718 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -326,6 +326,14 @@ func findContainerStatus(status *v1.PodStatus, containerID string) (containerSta } +// TerminatePod ensures that the status of containers is properly defaulted at the end of the pod +// lifecycle. As the Kubelet must reconcile with the container runtime to observe container status +// there is always the possibility we are unable to retrieve one or more container statuses due to +// garbage collection, admin action, or loss of temporary data on a restart. This method ensures +// that any absent container status is treated as a failure so that we do not incorrectly describe +// the pod as successful. If we have not yet initialized the pod in the presence of init containers, +// the init container failure status is sufficient to describe the pod as failing, and we do not need +// to override waiting containers (unless there is evidence the pod previously started those containers). func (m *manager) TerminatePod(pod *v1.Pod) { m.podStatusesLock.Lock() defer m.podStatusesLock.Unlock() diff --git a/test/e2e/apps/job.go b/test/e2e/apps/job.go index 469d0060ce9..8b0be23da24 100644 --- a/test/e2e/apps/job.go +++ b/test/e2e/apps/job.go @@ -25,6 +25,7 @@ import ( batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" @@ -35,6 +36,7 @@ import ( e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" e2eresource "k8s.io/kubernetes/test/e2e/framework/resource" + "k8s.io/kubernetes/test/e2e/scheduling" "k8s.io/utils/pointer" "github.com/onsi/ginkgo" @@ -45,6 +47,10 @@ var _ = SIGDescribe("Job", func() { f := framework.NewDefaultFramework("job") parallelism := int32(2) completions := int32(4) + + largeParallelism := int32(90) + largeCompletions := int32(90) + backoffLimit := int32(6) // default value // Simplest case: N pods succeed @@ -361,6 +367,52 @@ var _ = SIGDescribe("Job", func() { framework.ExpectEqual(pod.Status.Phase, v1.PodFailed) } }) + + ginkgo.It("should run a job to completion with CPU requests [Serial]", func() { + ginkgo.By("Creating a job that with CPU requests") + + testNodeName := scheduling.GetNodeThatCanRunPod(f) + targetNode, err := f.ClientSet.CoreV1().Nodes().Get(context.TODO(), testNodeName, metav1.GetOptions{}) + framework.ExpectNoError(err, "unable to get node object for node %v", testNodeName) + + cpu, ok := targetNode.Status.Allocatable[v1.ResourceCPU] + if !ok { + framework.Failf("Unable to get node's %q cpu", targetNode.Name) + } + + cpuRequest := fmt.Sprint(int64(0.2 * float64(cpu.Value()))) + + backoff := 0 + ginkgo.By("Creating a job") + job := e2ejob.NewTestJob("succeed", "all-succeed", v1.RestartPolicyNever, largeParallelism, largeCompletions, nil, int32(backoff)) + for i := range job.Spec.Template.Spec.Containers { + job.Spec.Template.Spec.Containers[i].Resources = v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse(cpuRequest), + }, + } + job.Spec.Template.Spec.NodeSelector = map[string]string{"kubernetes.io/hostname": testNodeName} + } + + framework.Logf("Creating job %q with a node hostname selector %q wth cpu request %q", job.Name, testNodeName, cpuRequest) + job, err = e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job) + framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) + + ginkgo.By("Ensuring job reaches completions") + err = e2ejob.WaitForJobComplete(f.ClientSet, f.Namespace.Name, job.Name, largeCompletions) + framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) + + ginkgo.By("Ensuring pods for job exist") + pods, err := e2ejob.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name) + framework.ExpectNoError(err, "failed to get pod list for job in namespace: %s", f.Namespace.Name) + successes := int32(0) + for _, pod := range pods.Items { + if pod.Status.Phase == v1.PodSucceeded { + successes++ + } + } + framework.ExpectEqual(successes, largeCompletions, "expected %d successful job pods, but got %d", largeCompletions, successes) + }) }) // waitForJobFailure uses c to wait for up to timeout for the Job named jobName in namespace ns to fail. From d6cd51e5c042c92f4a51d63ffbdd3bda669e1e50 Mon Sep 17 00:00:00 2001 From: David Porter Date: Tue, 8 Mar 2022 19:54:25 -0800 Subject: [PATCH 4/4] test: Verify that nodes do not transition to Failed while ready Signed-off-by: David Porter --- test/e2e_node/node_shutdown_linux_test.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/test/e2e_node/node_shutdown_linux_test.go b/test/e2e_node/node_shutdown_linux_test.go index 4d3b93b98fe..18793c3a932 100644 --- a/test/e2e_node/node_shutdown_linux_test.go +++ b/test/e2e_node/node_shutdown_linux_test.go @@ -30,6 +30,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/fields" + "k8s.io/kubectl/pkg/util/podutils" "github.com/onsi/ginkgo" "github.com/onsi/gomega" @@ -128,6 +129,11 @@ var _ = SIGDescribe("GracefulNodeShutdown [Serial] [NodeFeature:GracefulNodeShut framework.ExpectEqual(len(list.Items), len(pods), "the number of pods is not as expected") for _, pod := range list.Items { + if isPodStatusAffectedByIssue108594(&pod) { + framework.Logf("Detected invalid pod state for pod %q: pod status: %+v", pod.Name, pod.Status) + framework.Failf("failing test due to detecting invalid pod status") + } + if kubelettypes.IsCriticalPod(&pod) { if isPodShutdown(&pod) { framework.Logf("Expecting critical pod to be running, but it's not currently. Pod: %q, Pod Status %+v", pod.Name, pod.Status) @@ -155,6 +161,10 @@ var _ = SIGDescribe("GracefulNodeShutdown [Serial] [NodeFeature:GracefulNodeShut framework.ExpectEqual(len(list.Items), len(pods), "the number of pods is not as expected") for _, pod := range list.Items { + if isPodStatusAffectedByIssue108594(&pod) { + framework.Logf("Detected invalid pod state for pod %q: pod status: %+v", pod.Name, pod.Status) + framework.Failf("failing test due to detecting invalid pod status") + } if !isPodShutdown(&pod) { framework.Logf("Expecting pod to be shutdown, but it's not currently: Pod: %q, Pod Status %+v", pod.Name, pod.Status) return fmt.Errorf("pod should be shutdown, phase: %s", pod.Status.Phase) @@ -541,3 +551,8 @@ func isPodShutdown(pod *v1.Pod) bool { return pod.Status.Message == podShutdownMessage && pod.Status.Reason == podShutdownReason && hasContainersNotReadyCondition && pod.Status.Phase == v1.PodFailed } + +// Pods should never report failed phase and have ready condition = true (https://github.com/kubernetes/kubernetes/issues/108594) +func isPodStatusAffectedByIssue108594(pod *v1.Pod) bool { + return pod.Status.Phase == v1.PodFailed && podutils.IsPodReady(pod) +}