diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 2b96020d722..ab255e5216f 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1452,24 +1452,36 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { } // syncPod is the transaction script for the sync of a single pod (setting up) -// a pod. The reverse (teardown) is handled in syncTerminatingPod and -// syncTerminatedPod. If syncPod exits without error, then the pod runtime -// state is in sync with the desired configuration state (pod is running). -// If syncPod exits with a transient error, the next invocation of syncPod -// is expected to make progress towards reaching the runtime state. +// a pod. This method is reentrant and expected to converge a pod towards the +// desired state of the spec. The reverse (teardown) is handled in +// syncTerminatingPod and syncTerminatedPod. If syncPod exits without error, +// then the pod runtime state is in sync with the desired configuration state +// (pod is running). If syncPod exits with a transient error, the next +// invocation of syncPod is expected to make progress towards reaching the +// runtime state. syncPod exits with isTerminal when the pod was detected to +// have reached a terminal lifecycle phase due to container exits (for +// RestartNever or RestartOnFailure) and the next method invoked will be +// syncTerminatingPod. // // Arguments: // -// o - the SyncPodOptions for this invocation +// updateType - whether this is a create (first time) or an update, should +// only be used for metrics since this method must be reentrant +// pod - the pod that is being set up +// mirrorPod - the mirror pod known to the kubelet for this pod, if any +// podStatus - the most recent pod status observed for this pod which can +// be used to determine the set of actions that should be taken during +// this loop of syncPod // // The workflow is: -// * Kill the pod immediately if update type is SyncPodKill // * If the pod is being created, record pod worker start latency // * Call generateAPIPodStatus to prepare an v1.PodStatus for the pod // * If the pod is being seen as running for the first time, record pod // start latency // * Update the status of the pod in the status manager -// * Kill the pod if it should not be running due to soft admission +// * Stop the pod's containers if it should not be running due to soft +// admission +// * Ensure any background tracking for a runnable pod is started // * Create a mirror pod if the pod is a static pod, and does not // already have a mirror pod // * Create the data directories for the pod if they do not exist @@ -1483,10 +1495,12 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { // // This operation writes all events that are dispatched in order to provide // the most accurate information possible about an error situation to aid debugging. -// Callers should not throw an event if this operation returns an error. -func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error { +// Callers should not write an event if this operation returns an error.
+func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) { klog.V(4).InfoS("syncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID) - defer klog.V(4).InfoS("syncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID) + defer func() { + klog.V(4).InfoS("syncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID, "isTerminal", isTerminal) + }() // Latency measurements for the main workflow are relative to the // first time the pod was seen by kubelet. @@ -1518,11 +1532,17 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType for _, ipInfo := range apiPodStatus.PodIPs { podStatus.IPs = append(podStatus.IPs, ipInfo.IP) } - if len(podStatus.IPs) == 0 && len(apiPodStatus.PodIP) > 0 { podStatus.IPs = []string{apiPodStatus.PodIP} } + // If the pod is terminal, we don't need to continue to setup the pod + if apiPodStatus.Phase == v1.PodSucceeded || apiPodStatus.Phase == v1.PodFailed { + kl.statusManager.SetPodStatus(pod, apiPodStatus) + isTerminal = true + return isTerminal, nil + } + // If the pod should not be running, we request the pod's containers be stopped. This is not the same // as termination (we want to stop the pod, but potentially restart it later if soft admission allows // it later). Set the status and phase appropriately @@ -1572,13 +1592,13 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType // Return an error to signal that the sync loop should back off. syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message) } - return syncErr + return false, syncErr } // If the network plugin is not ready, only start the pod if it uses the host network if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) { kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, err) - return fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err) + return false, fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err) } // ensure the kubelet knows about referenced secrets or configmaps used by the pod @@ -1635,7 +1655,7 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType } if err := pcm.EnsureExists(pod); err != nil { kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err) - return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err) + return false, fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err) } } } @@ -1676,7 +1696,7 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType if err := kl.makePodDataDirs(pod); err != nil { kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err) klog.ErrorS(err, "Unable to make pod data directories for pod", "pod", klog.KObj(pod)) - return err + return false, err } // Volume manager will not mount volumes for terminating pods @@ -1686,7 +1706,7 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil { kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err) klog.ErrorS(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod)) - 
return err + return false, err } } @@ -1702,14 +1722,14 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff { // Do not record an event here, as we keep all event logging for sync pod failures // local to container runtime, so we get better errors. - return err + return false, err } } - return nil + return false, nil } - return nil + return false, nil } // syncTerminatingPod is expected to terminate all running containers in a pod. Once this method diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index 2808ad02b2c..432db4add9f 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -915,6 +915,12 @@ func countRunningContainerStatus(status v1.PodStatus) int { return runningContainers } +// PodCouldHaveRunningContainers returns true if the pod with the given UID could still have running +// containers. This returns false if the pod has not yet been started or the pod is unknown. +func (kl *Kubelet) PodCouldHaveRunningContainers(pod *v1.Pod) bool { + return kl.podWorkers.CouldHaveRunningContainers(pod.UID) +} + // PodResourcesAreReclaimed returns true if all required node-level resources that a pod was consuming have // been reclaimed by the kubelet. Reclaiming resources is a prerequisite to deleting a pod from the API server. func (kl *Kubelet) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool { @@ -1424,7 +1430,7 @@ func getPhase(spec *v1.PodSpec, info []v1.ContainerStatus) v1.PodPhase { } // generateAPIPodStatus creates the final API pod status for a pod, given the -// internal pod status. +// internal pod status. This method should only be called from within sync*Pod methods. 
func (kl *Kubelet) generateAPIPodStatus(pod *v1.Pod, podStatus *kubecontainer.PodStatus) v1.PodStatus { klog.V(3).InfoS("Generating pod status", "pod", klog.KObj(pod)) diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index d7fe88c446d..e8c2d06f1df 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -505,9 +505,9 @@ func TestDispatchWorkOfCompletedPod(t *testing.T) { kubelet := testKubelet.kubelet var got bool kubelet.podWorkers = &fakePodWorkers{ - syncPodFn: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error { + syncPodFn: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) { got = true - return nil + return false, nil }, cache: kubelet.podCache, t: t, @@ -584,9 +584,9 @@ func TestDispatchWorkOfActivePod(t *testing.T) { kubelet := testKubelet.kubelet var got bool kubelet.podWorkers = &fakePodWorkers{ - syncPodFn: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error { + syncPodFn: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) { got = true - return nil + return false, nil }, cache: kubelet.podCache, t: t, @@ -1300,8 +1300,11 @@ func TestCreateMirrorPod(t *testing.T) { pod.Annotations[kubetypes.ConfigSourceAnnotationKey] = "file" pods := []*v1.Pod{pod} kl.podManager.SetPods(pods) - err := kl.syncPod(context.Background(), updateType, pod, nil, &kubecontainer.PodStatus{}) + isTerminal, err := kl.syncPod(context.Background(), updateType, pod, nil, &kubecontainer.PodStatus{}) assert.NoError(t, err) + if isTerminal { + t.Fatalf("pod should not be terminal: %#v", pod) + } podFullName := kubecontainer.GetPodFullName(pod) assert.True(t, manager.HasPod(podFullName), "Expected mirror pod %q to be created", podFullName) assert.Equal(t, 1, manager.NumOfPods(), "Expected only 1 mirror pod %q, got %+v", podFullName, manager.GetPods()) @@ -1332,8 +1335,11 @@ func TestDeleteOutdatedMirrorPod(t *testing.T) { pods := []*v1.Pod{pod, mirrorPod} kl.podManager.SetPods(pods) - err := kl.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, mirrorPod, &kubecontainer.PodStatus{}) + isTerminal, err := kl.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, mirrorPod, &kubecontainer.PodStatus{}) assert.NoError(t, err) + if isTerminal { + t.Fatalf("pod should not be terminal: %#v", pod) + } name := kubecontainer.GetPodFullName(pod) creates, deletes := manager.GetCounts(name) if creates != 1 || deletes != 1 { @@ -1489,13 +1495,19 @@ func TestNetworkErrorsWithoutHostNetwork(t *testing.T) { }) kubelet.podManager.SetPods([]*v1.Pod{pod}) - err := kubelet.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{}) + isTerminal, err := kubelet.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{}) assert.Error(t, err, "expected pod with hostNetwork=false to fail when network in error") + if isTerminal { + t.Fatalf("pod should not be terminal: %#v", pod) + } pod.Annotations[kubetypes.ConfigSourceAnnotationKey] = kubetypes.FileSource pod.Spec.HostNetwork = true - err = kubelet.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{}) + isTerminal, err = kubelet.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{}) 
assert.NoError(t, err, "expected pod with hostNetwork=true to succeed when network in error") + if isTerminal { + t.Fatalf("pod should not be terminal: %#v", pod) + } } func TestFilterOutInactivePods(t *testing.T) { diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index d8dfaa4034f..5632745e060 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -218,7 +218,7 @@ type PodWorkers interface { } // the function to invoke to perform a sync (reconcile the kubelet state to the desired shape of the pod) -type syncPodFnType func(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error +type syncPodFnType func(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) // the function to invoke to terminate a pod (ensure no running processes are present) type syncTerminatingPodFnType func(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, runningPod *kubecontainer.Pod, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error @@ -886,6 +886,7 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) { } klog.V(4).InfoS("Processing pod event", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType) + var isTerminal bool err := func() error { // The worker is responsible for ensuring the sync method sees the appropriate // status updates on resyncs (the result of the last sync), transitions to @@ -932,13 +933,14 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) { err = p.syncTerminatingPodFn(ctx, pod, status, update.Options.RunningPod, gracePeriod, podStatusFn) default: - err = p.syncPodFn(ctx, update.Options.UpdateType, pod, update.Options.MirrorPod, status) + isTerminal, err = p.syncPodFn(ctx, update.Options.UpdateType, pod, update.Options.MirrorPod, status) } lastSyncTime = time.Now() return err }() + var phaseTransition bool switch { case err == context.Canceled: // when the context is cancelled we expect an update to already be queued @@ -969,10 +971,17 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) { } // otherwise we move to the terminating phase p.completeTerminating(pod) + phaseTransition = true + + case isTerminal: + // if syncPod indicated we are now terminal, set the appropriate pod status to move to terminating + klog.V(4).InfoS("Pod is terminal", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType) + p.completeSync(pod) + phaseTransition = true } - // queue a retry for errors if necessary, then put the next event in the channel if any - p.completeWork(pod, err) + // queue a retry if necessary, then put the next event in the channel if any + p.completeWork(pod, phaseTransition, err) if start := update.Options.StartTime; !start.IsZero() { metrics.PodWorkerDuration.WithLabelValues(update.Options.UpdateType.String()).Observe(metrics.SinceInSeconds(start)) } @@ -1003,6 +1012,33 @@ func (p *podWorkers) acknowledgeTerminating(pod *v1.Pod) PodStatusFunc { return nil } +// completeSync is invoked when syncPod completes successfully and indicates the pod is now terminal and should +// be terminated. This happens when the natural pod lifecycle completes - any pod which is not RestartAlways +// exits. Unnatural completions, such as evictions, API driven deletion or phase transition, are handled by +// UpdatePod. 
+func (p *podWorkers) completeSync(pod *v1.Pod) { + p.podLock.Lock() + defer p.podLock.Unlock() + + klog.V(4).InfoS("Pod indicated lifecycle completed naturally and should now terminate", "pod", klog.KObj(pod), "podUID", pod.UID) + + if status, ok := p.podSyncStatuses[pod.UID]; ok { + if status.terminatingAt.IsZero() { + status.terminatingAt = time.Now() + } else { + klog.V(4).InfoS("Pod worker attempted to set terminatingAt twice, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID) + } + status.startedTerminating = true + } + + p.lastUndeliveredWorkUpdate[pod.UID] = podWork{ + WorkType: TerminatingPodWork, + Options: UpdatePodOptions{ + Pod: pod, + }, + } +} + // completeTerminating is invoked when syncTerminatingPod completes successfully, which means // no container is running, no container will be started in the future, and we are ready for // cleanup. This updates the termination state which prevents future syncs and will ensure @@ -1115,9 +1151,11 @@ func (p *podWorkers) completeUnstartedTerminated(pod *v1.Pod) { // completeWork requeues on error or the next sync interval and then immediately executes any pending // work. -func (p *podWorkers) completeWork(pod *v1.Pod, syncErr error) { +func (p *podWorkers) completeWork(pod *v1.Pod, phaseTransition bool, syncErr error) { // Requeue the last update if the last sync returned error. switch { + case phaseTransition: + p.workQueue.Enqueue(pod.UID, 0) case syncErr == nil: // No error; requeue at the regular resync interval. p.workQueue.Enqueue(pod.UID, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor)) diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go index afde2b0d789..c47947fa579 100644 --- a/pkg/kubelet/pod_workers_test.go +++ b/pkg/kubelet/pod_workers_test.go @@ -18,7 +18,6 @@ package kubelet import ( "context" - "flag" "reflect" "strconv" "sync" @@ -31,7 +30,6 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" - "k8s.io/klog/v2" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" @@ -48,6 +46,7 @@ type fakePodWorkers struct { t TestingInterface triggeredDeletion []types.UID + triggeredTerminal []types.UID statusLock sync.Mutex running map[types.UID]bool @@ -79,9 +78,13 @@ func (f *fakePodWorkers) UpdatePod(options UpdatePodOptions) { case kubetypes.SyncPodKill: f.triggeredDeletion = append(f.triggeredDeletion, uid) default: - if err := f.syncPodFn(context.Background(), options.UpdateType, options.Pod, options.MirrorPod, status); err != nil { + isTerminal, err := f.syncPodFn(context.Background(), options.UpdateType, options.Pod, options.MirrorPod, status) + if err != nil { f.t.Errorf("Unexpected error: %v", err) } + if isTerminal { + f.triggeredTerminal = append(f.triggeredTerminal, uid) + } } } @@ -249,7 +252,7 @@ func createPodWorkers() (*podWorkers, map[types.UID][]syncPodRecord) { fakeCache := containertest.NewFakeCache(fakeRuntime) fakeQueue := &fakeQueue{} w := newPodWorkers( - func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error { + func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) { func() { lock.Lock() defer lock.Unlock() @@ -259,7 +262,7 @@ func createPodWorkers() (*podWorkers, map[types.UID][]syncPodRecord) { updateType: updateType, }) }() 
- return nil + return false, nil }, func(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, runningPod *kubecontainer.Pod, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error { func() { @@ -530,9 +533,84 @@ func newUIDSet(uids ...types.UID) sets.String { return set } -func init() { - klog.InitFlags(nil) - flag.Lookup("v").Value.Set("5") +type terminalPhaseSync struct { + lock sync.Mutex + fn syncPodFnType + terminal sets.String +} + +func (s *terminalPhaseSync) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) { + isTerminal, err := s.fn(ctx, updateType, pod, mirrorPod, podStatus) + if err != nil { + return false, err + } + if !isTerminal { + s.lock.Lock() + defer s.lock.Unlock() + isTerminal = s.terminal.Has(string(pod.UID)) + } + return isTerminal, nil +} + +func (s *terminalPhaseSync) SetTerminal(uid types.UID) { + s.lock.Lock() + defer s.lock.Unlock() + s.terminal.Insert(string(uid)) +} + +func newTerminalPhaseSync(fn syncPodFnType) *terminalPhaseSync { + return &terminalPhaseSync{ + fn: fn, + terminal: sets.NewString(), + } +} + +func TestTerminalPhaseTransition(t *testing.T) { + podWorkers, _ := createPodWorkers() + var channels WorkChannel + podWorkers.workerChannelFn = channels.Intercept + terminalPhaseSyncer := newTerminalPhaseSync(podWorkers.syncPodFn) + podWorkers.syncPodFn = terminalPhaseSyncer.SyncPod + + // start pod + podWorkers.UpdatePod(UpdatePodOptions{ + Pod: newNamedPod("1", "test1", "pod1", false), + UpdateType: kubetypes.SyncPodUpdate, + }) + drainAllWorkers(podWorkers) + + // should observe pod running + pod1 := podWorkers.podSyncStatuses[types.UID("1")] + if pod1.IsTerminated() { + t.Fatalf("unexpected pod state: %#v", pod1) + } + + // send another update to the pod + podWorkers.UpdatePod(UpdatePodOptions{ + Pod: newNamedPod("1", "test1", "pod1", false), + UpdateType: kubetypes.SyncPodUpdate, + }) + drainAllWorkers(podWorkers) + + // should observe pod still running + pod1 = podWorkers.podSyncStatuses[types.UID("1")] + if pod1.IsTerminated() { + t.Fatalf("unexpected pod state: %#v", pod1) + } + + // the next sync should result in a transition to terminal + terminalPhaseSyncer.SetTerminal(types.UID("1")) + podWorkers.UpdatePod(UpdatePodOptions{ + Pod: newNamedPod("1", "test1", "pod1", false), + UpdateType: kubetypes.SyncPodUpdate, + }) + drainAllWorkers(podWorkers) + + // should observe pod terminating + pod1 = podWorkers.podSyncStatuses[types.UID("1")] + if !pod1.IsTerminationRequested() || !pod1.IsTerminated() { + t.Fatalf("unexpected pod state: %#v", pod1) + } } func TestStaticPodExclusion(t *testing.T) { @@ -1203,15 +1281,15 @@ type simpleFakeKubelet struct { wg sync.WaitGroup } -func (kl *simpleFakeKubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error { +func (kl *simpleFakeKubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) { kl.pod, kl.mirrorPod, kl.podStatus = pod, mirrorPod, podStatus - return nil + return false, nil } -func (kl *simpleFakeKubelet) syncPodWithWaitGroup(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error { +func (kl *simpleFakeKubelet) syncPodWithWaitGroup(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, 
error) { kl.pod, kl.mirrorPod, kl.podStatus = pod, mirrorPod, podStatus kl.wg.Done() - return nil + return false, nil } func (kl *simpleFakeKubelet) syncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, runningPod *kubecontainer.Pod, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error { diff --git a/pkg/kubelet/runonce.go b/pkg/kubelet/runonce.go index 19b8a4f6a7b..00f3022af5a 100644 --- a/pkg/kubelet/runonce.go +++ b/pkg/kubelet/runonce.go @@ -112,9 +112,10 @@ func (kl *Kubelet) runOnce(pods []*v1.Pod, retryDelay time.Duration) (results [] // runPod runs a single pod and wait until all containers are running. func (kl *Kubelet) runPod(pod *v1.Pod, retryDelay time.Duration) error { + var isTerminal bool delay := retryDelay retry := 0 - for { + for !isTerminal { status, err := kl.containerRuntime.GetPodStatus(pod.UID, pod.Name, pod.Namespace) if err != nil { return fmt.Errorf("unable to get status for pod %q: %v", format.Pod(pod), err) @@ -131,7 +132,7 @@ func (kl *Kubelet) runPod(pod *v1.Pod, retryDelay time.Duration) error { klog.ErrorS(err, "Failed creating a mirror pod", "pod", klog.KObj(pod)) } mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) - if err = kl.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, mirrorPod, status); err != nil { + if isTerminal, err = kl.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, mirrorPod, status); err != nil { return fmt.Errorf("error syncing pod %q: %v", format.Pod(pod), err) } if retry >= runOnceMaxRetries { @@ -143,6 +144,7 @@ func (kl *Kubelet) runPod(pod *v1.Pod, retryDelay time.Duration) error { retry++ delay *= runOnceRetryDelayBackoff } + return nil } // isPodRunning returns true if all containers of a manifest are running. diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index 84c8c6f92cf..377d44b4718 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -83,8 +83,10 @@ type PodStatusProvider interface { // PodDeletionSafetyProvider provides guarantees that a pod can be safely deleted. type PodDeletionSafetyProvider interface { - // A function which returns true if the pod can safely be deleted + // PodResourcesAreReclaimed returns true if the pod can safely be deleted. PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool + // PodCouldHaveRunningContainers returns true if the pod could have running containers. + PodCouldHaveRunningContainers(pod *v1.Pod) bool } // Manager is the Source of truth for kubelet pod status, and should be kept up-to-date with @@ -324,6 +326,14 @@ func findContainerStatus(status *v1.PodStatus, containerID string) (containerSta } +// TerminatePod ensures that the status of containers is properly defaulted at the end of the pod +// lifecycle. As the Kubelet must reconcile with the container runtime to observe container status +// there is always the possibility we are unable to retrieve one or more container statuses due to +// garbage collection, admin action, or loss of temporary data on a restart. This method ensures +// that any absent container status is treated as a failure so that we do not incorrectly describe +// the pod as successful. If we have not yet initialized the pod in the presence of init containers, +// the init container failure status is sufficient to describe the pod as failing, and we do not need +// to override waiting containers (unless there is evidence the pod previously started those containers). 
func (m *manager) TerminatePod(pod *v1.Pod) { m.podStatusesLock.Lock() defer m.podStatusesLock.Unlock() @@ -335,19 +345,26 @@ func (m *manager) TerminatePod(pod *v1.Pod) { oldStatus = &cachedStatus.status } status := *oldStatus.DeepCopy() - for i := range status.ContainerStatuses { - if status.ContainerStatuses[i].State.Terminated != nil { - continue - } - status.ContainerStatuses[i].State = v1.ContainerState{ - Terminated: &v1.ContainerStateTerminated{ - Reason: "ContainerStatusUnknown", - Message: "The container could not be located when the pod was terminated", - ExitCode: 137, - }, + + // once a pod has initialized, any missing status is treated as a failure + if hasPodInitialized(pod) { + for i := range status.ContainerStatuses { + if status.ContainerStatuses[i].State.Terminated != nil { + continue + } + status.ContainerStatuses[i].State = v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{ + Reason: "ContainerStatusUnknown", + Message: "The container could not be located when the pod was terminated", + ExitCode: 137, + }, + } } } - for i := range status.InitContainerStatuses { + + // all init containers except for the final suffix that shows no evidence of a container start are + // marked as failed containers + for i := range initializedContainers(status.InitContainerStatuses) { if status.InitContainerStatuses[i].State.Terminated != nil { continue } @@ -364,6 +381,49 @@ func (m *manager) TerminatePod(pod *v1.Pod) { m.updateStatusInternal(pod, status, true) } +// hasPodInitialized returns true if the pod has evidence of having started a regular container or of +// completing its init containers; when false, regular container statuses should not be transitioned to terminated. +func hasPodInitialized(pod *v1.Pod) bool { + // a pod without init containers is always initialized + if len(pod.Spec.InitContainers) == 0 { + return true + } + // if any container has ever moved out of waiting state, the pod has initialized + for _, status := range pod.Status.ContainerStatuses { + if status.LastTerminationState.Terminated != nil || status.State.Waiting == nil { + return true + } + } + // if the last init container has ever completed with a zero exit code, the pod is initialized + if l := len(pod.Status.InitContainerStatuses); l > 0 { + container := pod.Status.InitContainerStatuses[l-1] + if state := container.LastTerminationState; state.Terminated != nil && state.Terminated.ExitCode == 0 { + return true + } + if state := container.State; state.Terminated != nil && state.Terminated.ExitCode == 0 { + return true + } + } + // otherwise the pod has no record of being initialized + return false +} + +// initializedContainers returns all statuses except for the trailing suffix of containers that are in the Waiting +// state, which is the set of containers that have attempted to start at least once. If all containers +// are Waiting, the first container is always returned. +func initializedContainers(containers []v1.ContainerStatus) []v1.ContainerStatus { + for i := len(containers) - 1; i >= 0; i-- { + if containers[i].State.Waiting == nil || containers[i].LastTerminationState.Terminated != nil { + return containers[0 : i+1] + } + } + // always return at least one container + if len(containers) > 0 { + return containers[0:1] + } + return nil +} + // checkContainerStateTransition ensures that no container is trying to transition // from a terminated to non-terminated state, which is illegal and indicates a // logical error in the kubelet.
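To make the suffix rule above concrete, here is a minimal standalone sketch: initializedContainers is copied from the change above, while the package wrapper and the example statuses in main are assumptions used purely for illustration.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// initializedContainers keeps every status up to and including the last container that shows
// evidence of having started (a non-Waiting state or a recorded previous termination); a purely
// Waiting suffix is dropped, and at least the first container is always kept.
func initializedContainers(containers []v1.ContainerStatus) []v1.ContainerStatus {
	for i := len(containers) - 1; i >= 0; i-- {
		if containers[i].State.Waiting == nil || containers[i].LastTerminationState.Terminated != nil {
			return containers[0 : i+1]
		}
	}
	if len(containers) > 0 {
		return containers[0:1]
	}
	return nil
}

func main() {
	statuses := []v1.ContainerStatus{
		{Name: "init-0", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 1}}},
		{Name: "init-1", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
		{Name: "init-2", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
	}
	// Only init-0 is printed: init-1 and init-2 form a trailing Waiting suffix that never started,
	// so TerminatePod would leave them in Waiting instead of defaulting them to a failure.
	for _, s := range initializedContainers(statuses) {
		fmt.Println(s.Name)
	}
}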
@@ -619,8 +679,9 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) { return } - oldStatus := pod.Status.DeepCopy() - newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, *oldStatus, mergePodStatus(*oldStatus, status.status)) + mergedStatus := mergePodStatus(pod.Status, status.status, m.podDeletionSafety.PodCouldHaveRunningContainers(pod)) + + newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, mergedStatus) klog.V(3).InfoS("Patch status for pod", "pod", klog.KObj(pod), "patch", string(patchBytes)) if err != nil { @@ -630,7 +691,7 @@ if unchanged { klog.V(3).InfoS("Status for pod is up-to-date", "pod", klog.KObj(pod), "statusVersion", status.version) } else { - klog.V(3).InfoS("Status for pod updated successfully", "pod", klog.KObj(pod), "statusVersion", status.version, "status", status.status) + klog.V(3).InfoS("Status for pod updated successfully", "pod", klog.KObj(pod), "statusVersion", status.version, "status", mergedStatus) pod = newPod } @@ -771,25 +832,49 @@ func normalizeStatus(pod *v1.Pod, status *v1.PodStatus) *v1.PodStatus { return status } -// mergePodStatus merges oldPodStatus and newPodStatus where pod conditions -// not owned by kubelet is preserved from oldPodStatus -func mergePodStatus(oldPodStatus, newPodStatus v1.PodStatus) v1.PodStatus { - podConditions := []v1.PodCondition{} +// mergePodStatus merges oldPodStatus and newPodStatus, preserving pod conditions +// not owned by the kubelet and ensuring a terminal phase transition only happens after all +// running containers have terminated. This method does not modify the old status. +func mergePodStatus(oldPodStatus, newPodStatus v1.PodStatus, couldHaveRunningContainers bool) v1.PodStatus { + podConditions := make([]v1.PodCondition, 0, len(oldPodStatus.Conditions)+len(newPodStatus.Conditions)) + for _, c := range oldPodStatus.Conditions { if !kubetypes.PodConditionByKubelet(c.Type) { podConditions = append(podConditions, c) } } - for _, c := range newPodStatus.Conditions { if kubetypes.PodConditionByKubelet(c.Type) { podConditions = append(podConditions, c) } } newPodStatus.Conditions = podConditions + + // Delay transitioning a pod to a terminal status unless the pod is actually terminal. + // The Kubelet should never transition to a terminal status a pod that could still have running + // containers and thus actively be leveraging exclusive resources. Note that resources + // like volumes are reconciled by a subsystem in the Kubelet and will converge if a new + // pod reuses an exclusive resource (unmount -> free -> mount), which means we do not + // need to wait for those resources to be detached by the Kubelet. In general, resources + // the Kubelet exclusively owns must be released prior to a pod being reported terminal, + // while resources that have participating components above the API use the pod's + // transition to a terminal phase (or full deletion) to release those resources. + if !isPhaseTerminal(oldPodStatus.Phase) && isPhaseTerminal(newPodStatus.Phase) { + if couldHaveRunningContainers { + newPodStatus.Phase = oldPodStatus.Phase + newPodStatus.Reason = oldPodStatus.Reason + newPodStatus.Message = oldPodStatus.Message + } + } + return newPodStatus } +// isPhaseTerminal returns true if the pod's phase is terminal.
+func isPhaseTerminal(phase v1.PodPhase) bool { + return phase == v1.PodFailed || phase == v1.PodSucceeded +} + // NeedToReconcilePodReadiness returns if the pod "Ready" condition need to be reconcile func NeedToReconcilePodReadiness(pod *v1.Pod) bool { if len(pod.Spec.ReadinessGates) == 0 { diff --git a/pkg/kubelet/status/status_manager_test.go b/pkg/kubelet/status/status_manager_test.go index 855fac52ecd..9ecdf67b300 100644 --- a/pkg/kubelet/status/status_manager_test.go +++ b/pkg/kubelet/status/status_manager_test.go @@ -25,6 +25,7 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" @@ -648,11 +649,14 @@ func TestTerminatePodWaiting(t *testing.T) { t.Logf("we expect the container statuses to have changed to terminated") newStatus := expectPodStatus(t, syncer, testPod) - for i := range newStatus.ContainerStatuses { - assert.False(t, newStatus.ContainerStatuses[i].State.Terminated == nil, "expected containers to be terminated") + for _, container := range newStatus.ContainerStatuses { + assert.False(t, container.State.Terminated == nil, "expected containers to be terminated") } - for i := range newStatus.InitContainerStatuses { - assert.False(t, newStatus.InitContainerStatuses[i].State.Terminated == nil, "expected init containers to be terminated") + for _, container := range newStatus.InitContainerStatuses[:2] { + assert.False(t, container.State.Terminated == nil, "expected init containers to be terminated") + } + for _, container := range newStatus.InitContainerStatuses[2:] { + assert.False(t, container.State.Waiting == nil, "expected init containers to be waiting") } expectUnknownState := v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "ContainerStatusUnknown", Message: "The container could not be located when the pod was terminated", ExitCode: 137}} @@ -662,8 +666,8 @@ func TestTerminatePodWaiting(t *testing.T) { if !reflect.DeepEqual(newStatus.InitContainerStatuses[1].State, firstStatus.InitContainerStatuses[1].State) { t.Errorf("existing terminated container state not preserved: %#v", newStatus.ContainerStatuses) } - if !reflect.DeepEqual(newStatus.InitContainerStatuses[2].State, expectUnknownState) { - t.Errorf("waiting container state not defaulted: %s", diff.ObjectReflectDiff(newStatus.InitContainerStatuses[2].State, expectUnknownState)) + if !reflect.DeepEqual(newStatus.InitContainerStatuses[2].State, firstStatus.InitContainerStatuses[2].State) { + t.Errorf("waiting container state not defaulted: %s", diff.ObjectReflectDiff(newStatus.InitContainerStatuses[2].State, firstStatus.InitContainerStatuses[2].State)) } if !reflect.DeepEqual(newStatus.ContainerStatuses[0].State, expectUnknownState) { t.Errorf("terminated container state not defaulted: %s", diff.ObjectReflectDiff(newStatus.ContainerStatuses[0].State, expectUnknownState)) @@ -680,6 +684,308 @@ func TestTerminatePodWaiting(t *testing.T) { assert.Equal(t, newStatus.Message, firstStatus.Message) } +func TestTerminatePod_DefaultUnknownStatus(t *testing.T) { + newPod := func(initContainers, containers int, fns ...func(*v1.Pod)) *v1.Pod { + pod := getTestPod() + for i := 0; i < initContainers; i++ { + pod.Spec.InitContainers = append(pod.Spec.InitContainers, v1.Container{ + Name: fmt.Sprintf("init-%d", i), + }) + } + for i := 0; i < containers; i++ { + pod.Spec.Containers = append(pod.Spec.Containers, v1.Container{ + Name: fmt.Sprintf("%d", i), + }) + } + pod.Status.StartTime = &metav1.Time{Time: time.Unix(1, 0).UTC()} + for _, fn := range 
fns { + fn(pod) + } + return pod + } + expectTerminatedUnknown := func(t *testing.T, state v1.ContainerState) { + t.Helper() + if state.Terminated == nil || state.Running != nil || state.Waiting != nil { + t.Fatalf("unexpected state: %#v", state) + } + if state.Terminated.ExitCode != 137 || state.Terminated.Reason != "ContainerStatusUnknown" || len(state.Terminated.Message) == 0 { + t.Fatalf("unexpected terminated state: %#v", state.Terminated) + } + } + expectTerminated := func(t *testing.T, state v1.ContainerState, exitCode int32) { + t.Helper() + if state.Terminated == nil || state.Running != nil || state.Waiting != nil { + t.Fatalf("unexpected state: %#v", state) + } + if state.Terminated.ExitCode != exitCode { + t.Fatalf("unexpected terminated state: %#v", state.Terminated) + } + } + expectWaiting := func(t *testing.T, state v1.ContainerState) { + t.Helper() + if state.Terminated != nil || state.Running != nil || state.Waiting == nil { + t.Fatalf("unexpected state: %#v", state) + } + } + + testCases := []struct { + name string + pod *v1.Pod + updateFn func(*v1.Pod) + expectFn func(t *testing.T, status v1.PodStatus) + }{ + {pod: newPod(0, 1, func(pod *v1.Pod) { pod.Status.Phase = v1.PodFailed })}, + {pod: newPod(0, 1, func(pod *v1.Pod) { pod.Status.Phase = v1.PodRunning })}, + {pod: newPod(0, 1, func(pod *v1.Pod) { + pod.Status.Phase = v1.PodRunning + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "Test", ExitCode: 2}}}, + } + })}, + { + name: "last termination state set", + pod: newPod(0, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + { + Name: "0", + LastTerminationState: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "Test", ExitCode: 2}}, + State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}, + }, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + container := status.ContainerStatuses[0] + if container.LastTerminationState.Terminated.ExitCode != 2 { + t.Fatalf("unexpected last state: %#v", container.LastTerminationState) + } + expectTerminatedUnknown(t, container.State) + }, + }, + { + name: "no previous state", + pod: newPod(0, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.ContainerStatuses[0].State) + }, + }, + { + name: "uninitialized pod defaults the first init container", + pod: newPod(1, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.InitContainerStatuses = []v1.ContainerStatus{ + {Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.InitContainerStatuses[0].State) + expectWaiting(t, status.ContainerStatuses[0].State) + }, + }, + { + name: "uninitialized pod defaults only the first init container", + pod: newPod(2, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + 
pod.Status.Phase = v1.PodRunning + pod.Status.InitContainerStatuses = []v1.ContainerStatus{ + {Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + {Name: "init-1", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.InitContainerStatuses[0].State) + expectWaiting(t, status.InitContainerStatuses[1].State) + expectWaiting(t, status.ContainerStatuses[0].State) + }, + }, + { + name: "uninitialized pod defaults gaps", + pod: newPod(4, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.InitContainerStatuses = []v1.ContainerStatus{ + {Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + {Name: "init-1", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + {Name: "init-2", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 1}}}, + {Name: "init-3", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.InitContainerStatuses[0].State) + expectTerminatedUnknown(t, status.InitContainerStatuses[1].State) + expectTerminated(t, status.InitContainerStatuses[2].State, 1) + expectWaiting(t, status.InitContainerStatuses[3].State) + expectWaiting(t, status.ContainerStatuses[0].State) + }, + }, + { + name: "failed last container is uninitialized", + pod: newPod(3, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.InitContainerStatuses = []v1.ContainerStatus{ + {Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + {Name: "init-1", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + {Name: "init-2", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 1}}}, + } + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.InitContainerStatuses[0].State) + expectTerminatedUnknown(t, status.InitContainerStatuses[1].State) + expectTerminated(t, status.InitContainerStatuses[2].State, 1) + expectWaiting(t, status.ContainerStatuses[0].State) + }, + }, + { + name: "successful last container is initialized", + pod: newPod(3, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.InitContainerStatuses = []v1.ContainerStatus{ + {Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + {Name: "init-1", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + {Name: "init-2", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 0}}}, + } + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.InitContainerStatuses[0].State) + expectTerminatedUnknown(t, 
status.InitContainerStatuses[1].State) + expectTerminated(t, status.InitContainerStatuses[2].State, 0) + expectTerminatedUnknown(t, status.ContainerStatuses[0].State) + }, + }, + { + name: "successful last previous container is initialized, and container state is overwritten", + pod: newPod(3, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.InitContainerStatuses = []v1.ContainerStatus{ + {Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + {Name: "init-1", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + { + Name: "init-2", + LastTerminationState: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 0}}, + State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}, + }, + } + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.InitContainerStatuses[0].State) + expectTerminatedUnknown(t, status.InitContainerStatuses[1].State) + expectTerminatedUnknown(t, status.InitContainerStatuses[2].State) + expectTerminatedUnknown(t, status.ContainerStatuses[0].State) + }, + }, + { + name: "running container proves initialization", + pod: newPod(1, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.InitContainerStatuses = []v1.ContainerStatus{ + {Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Running: &v1.ContainerStateRunning{}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.InitContainerStatuses[0].State) + expectTerminatedUnknown(t, status.ContainerStatuses[0].State) + }, + }, + { + name: "evidence of terminated container proves initialization", + pod: newPod(1, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.InitContainerStatuses = []v1.ContainerStatus{ + {Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 0}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.InitContainerStatuses[0].State) + expectTerminated(t, status.ContainerStatuses[0].State, 0) + }, + }, + { + name: "evidence of previously terminated container proves initialization", + pod: newPod(1, 1, func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyNever + pod.Status.Phase = v1.PodRunning + pod.Status.InitContainerStatuses = []v1.ContainerStatus{ + {Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}}, + } + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + {Name: "0", LastTerminationState: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 0}}}, + } + }), + expectFn: func(t *testing.T, status v1.PodStatus) { + expectTerminatedUnknown(t, status.InitContainerStatuses[0].State) + expectTerminatedUnknown(t, status.ContainerStatuses[0].State) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), 
kubesecret.NewFakeManager(), kubeconfigmap.NewFakeManager()) + syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}).(*manager) + + original := tc.pod.DeepCopy() + syncer.SetPodStatus(original, original.Status) + + copied := tc.pod.DeepCopy() + if tc.updateFn != nil { + tc.updateFn(copied) + } + expected := copied.DeepCopy() + + syncer.TerminatePod(copied) + status := expectPodStatus(t, syncer, tc.pod.DeepCopy()) + if tc.expectFn != nil { + tc.expectFn(t, status) + return + } + if !reflect.DeepEqual(expected.Status, status) { + diff := cmp.Diff(expected.Status, status) + if len(diff) == 0 { + t.Fatalf("diff returned no results for failed DeepEqual: %#v != %#v", expected.Status, status) + } + t.Fatalf("unexpected status: %s", diff) + } + }) + } +} + func TestSetContainerReadiness(t *testing.T) { cID1 := kubecontainer.ContainerID{Type: "test", ID: "1"} cID2 := kubecontainer.ContainerID{Type: "test", ID: "2"} @@ -957,6 +1263,7 @@ func TestDeletePods(t *testing.T) { pod.DeletionTimestamp = &metav1.Time{Time: time.Now()} client := fake.NewSimpleClientset(pod) m := newTestManager(client) + m.podDeletionSafety.(*statustest.FakePodDeletionSafetyProvider).Reclaimed = true m.podManager.AddPod(pod) status := getRandomPodStatus() now := metav1.Now() @@ -966,6 +1273,22 @@ func TestDeletePods(t *testing.T) { verifyActions(t, m, []core.Action{getAction(), patchAction(), deleteAction()}) } +func TestDeletePodWhileReclaiming(t *testing.T) { + pod := getTestPod() + t.Logf("Set the deletion timestamp.") + pod.DeletionTimestamp = &metav1.Time{Time: time.Now()} + client := fake.NewSimpleClientset(pod) + m := newTestManager(client) + m.podDeletionSafety.(*statustest.FakePodDeletionSafetyProvider).Reclaimed = false + m.podManager.AddPod(pod) + status := getRandomPodStatus() + now := metav1.Now() + status.StartTime = &now + m.SetPodStatus(pod, status) + t.Logf("Expect to see a delete action.") + verifyActions(t, m, []core.Action{getAction(), patchAction()}) +} + func TestDoNotDeleteMirrorPods(t *testing.T) { staticPod := getTestPod() staticPod.Annotations = map[string]string{kubetypes.ConfigSourceAnnotationKey: "file"} @@ -1070,19 +1393,22 @@ func deleteAction() core.DeleteAction { func TestMergePodStatus(t *testing.T) { useCases := []struct { - desc string - oldPodStatus func(input v1.PodStatus) v1.PodStatus - newPodStatus func(input v1.PodStatus) v1.PodStatus - expectPodStatus v1.PodStatus + desc string + hasRunningContainers bool + oldPodStatus func(input v1.PodStatus) v1.PodStatus + newPodStatus func(input v1.PodStatus) v1.PodStatus + expectPodStatus v1.PodStatus }{ { "no change", + false, func(input v1.PodStatus) v1.PodStatus { return input }, func(input v1.PodStatus) v1.PodStatus { return input }, getPodStatus(), }, { "readiness changes", + false, func(input v1.PodStatus) v1.PodStatus { return input }, func(input v1.PodStatus) v1.PodStatus { input.Conditions[0].Status = v1.ConditionFalse @@ -1105,6 +1431,7 @@ func TestMergePodStatus(t *testing.T) { }, { "additional pod condition", + false, func(input v1.PodStatus) v1.PodStatus { input.Conditions = append(input.Conditions, v1.PodCondition{ Type: v1.PodConditionType("example.com/feature"), @@ -1134,6 +1461,7 @@ func TestMergePodStatus(t *testing.T) { }, { "additional pod condition and readiness changes", + false, func(input v1.PodStatus) v1.PodStatus { input.Conditions = append(input.Conditions, v1.PodCondition{ Type: v1.PodConditionType("example.com/feature"), @@ -1166,6 +1494,7 @@ func TestMergePodStatus(t 
*testing.T) { }, { "additional pod condition changes", + false, func(input v1.PodStatus) v1.PodStatus { input.Conditions = append(input.Conditions, v1.PodCondition{ Type: v1.PodConditionType("example.com/feature"), @@ -1199,13 +1528,77 @@ func TestMergePodStatus(t *testing.T) { Message: "Message", }, }, + { + "phase is transitioning to failed and no containers running", + false, + func(input v1.PodStatus) v1.PodStatus { + input.Phase = v1.PodRunning + input.Reason = "Unknown" + input.Message = "Message" + return input + }, + func(input v1.PodStatus) v1.PodStatus { + input.Phase = v1.PodFailed + input.Reason = "Evicted" + input.Message = "Was Evicted" + return input + }, + v1.PodStatus{ + Phase: v1.PodFailed, + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, + { + Type: v1.PodScheduled, + Status: v1.ConditionTrue, + }, + }, + Reason: "Evicted", + Message: "Was Evicted", + }, + }, + { + "phase is transitioning to failed and containers running", + true, + func(input v1.PodStatus) v1.PodStatus { + input.Phase = v1.PodRunning + input.Reason = "Unknown" + input.Message = "Message" + return input + }, + func(input v1.PodStatus) v1.PodStatus { + input.Phase = v1.PodFailed + input.Reason = "Evicted" + input.Message = "Was Evicted" + return input + }, + v1.PodStatus{ + Phase: v1.PodRunning, + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, + { + Type: v1.PodScheduled, + Status: v1.ConditionTrue, + }, + }, + Reason: "Unknown", + Message: "Message", + }, + }, } for _, tc := range useCases { - output := mergePodStatus(tc.oldPodStatus(getPodStatus()), tc.newPodStatus(getPodStatus())) - if !conditionsEqual(output.Conditions, tc.expectPodStatus.Conditions) || !statusEqual(output, tc.expectPodStatus) { - t.Errorf("test case %q failed, expect: %+v, got %+v", tc.desc, tc.expectPodStatus, output) - } + t.Run(tc.desc, func(t *testing.T) { + output := mergePodStatus(tc.oldPodStatus(getPodStatus()), tc.newPodStatus(getPodStatus()), tc.hasRunningContainers) + if !conditionsEqual(output.Conditions, tc.expectPodStatus.Conditions) || !statusEqual(output, tc.expectPodStatus) { + t.Fatalf("unexpected output: %s", cmp.Diff(tc.expectPodStatus, output)) + } + }) } } diff --git a/pkg/kubelet/status/testing/fake_pod_deletion_safety.go b/pkg/kubelet/status/testing/fake_pod_deletion_safety.go index 174c6e17515..98c3b226c0b 100644 --- a/pkg/kubelet/status/testing/fake_pod_deletion_safety.go +++ b/pkg/kubelet/status/testing/fake_pod_deletion_safety.go @@ -16,13 +16,18 @@ limitations under the License. package testing -import "k8s.io/api/core/v1" +import v1 "k8s.io/api/core/v1" // FakePodDeletionSafetyProvider is a fake PodDeletionSafetyProvider for test. -type FakePodDeletionSafetyProvider struct{} - -// PodResourcesAreReclaimed implements PodDeletionSafetyProvider. -// Always reports that all pod resources are reclaimed. 
-func (f *FakePodDeletionSafetyProvider) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool { - return true +type FakePodDeletionSafetyProvider struct { + Reclaimed bool + HasRunning bool +} + +func (f *FakePodDeletionSafetyProvider) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool { + return f.Reclaimed +} + +func (f *FakePodDeletionSafetyProvider) PodCouldHaveRunningContainers(pod *v1.Pod) bool { + return f.HasRunning } diff --git a/pkg/kubelet/status/testing/mock_pod_status_provider.go b/pkg/kubelet/status/testing/mock_pod_status_provider.go index 82618b28785..a58d2886b1d 100644 --- a/pkg/kubelet/status/testing/mock_pod_status_provider.go +++ b/pkg/kubelet/status/testing/mock_pod_status_provider.go @@ -103,6 +103,20 @@ func (mr *MockPodDeletionSafetyProviderMockRecorder) PodResourcesAreReclaimed(po return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PodResourcesAreReclaimed", reflect.TypeOf((*MockPodDeletionSafetyProvider)(nil).PodResourcesAreReclaimed), pod, status) } +// PodCouldHaveRunningContainers mocks base method +func (m *MockPodDeletionSafetyProvider) PodCouldHaveRunningContainers(pod *v1.Pod) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PodCouldHaveRunningContainers", pod) + ret0, _ := ret[0].(bool) + return ret0 +} + +// PodCouldHaveRunningContainers indicates an expected call of PodCouldHaveRunningContainers +func (mr *MockPodDeletionSafetyProviderMockRecorder) PodCouldHaveRunningContainers(pod interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PodCouldHaveRunningContainers", reflect.TypeOf((*MockPodDeletionSafetyProvider)(nil).PodCouldHaveRunningContainers), pod) +} + // MockManager is a mock of Manager interface type MockManager struct { ctrl *gomock.Controller diff --git a/test/e2e/apps/job.go b/test/e2e/apps/job.go index 469d0060ce9..8b0be23da24 100644 --- a/test/e2e/apps/job.go +++ b/test/e2e/apps/job.go @@ -25,6 +25,7 @@ import ( batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" @@ -35,6 +36,7 @@ import ( e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" e2eresource "k8s.io/kubernetes/test/e2e/framework/resource" + "k8s.io/kubernetes/test/e2e/scheduling" "k8s.io/utils/pointer" "github.com/onsi/ginkgo" @@ -45,6 +47,10 @@ var _ = SIGDescribe("Job", func() { f := framework.NewDefaultFramework("job") parallelism := int32(2) completions := int32(4) + + largeParallelism := int32(90) + largeCompletions := int32(90) + backoffLimit := int32(6) // default value // Simplest case: N pods succeed @@ -361,6 +367,52 @@ var _ = SIGDescribe("Job", func() { framework.ExpectEqual(pod.Status.Phase, v1.PodFailed) } }) + + ginkgo.It("should run a job to completion with CPU requests [Serial]", func() { + ginkgo.By("Creating a job that with CPU requests") + + testNodeName := scheduling.GetNodeThatCanRunPod(f) + targetNode, err := f.ClientSet.CoreV1().Nodes().Get(context.TODO(), testNodeName, metav1.GetOptions{}) + framework.ExpectNoError(err, "unable to get node object for node %v", testNodeName) + + cpu, ok := targetNode.Status.Allocatable[v1.ResourceCPU] + if !ok { + framework.Failf("Unable to get node's %q cpu", targetNode.Name) + } + + cpuRequest := fmt.Sprint(int64(0.2 * float64(cpu.Value()))) + + backoff := 0 + ginkgo.By("Creating a 
+ ginkgo.By("Creating a job") + job := e2ejob.NewTestJob("succeed", "all-succeed", v1.RestartPolicyNever, largeParallelism, largeCompletions, nil, int32(backoff)) + for i := range job.Spec.Template.Spec.Containers { + job.Spec.Template.Spec.Containers[i].Resources = v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse(cpuRequest), + }, + } + job.Spec.Template.Spec.NodeSelector = map[string]string{"kubernetes.io/hostname": testNodeName} + } + + framework.Logf("Creating job %q with a node hostname selector %q with cpu request %q", job.Name, testNodeName, cpuRequest) + job, err = e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job) + framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) + + ginkgo.By("Ensuring job reaches completions") + err = e2ejob.WaitForJobComplete(f.ClientSet, f.Namespace.Name, job.Name, largeCompletions) + framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) + + ginkgo.By("Ensuring pods for job exist") + pods, err := e2ejob.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name) + framework.ExpectNoError(err, "failed to get pod list for job in namespace: %s", f.Namespace.Name) + successes := int32(0) + for _, pod := range pods.Items { + if pod.Status.Phase == v1.PodSucceeded { + successes++ + } + } + framework.ExpectEqual(successes, largeCompletions, "expected %d successful job pods, but got %d", largeCompletions, successes) + }) }) // waitForJobFailure uses c to wait for up to timeout for the Job named jobName in namespace ns to fail. diff --git a/test/e2e/node/pods.go b/test/e2e/node/pods.go index a4cf9f3bf5e..79a4c488647 100644 --- a/test/e2e/node/pods.go +++ b/test/e2e/node/pods.go @@ -37,6 +37,7 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/test/e2e/framework" e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet" @@ -204,274 +205,19 @@ var _ = SIGDescribe("Pods Extended", func() { ginkgo.It("should never report success for a pending container", func() { ginkgo.By("creating pods that should always exit 1 and terminating the pod after a random delay") - - var reBug88766 = regexp.MustCompile(`rootfs_linux.*kubernetes\.io~(secret|projected).*no such file or directory`) - - var ( - lock sync.Mutex - errs []error - - wg sync.WaitGroup + createAndTestPodRepeatedly( + 3, 15, + podFastDeleteScenario{client: podClient.PodInterface, delayMs: 2000}, + podClient.PodInterface, + ) + }) + ginkgo.It("should never report container start when an init container fails", func() { + ginkgo.By("creating pods with an init container that always exits 1 and terminating the pod after a random delay") + createAndTestPodRepeatedly( + 3, 15, + podFastDeleteScenario{client: podClient.PodInterface, delayMs: 2000, initContainer: true}, + podClient.PodInterface, ) - - r := prometheus.NewRegistry() - h := prometheus.NewSummaryVec(prometheus.SummaryOpts{ - Name: "start_latency", - Objectives: map[float64]float64{ - 0.5: 0.05, - 0.75: 0.025, - 0.9: 0.01, - 0.99: 0.001, - }, - }, []string{"node"}) - r.MustRegister(h) - - const delay = 2000 - const workers = 3 - const pods = 15 - var min, max time.Duration - for i := 0; i < workers; i++ { - wg.Add(1) - go func(i int) { - defer ginkgo.GinkgoRecover() - defer wg.Done() - for retries := 0; retries < pods; retries++ { - name := fmt.Sprintf("pod-submit-status-%d-%d", i, retries) - value :=
strconv.Itoa(time.Now().Nanosecond()) - one := int64(1) - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Labels: map[string]string{ - "name": "foo", - "time": value, - }, - }, - Spec: v1.PodSpec{ - RestartPolicy: v1.RestartPolicyNever, - TerminationGracePeriodSeconds: &one, - Containers: []v1.Container{ - { - Name: "busybox", - Image: imageutils.GetE2EImage(imageutils.BusyBox), - Command: []string{ - "/bin/false", - }, - Resources: v1.ResourceRequirements{ - Requests: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("5m"), - v1.ResourceMemory: resource.MustParse("10Mi"), - }, - }, - }, - }, - }, - } - - // create the pod, capture the change events, then delete the pod - start := time.Now() - created := podClient.Create(pod) - ch := make(chan []watch.Event) - waitForWatch := make(chan struct{}) - go func() { - defer ginkgo.GinkgoRecover() - defer close(ch) - w, err := podClient.Watch(context.TODO(), metav1.ListOptions{ - ResourceVersion: created.ResourceVersion, - FieldSelector: fmt.Sprintf("metadata.name=%s", pod.Name), - }) - if err != nil { - framework.Logf("Unable to watch pod %s: %v", pod.Name, err) - return - } - defer w.Stop() - close(waitForWatch) - events := []watch.Event{ - {Type: watch.Added, Object: created}, - } - for event := range w.ResultChan() { - events = append(events, event) - if event.Type == watch.Error { - framework.Logf("watch error seen for %s: %#v", pod.Name, event.Object) - } - if event.Type == watch.Deleted { - framework.Logf("watch delete seen for %s", pod.Name) - break - } - } - ch <- events - }() - - select { - case <-ch: // in case the goroutine above exits before establishing the watch - case <-waitForWatch: // when the watch is established - } - t := time.Duration(rand.Intn(delay)) * time.Millisecond - time.Sleep(t) - err := podClient.Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}) - framework.ExpectNoError(err, "failed to delete pod") - - var ( - events []watch.Event - ok bool - ) - select { - case events, ok = <-ch: - if !ok { - continue - } - if len(events) < 2 { - framework.Fail("only got a single event") - } - case <-time.After(5 * time.Minute): - framework.Failf("timed out waiting for watch events for %s", pod.Name) - } - - end := time.Now() - - // check the returned events for consistency - var duration, completeDuration time.Duration - var hasContainers, hasTerminated, hasTerminalPhase, hasRunningContainers bool - verifyFn := func(event watch.Event) error { - var ok bool - pod, ok = event.Object.(*v1.Pod) - if !ok { - framework.Logf("Unexpected event object: %s %#v", event.Type, event.Object) - return nil - } - - if len(pod.Status.InitContainerStatuses) != 0 { - return fmt.Errorf("pod %s on node %s had incorrect init containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.InitContainerStatuses) - } - if len(pod.Status.ContainerStatuses) == 0 { - if hasContainers { - return fmt.Errorf("pod %s on node %s had incorrect containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.ContainerStatuses) - } - return nil - } - hasContainers = true - if len(pod.Status.ContainerStatuses) != 1 { - return fmt.Errorf("pod %s on node %s had incorrect containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.ContainerStatuses) - } - status := pod.Status.ContainerStatuses[0] - t := status.State.Terminated - if hasTerminated { - if status.State.Waiting != nil || status.State.Running != nil { - return fmt.Errorf("pod %s on node %s was terminated and then changed state: %#v", pod.Name, pod.Spec.NodeName, status) - } - if t == nil { - 
return fmt.Errorf("pod %s on node %s was terminated and then had termination cleared: %#v", pod.Name, pod.Spec.NodeName, status) - } - } - var hasNoStartTime bool - hasRunningContainers = status.State.Waiting == nil && status.State.Terminated == nil - if t != nil { - if !t.FinishedAt.Time.IsZero() { - if t.StartedAt.IsZero() { - hasNoStartTime = true - } else { - duration = t.FinishedAt.Sub(t.StartedAt.Time) - } - completeDuration = t.FinishedAt.Sub(pod.CreationTimestamp.Time) - } - - defer func() { hasTerminated = true }() - switch { - case t.ExitCode == 1: - // expected - case t.ExitCode == 137 && (t.Reason == "ContainerStatusUnknown" || t.Reason == "Error"): - // expected, pod was force-killed after grace period - case t.ExitCode == 128 && (t.Reason == "StartError" || t.Reason == "ContainerCannotRun") && reBug88766.MatchString(t.Message): - // pod volume teardown races with container start in CRI, which reports a failure - framework.Logf("pod %s on node %s failed with the symptoms of https://github.com/kubernetes/kubernetes/issues/88766", pod.Name, pod.Spec.NodeName) - default: - data, _ := json.MarshalIndent(pod.Status, "", " ") - framework.Logf("pod %s on node %s had incorrect final status:\n%s", pod.Name, pod.Spec.NodeName, string(data)) - return fmt.Errorf("pod %s on node %s container unexpected exit code %d: start=%s end=%s reason=%s message=%s", pod.Name, pod.Spec.NodeName, t.ExitCode, t.StartedAt, t.FinishedAt, t.Reason, t.Message) - } - switch { - case duration > time.Hour: - // problem with status reporting - return fmt.Errorf("pod %s container %s on node %s had very long duration %s: start=%s end=%s", pod.Name, status.Name, pod.Spec.NodeName, duration, t.StartedAt, t.FinishedAt) - case hasNoStartTime: - // should never happen - return fmt.Errorf("pod %s container %s on node %s had finish time but not start time: end=%s", pod.Name, status.Name, pod.Spec.NodeName, t.FinishedAt) - } - } - if pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded { - hasTerminalPhase = true - } else { - if hasTerminalPhase { - return fmt.Errorf("pod %s on node %s was in a terminal phase and then reverted: %#v", pod.Name, pod.Spec.NodeName, pod.Status) - } - } - return nil - } - - var eventErr error - for _, event := range events[1:] { - if err := verifyFn(event); err != nil { - eventErr = err - break - } - } - func() { - lock.Lock() - defer lock.Unlock() - - if eventErr != nil { - errs = append(errs, eventErr) - return - } - - if !hasTerminalPhase { - var names []string - for _, status := range pod.Status.ContainerStatuses { - if status.State.Running != nil { - names = append(names, status.Name) - } - } - switch { - case len(names) > 0: - errs = append(errs, fmt.Errorf("pod %s on node %s did not reach a terminal phase before being deleted but had running containers: phase=%s, running-containers=%s", pod.Name, pod.Spec.NodeName, pod.Status.Phase, strings.Join(names, ","))) - case pod.Status.Phase != v1.PodPending: - errs = append(errs, fmt.Errorf("pod %s on node %s was not Pending but has no running containers: phase=%s", pod.Name, pod.Spec.NodeName, pod.Status.Phase)) - } - } - if hasRunningContainers { - data, _ := json.MarshalIndent(pod.Status.ContainerStatuses, "", " ") - errs = append(errs, fmt.Errorf("pod %s on node %s had running or unknown container status before being deleted:\n%s", pod.Name, pod.Spec.NodeName, string(data))) - } - }() - - if duration < min { - min = duration - } - if duration > max || max == 0 { - max = duration - } - 
h.WithLabelValues(pod.Spec.NodeName).Observe(end.Sub(start).Seconds()) - framework.Logf("Pod %s on node %s timings total=%s t=%s run=%s execute=%s", pod.Name, pod.Spec.NodeName, end.Sub(start), t, completeDuration, duration) - } - - }(i) - } - - wg.Wait() - - if len(errs) > 0 { - var messages []string - for _, err := range errs { - messages = append(messages, err.Error()) - } - framework.Failf("%d errors:\n%v", len(errs), strings.Join(messages, "\n")) - } - values, _ := r.Gather() - var buf bytes.Buffer - for _, m := range values { - expfmt.MetricFamilyToText(&buf, m) - } - framework.Logf("Summary of latencies:\n%s", buf.String()) }) }) @@ -552,3 +298,422 @@ var _ = SIGDescribe("Pods Extended", func() { }) }) }) + +func createAndTestPodRepeatedly(workers, iterations int, scenario podScenario, podClient v1core.PodInterface) { + var ( + lock sync.Mutex + errs []error + + wg sync.WaitGroup + ) + + r := prometheus.NewRegistry() + h := prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Name: "latency", + Objectives: map[float64]float64{ + 0.5: 0.05, + 0.75: 0.025, + 0.9: 0.01, + 0.99: 0.001, + }, + }, []string{"node"}) + r.MustRegister(h) + + for i := 0; i < workers; i++ { + wg.Add(1) + go func(i int) { + defer ginkgo.GinkgoRecover() + defer wg.Done() + for retries := 0; retries < iterations; retries++ { + pod := scenario.Pod(i, retries) + + // create the pod, capture the change events, then delete the pod + start := time.Now() + created, err := podClient.Create(context.TODO(), pod, metav1.CreateOptions{}) + framework.ExpectNoError(err, "failed to create pod") + + ch := make(chan []watch.Event) + waitForWatch := make(chan struct{}) + go func() { + defer ginkgo.GinkgoRecover() + defer close(ch) + w, err := podClient.Watch(context.TODO(), metav1.ListOptions{ + ResourceVersion: created.ResourceVersion, + FieldSelector: fmt.Sprintf("metadata.name=%s", pod.Name), + }) + if err != nil { + framework.Logf("Unable to watch pod %s: %v", pod.Name, err) + return + } + defer w.Stop() + close(waitForWatch) + events := []watch.Event{ + {Type: watch.Added, Object: created}, + } + for event := range w.ResultChan() { + events = append(events, event) + if event.Type == watch.Error { + framework.Logf("watch error seen for %s: %#v", pod.Name, event.Object) + } + if scenario.IsLastEvent(event) { + framework.Logf("watch last event seen for %s", pod.Name) + break + } + } + ch <- events + }() + + select { + case <-ch: // in case the goroutine above exits before establishing the watch + case <-waitForWatch: // when the watch is established + } + + verifier, scenario, err := scenario.Action(pod) + framework.ExpectNoError(err, "failed to take action") + + var ( + events []watch.Event + ok bool + ) + select { + case events, ok = <-ch: + if !ok { + continue + } + if len(events) < 2 { + framework.Fail("only got a single event") + } + case <-time.After(5 * time.Minute): + framework.Failf("timed out waiting for watch events for %s", pod.Name) + } + + end := time.Now() + + var eventErr error + for _, event := range events[1:] { + if err := verifier.Verify(event); err != nil { + eventErr = err + break + } + } + + total := end.Sub(start) + + var lastPod *v1.Pod = pod + func() { + lock.Lock() + defer lock.Unlock() + + if eventErr != nil { + errs = append(errs, eventErr) + return + } + pod, verifyErrs := verifier.VerifyFinal(scenario, total) + if pod != nil { + lastPod = pod + } + errs = append(errs, verifyErrs...) 
+ }() + + h.WithLabelValues(lastPod.Spec.NodeName).Observe(total.Seconds()) + } + }(i) + } + + wg.Wait() + + if len(errs) > 0 { + var messages []string + for _, err := range errs { + messages = append(messages, err.Error()) + } + framework.Failf("%d errors:\n%v", len(errs), strings.Join(messages, "\n")) + } + values, _ := r.Gather() + var buf bytes.Buffer + for _, m := range values { + expfmt.MetricFamilyToText(&buf, m) + } + framework.Logf("Summary of latencies:\n%s", buf.String()) +} + +type podScenario interface { + Pod(worker, attempt int) *v1.Pod + Action(*v1.Pod) (podScenarioVerifier, string, error) + IsLastEvent(event watch.Event) bool +} + +type podScenarioVerifier interface { + Verify(event watch.Event) error + VerifyFinal(scenario string, duration time.Duration) (*v1.Pod, []error) +} + +type podFastDeleteScenario struct { + client v1core.PodInterface + delayMs int + + initContainer bool +} + +func (s podFastDeleteScenario) Verifier(pod *v1.Pod) podScenarioVerifier { + return &podStartVerifier{} +} + +func (s podFastDeleteScenario) IsLastEvent(event watch.Event) bool { + if event.Type == watch.Deleted { + return true + } + return false +} + +func (s podFastDeleteScenario) Action(pod *v1.Pod) (podScenarioVerifier, string, error) { + t := time.Duration(rand.Intn(s.delayMs)) * time.Millisecond + scenario := fmt.Sprintf("t=%s", t) + time.Sleep(t) + return &podStartVerifier{pod: pod}, scenario, s.client.Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}) +} + +func (s podFastDeleteScenario) Pod(worker, attempt int) *v1.Pod { + name := fmt.Sprintf("pod-terminate-status-%d-%d", worker, attempt) + value := strconv.Itoa(time.Now().Nanosecond()) + one := int64(1) + if s.initContainer { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + "name": "foo", + "time": value, + }, + }, + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyNever, + TerminationGracePeriodSeconds: &one, + InitContainers: []v1.Container{ + { + Name: "fail", + Image: imageutils.GetE2EImage(imageutils.BusyBox), + Command: []string{ + "/bin/false", + }, + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("5m"), + v1.ResourceMemory: resource.MustParse("10Mi"), + }, + }, + }, + }, + Containers: []v1.Container{ + { + Name: "blocked", + Image: imageutils.GetE2EImage(imageutils.BusyBox), + Command: []string{ + "/bin/true", + }, + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("5m"), + v1.ResourceMemory: resource.MustParse("10Mi"), + }, + }, + }, + }, + }, + } + } + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + "name": "foo", + "time": value, + }, + }, + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyNever, + TerminationGracePeriodSeconds: &one, + Containers: []v1.Container{ + { + Name: "fail", + Image: imageutils.GetE2EImage(imageutils.BusyBox), + Command: []string{ + "/bin/false", + }, + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("5m"), + v1.ResourceMemory: resource.MustParse("10Mi"), + }, + }, + }, + }, + }, + } +} + +// podStartVerifier checks events for a given pod and looks for unexpected +// transitions. It assumes one container running to completion. 
+type podStartVerifier struct { + pod *v1.Pod + hasInitContainers bool + hasContainers bool + hasTerminated bool + hasRunningContainers bool + hasTerminalPhase bool + duration time.Duration + completeDuration time.Duration +} + +var reBug88766 = regexp.MustCompile(`rootfs_linux.*kubernetes\.io~(secret|projected).*no such file or directory`) + +// Verify takes successive watch events for a given pod and returns an error if the status is unexpected. +// This verifier works for any pod which has 0 init containers and 1 regular container. +func (v *podStartVerifier) Verify(event watch.Event) error { + var ok bool + pod, ok := event.Object.(*v1.Pod) + if !ok { + framework.Logf("Unexpected event object: %s %#v", event.Type, event.Object) + return nil + } + v.pod = pod + + if len(pod.Spec.InitContainers) > 0 { + if len(pod.Status.InitContainerStatuses) == 0 { + if v.hasInitContainers { + return fmt.Errorf("pod %s on node %s had incorrect init containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.InitContainerStatuses) + } + return nil + } + v.hasInitContainers = true + if len(pod.Status.InitContainerStatuses) != 1 { + return fmt.Errorf("pod %s on node %s had incorrect init containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.InitContainerStatuses) + } + + } else { + if len(pod.Status.InitContainerStatuses) != 0 { + return fmt.Errorf("pod %s on node %s had incorrect init containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.InitContainerStatuses) + } + } + + if len(pod.Status.ContainerStatuses) == 0 { + if v.hasContainers { + return fmt.Errorf("pod %s on node %s had incorrect containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.ContainerStatuses) + } + return nil + } + v.hasContainers = true + if len(pod.Status.ContainerStatuses) != 1 { + return fmt.Errorf("pod %s on node %s had incorrect containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.ContainerStatuses) + } + + if status := findContainerStatusInPod(pod, "blocked"); status != nil { + if (status.Started != nil && *status.Started == true) || status.LastTerminationState.Terminated != nil || status.State.Waiting == nil { + return fmt.Errorf("pod %s on node %s should not have started the blocked container: %#v", pod.Name, pod.Spec.NodeName, status) + } + } + + status := findContainerStatusInPod(pod, "fail") + if status == nil { + return fmt.Errorf("pod %s on node %s had incorrect containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status) + } + + t := status.State.Terminated + if v.hasTerminated { + if status.State.Waiting != nil || status.State.Running != nil { + return fmt.Errorf("pod %s on node %s was terminated and then changed state: %#v", pod.Name, pod.Spec.NodeName, status) + } + if t == nil { + return fmt.Errorf("pod %s on node %s was terminated and then had termination cleared: %#v", pod.Name, pod.Spec.NodeName, status) + } + } + var hasNoStartTime bool + v.hasRunningContainers = status.State.Waiting == nil && status.State.Terminated == nil + if t != nil { + if !t.FinishedAt.Time.IsZero() { + if t.StartedAt.IsZero() { + hasNoStartTime = true + } else { + v.duration = t.FinishedAt.Sub(t.StartedAt.Time) + } + v.completeDuration = t.FinishedAt.Sub(pod.CreationTimestamp.Time) + } + + defer func() { v.hasTerminated = true }() + switch { + case t.ExitCode == 1: + // expected + case t.ExitCode == 137 && (t.Reason == "ContainerStatusUnknown" || t.Reason == "Error"): + // expected, pod was force-killed after grace period + case t.ExitCode == 128 && (t.Reason == "StartError" || t.Reason == "ContainerCannotRun") && 
reBug88766.MatchString(t.Message): + // pod volume teardown races with container start in CRI, which reports a failure + framework.Logf("pod %s on node %s failed with the symptoms of https://github.com/kubernetes/kubernetes/issues/88766", pod.Name, pod.Spec.NodeName) + default: + data, _ := json.MarshalIndent(pod.Status, "", " ") + framework.Logf("pod %s on node %s had incorrect final status:\n%s", pod.Name, pod.Spec.NodeName, string(data)) + return fmt.Errorf("pod %s on node %s container unexpected exit code %d: start=%s end=%s reason=%s message=%s", pod.Name, pod.Spec.NodeName, t.ExitCode, t.StartedAt, t.FinishedAt, t.Reason, t.Message) + } + switch { + case v.duration > time.Hour: + // problem with status reporting + return fmt.Errorf("pod %s container %s on node %s had very long duration %s: start=%s end=%s", pod.Name, status.Name, pod.Spec.NodeName, v.duration, t.StartedAt, t.FinishedAt) + case hasNoStartTime: + // should never happen + return fmt.Errorf("pod %s container %s on node %s had finish time but not start time: end=%s", pod.Name, status.Name, pod.Spec.NodeName, t.FinishedAt) + } + } + if pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded { + v.hasTerminalPhase = true + } else { + if v.hasTerminalPhase { + return fmt.Errorf("pod %s on node %s was in a terminal phase and then reverted: %#v", pod.Name, pod.Spec.NodeName, pod.Status) + } + } + return nil +} + +func (v *podStartVerifier) VerifyFinal(scenario string, total time.Duration) (*v1.Pod, []error) { + var errs []error + pod := v.pod + if !v.hasTerminalPhase { + var names []string + for _, status := range pod.Status.ContainerStatuses { + if status.State.Running != nil { + names = append(names, status.Name) + } + } + switch { + case len(names) > 0: + errs = append(errs, fmt.Errorf("pod %s on node %s did not reach a terminal phase before being deleted but had running containers: phase=%s, running-containers=%s", pod.Name, pod.Spec.NodeName, pod.Status.Phase, strings.Join(names, ","))) + case pod.Status.Phase != v1.PodPending: + errs = append(errs, fmt.Errorf("pod %s on node %s was not Pending but has no running containers: phase=%s", pod.Name, pod.Spec.NodeName, pod.Status.Phase)) + } + } + if v.hasRunningContainers { + data, _ := json.MarshalIndent(pod.Status.ContainerStatuses, "", " ") + errs = append(errs, fmt.Errorf("pod %s on node %s had running or unknown container status before being deleted:\n%s", pod.Name, pod.Spec.NodeName, string(data))) + } + + framework.Logf("Pod %s on node %s %s total=%s run=%s execute=%s", pod.Name, pod.Spec.NodeName, scenario, total, v.completeDuration, v.duration) + return pod, errs +} + +// findContainerStatusInPod finds a container status by its name in the provided pod +func findContainerStatusInPod(pod *v1.Pod, containerName string) *v1.ContainerStatus { + for _, container := range pod.Status.InitContainerStatuses { + if container.Name == containerName { + return &container + } + } + for _, container := range pod.Status.ContainerStatuses { + if container.Name == containerName { + return &container + } + } + for _, container := range pod.Status.EphemeralContainerStatuses { + if container.Name == containerName { + return &container + } + } + return nil +} diff --git a/test/e2e_node/node_shutdown_linux_test.go b/test/e2e_node/node_shutdown_linux_test.go index 4d3b93b98fe..18793c3a932 100644 --- a/test/e2e_node/node_shutdown_linux_test.go +++ b/test/e2e_node/node_shutdown_linux_test.go @@ -30,6 +30,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" 
"k8s.io/apimachinery/pkg/fields" + "k8s.io/kubectl/pkg/util/podutils" "github.com/onsi/ginkgo" "github.com/onsi/gomega" @@ -128,6 +129,11 @@ var _ = SIGDescribe("GracefulNodeShutdown [Serial] [NodeFeature:GracefulNodeShut framework.ExpectEqual(len(list.Items), len(pods), "the number of pods is not as expected") for _, pod := range list.Items { + if isPodStatusAffectedByIssue108594(&pod) { + framework.Logf("Detected invalid pod state for pod %q: pod status: %+v", pod.Name, pod.Status) + framework.Failf("failing test due to detecting invalid pod status") + } + if kubelettypes.IsCriticalPod(&pod) { if isPodShutdown(&pod) { framework.Logf("Expecting critical pod to be running, but it's not currently. Pod: %q, Pod Status %+v", pod.Name, pod.Status) @@ -155,6 +161,10 @@ var _ = SIGDescribe("GracefulNodeShutdown [Serial] [NodeFeature:GracefulNodeShut framework.ExpectEqual(len(list.Items), len(pods), "the number of pods is not as expected") for _, pod := range list.Items { + if isPodStatusAffectedByIssue108594(&pod) { + framework.Logf("Detected invalid pod state for pod %q: pod status: %+v", pod.Name, pod.Status) + framework.Failf("failing test due to detecting invalid pod status") + } if !isPodShutdown(&pod) { framework.Logf("Expecting pod to be shutdown, but it's not currently: Pod: %q, Pod Status %+v", pod.Name, pod.Status) return fmt.Errorf("pod should be shutdown, phase: %s", pod.Status.Phase) @@ -541,3 +551,8 @@ func isPodShutdown(pod *v1.Pod) bool { return pod.Status.Message == podShutdownMessage && pod.Status.Reason == podShutdownReason && hasContainersNotReadyCondition && pod.Status.Phase == v1.PodFailed } + +// Pods should never report failed phase and have ready condition = true (https://github.com/kubernetes/kubernetes/issues/108594) +func isPodStatusAffectedByIssue108594(pod *v1.Pod) bool { + return pod.Status.Phase == v1.PodFailed && podutils.IsPodReady(pod) +}