diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index c2505db4cdb..7031d7c189f 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -1479,7 +1479,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
 		os.Exit(1)
 	}
 	// eviction manager must start after cadvisor because it needs to know if the container runtime has a dedicated imagefs
-	kl.evictionManager.Start(kl.StatsProvider, kl.GetActivePods, kl.podResourcesAreReclaimed, evictionMonitoringPeriod)
+	kl.evictionManager.Start(kl.StatsProvider, kl.GetActivePods, kl.PodIsFinished, evictionMonitoringPeriod)
 
 	// container log manager must start after container runtime is up to retrieve information from container runtime
 	// and inform container to reopen log file after log rotation.
@@ -1689,7 +1689,7 @@ func (kl *Kubelet) SyncPod(_ context.Context, updateType kubetypes.SyncPodType,
 	}
 
 	// Generate final API pod status with pod and status manager status
-	apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
+	apiPodStatus := kl.generateAPIPodStatus(pod, podStatus, false)
 	// The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
 	// TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
 	// set pod IP to hostIP directly in runtime.GetPodStatus
@@ -1941,7 +1941,7 @@ func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus
 	klog.V(4).InfoS("SyncTerminatingPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
 	defer klog.V(4).InfoS("SyncTerminatingPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
 
-	apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
+	apiPodStatus := kl.generateAPIPodStatus(pod, podStatus, false)
 	if podStatusFn != nil {
 		podStatusFn(&apiPodStatus)
 	}
@@ -2014,6 +2014,13 @@ func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus
 		}
 	}
 
+	// Compute and update the status in cache once the pods are no longer running.
+	// The computation is done here to ensure the pod status used for it contains
+	// information about the container end states (including exit codes) - when
+	// SyncTerminatedPod is called the containers may already be removed.
+	apiPodStatus = kl.generateAPIPodStatus(pod, podStatus, true)
+	kl.statusManager.SetPodStatus(pod, apiPodStatus)
+
 	// we have successfully stopped all containers, the pod is terminating, our status is "done"
 	klog.V(4).InfoS("Pod termination stopped all running containers", "pod", klog.KObj(pod), "podUID", pod.UID)
 
@@ -2050,8 +2057,8 @@ func (kl *Kubelet) SyncTerminatingRuntimePod(_ context.Context, runningPod *kube
 }
 
 // SyncTerminatedPod cleans up a pod that has terminated (has no running containers).
-// The invocations in this call are expected to tear down what PodResourcesAreReclaimed checks (which
-// gates pod deletion). When this method exits the pod is expected to be ready for cleanup. This method
+// The invocations in this call are expected to tear down all pod resources.
+// When this method exits the pod is expected to be ready for cleanup. This method
 // reduces the latency of pod cleanup but is not guaranteed to get called in all scenarios.
 //
 // Because the kubelet has no local store of information, all actions in this method that modify
@@ -2071,7 +2078,7 @@ func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus
 	// generate the final status of the pod
 	// TODO: should we simply fold this into TerminatePod? that would give a single pod update
-	apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
+	apiPodStatus := kl.generateAPIPodStatus(pod, podStatus, true)
 	kl.statusManager.SetPodStatus(pod, apiPodStatus)
 
@@ -2082,6 +2089,21 @@ func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus
 	}
 	klog.V(4).InfoS("Pod termination unmounted volumes", "pod", klog.KObj(pod), "podUID", pod.UID)
 
+	if !kl.keepTerminatedPodVolumes {
+		// This waiting loop relies on the background cleanup which starts after pod workers respond
+		// true for ShouldPodRuntimeBeRemoved, which happens after `SyncTerminatingPod` is completed.
+		if err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) {
+			volumesExist := kl.podVolumesExist(pod.UID)
+			if volumesExist {
+				klog.V(3).InfoS("Pod is terminated, but some volumes have not been cleaned up", "pod", klog.KObj(pod), "podUID", pod.UID)
+			}
+			return !volumesExist, nil
+		}); err != nil {
+			return err
+		}
+		klog.V(3).InfoS("Pod termination cleaned up volume paths", "pod", klog.KObj(pod), "podUID", pod.UID)
+	}
+
 	// After volume unmount is complete, let the secret and configmap managers know we're done with this pod
 	if kl.secretManager != nil {
 		kl.secretManager.UnregisterPod(pod)
diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go
index 3a77baa2464..fbc9104c428 100644
--- a/pkg/kubelet/kubelet_pods.go
+++ b/pkg/kubelet/kubelet_pods.go
@@ -901,26 +901,6 @@ func (kl *Kubelet) getPullSecretsForPod(pod *v1.Pod) []v1.Secret {
 	return pullSecrets
 }
 
-func countRunningContainerStatus(status v1.PodStatus) int {
-	var runningContainers int
-	for _, c := range status.InitContainerStatuses {
-		if c.State.Running != nil {
-			runningContainers++
-		}
-	}
-	for _, c := range status.ContainerStatuses {
-		if c.State.Running != nil {
-			runningContainers++
-		}
-	}
-	for _, c := range status.EphemeralContainerStatuses {
-		if c.State.Running != nil {
-			runningContainers++
-		}
-	}
-	return runningContainers
-}
-
 // PodCouldHaveRunningContainers returns true if the pod with the given UID could still have running
 // containers. This returns false if the pod has not yet been started or the pod is unknown.
 func (kl *Kubelet) PodCouldHaveRunningContainers(pod *v1.Pod) bool {
@@ -941,48 +921,11 @@ func (kl *Kubelet) PodCouldHaveRunningContainers(pod *v1.Pod) bool {
 	return false
 }
 
-// PodResourcesAreReclaimed returns true if all required node-level resources that a pod was consuming have
-// been reclaimed by the kubelet. Reclaiming resources is a prerequisite to deleting a pod from the API server.
-func (kl *Kubelet) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool {
-	if kl.podWorkers.CouldHaveRunningContainers(pod.UID) {
-		// We shouldn't delete pods that still have running containers
-		klog.V(3).InfoS("Pod is terminated, but some containers are still running", "pod", klog.KObj(pod))
-		return false
-	}
-	if count := countRunningContainerStatus(status); count > 0 {
-		// We shouldn't delete pods until the reported pod status contains no more running containers (the previous
-		// check ensures no more status can be generated, this check verifies we have seen enough of the status)
-		klog.V(3).InfoS("Pod is terminated, but some container status has not yet been reported", "pod", klog.KObj(pod), "running", count)
-		return false
-	}
-	if kl.podVolumesExist(pod.UID) && !kl.keepTerminatedPodVolumes {
-		// We shouldn't delete pods whose volumes have not been cleaned up if we are not keeping terminated pod volumes
-		klog.V(3).InfoS("Pod is terminated, but some volumes have not been cleaned up", "pod", klog.KObj(pod))
-		return false
-	}
-	if kl.kubeletConfiguration.CgroupsPerQOS {
-		pcm := kl.containerManager.NewPodContainerManager()
-		if pcm.Exists(pod) {
-			klog.V(3).InfoS("Pod is terminated, but pod cgroup sandbox has not been cleaned up", "pod", klog.KObj(pod))
-			return false
-		}
-	}
-
-	// Note: we leave pod containers to be reclaimed in the background since dockershim requires the
-	// container for retrieving logs and we want to make sure logs are available until the pod is
-	// physically deleted.
-
-	klog.V(3).InfoS("Pod is terminated and all resources are reclaimed", "pod", klog.KObj(pod))
-	return true
-}
-
-// podResourcesAreReclaimed simply calls PodResourcesAreReclaimed with the most up-to-date status.
-func (kl *Kubelet) podResourcesAreReclaimed(pod *v1.Pod) bool {
-	status, ok := kl.statusManager.GetPodStatus(pod.UID)
-	if !ok {
-		status = pod.Status
-	}
-	return kl.PodResourcesAreReclaimed(pod, status)
+// PodIsFinished returns true if SyncTerminatedPod is finished, i.e.
+// all required node-level resources that a pod was consuming have
+// been reclaimed by the kubelet.
+func (kl *Kubelet) PodIsFinished(pod *v1.Pod) bool {
+	return kl.podWorkers.ShouldPodBeFinished(pod.UID)
 }
 
 // filterOutInactivePods returns pods that are not in a terminal phase
@@ -1440,7 +1383,8 @@ func (kl *Kubelet) GetKubeletContainerLogs(ctx context.Context, podFullName, con
 }
 
 // getPhase returns the phase of a pod given its container info.
-func getPhase(spec *v1.PodSpec, info []v1.ContainerStatus) v1.PodPhase {
+func getPhase(pod *v1.Pod, info []v1.ContainerStatus, podIsTerminal bool) v1.PodPhase {
+	spec := pod.Spec
 	pendingInitialization := 0
 	failedInitialization := 0
 	for _, container := range spec.InitContainers {
@@ -1517,6 +1461,19 @@ func getPhase(spec *v1.PodSpec, info []v1.ContainerStatus) v1.PodPhase {
 		// one container is running
 		return v1.PodRunning
 	case running == 0 && stopped > 0 && unknown == 0:
+		// The pod is terminal so its containers won't be restarted regardless
+		// of the restart policy.
+		if podIsTerminal {
+			// TODO(#116484): Also assign terminal phase to static pods.
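+			// Static pods keep falling through to the restart-policy based
+			// phase selection below until #116484 is resolved.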
+			if !kubetypes.IsStaticPod(pod) {
+				// All containers are terminated in success
+				if stopped == succeeded {
+					return v1.PodSucceeded
+				}
+				// There is at least one failure
+				return v1.PodFailed
+			}
+		}
 		// All containers are terminated
 		if spec.RestartPolicy == v1.RestartPolicyAlways {
 			// All containers are in the process of restarting
@@ -1567,8 +1524,8 @@ func (kl *Kubelet) determinePodResizeStatus(pod *v1.Pod, podStatus *v1.PodStatus
 
 // generateAPIPodStatus creates the final API pod status for a pod, given the
 // internal pod status. This method should only be called from within sync*Pod methods.
-func (kl *Kubelet) generateAPIPodStatus(pod *v1.Pod, podStatus *kubecontainer.PodStatus) v1.PodStatus {
-	klog.V(3).InfoS("Generating pod status", "pod", klog.KObj(pod))
+func (kl *Kubelet) generateAPIPodStatus(pod *v1.Pod, podStatus *kubecontainer.PodStatus, podIsTerminal bool) v1.PodStatus {
+	klog.V(3).InfoS("Generating pod status", "podIsTerminal", podIsTerminal, "pod", klog.KObj(pod))
 	// use the previous pod status, or the api status, as the basis for this pod
 	oldPodStatus, found := kl.statusManager.GetPodStatus(pod.UID)
 	if !found {
@@ -1580,7 +1537,7 @@ func (kl *Kubelet) generateAPIPodStatus(pod *v1.Pod, podStatus *kubecontainer.Po
 	}
 	// calculate the next phase and preserve reason
 	allStatus := append(append([]v1.ContainerStatus{}, s.ContainerStatuses...), s.InitContainerStatuses...)
-	s.Phase = getPhase(&pod.Spec, allStatus)
+	s.Phase = getPhase(pod, allStatus, podIsTerminal)
 	klog.V(4).InfoS("Got phase for pod", "pod", klog.KObj(pod), "oldPhase", oldPodStatus.Phase, "phase", s.Phase)
 
 	// Perform a three-way merge between the statuses from the status manager,
diff --git a/pkg/kubelet/kubelet_pods_test.go b/pkg/kubelet/kubelet_pods_test.go
index 2f1c2fa94b4..6d8b622d21d 100644
--- a/pkg/kubelet/kubelet_pods_test.go
+++ b/pkg/kubelet/kubelet_pods_test.go
@@ -2152,11 +2152,17 @@ func TestPodPhaseWithRestartAlways(t *testing.T) {
 	}
 
 	tests := []struct {
-		pod    *v1.Pod
-		status v1.PodPhase
-		test   string
+		pod           *v1.Pod
+		podIsTerminal bool
+		status        v1.PodPhase
+		test          string
 	}{
-		{&v1.Pod{Spec: desiredState, Status: v1.PodStatus{}}, v1.PodPending, "waiting"},
+		{
+			&v1.Pod{Spec: desiredState, Status: v1.PodStatus{}},
+			false,
+			v1.PodPending,
+			"waiting",
+		},
 		{
 			&v1.Pod{
 				Spec: desiredState,
@@ -2167,6 +2173,7 @@
 				},
 			},
 		},
+		false,
 		v1.PodRunning,
 		"all running",
 	},
@@ -2180,9 +2187,38 @@
 			},
 		},
 	},
+	false,
 	v1.PodRunning,
 	"all stopped with restart always",
 },
+	{
+		&v1.Pod{
+			Spec: desiredState,
+			Status: v1.PodStatus{
+				ContainerStatuses: []v1.ContainerStatus{
+					succeededState("containerA"),
+					succeededState("containerB"),
+				},
+			},
+		},
+		true,
+		v1.PodSucceeded,
+		"all succeeded with restart always, but the pod is terminal",
+	},
+	{
+		&v1.Pod{
+			Spec: desiredState,
+			Status: v1.PodStatus{
+				ContainerStatuses: []v1.ContainerStatus{
+					succeededState("containerA"),
+					failedState("containerB"),
+				},
+			},
+		},
+		true,
+		v1.PodFailed,
+		"all stopped with restart always, but the pod is terminal",
+	},
 	{
 		&v1.Pod{
 			Spec: desiredState,
@@ -2193,6 +2229,7 @@
 			},
 		},
 	},
+	false,
 	v1.PodRunning,
 	"mixed state #1 with restart always",
 },
@@ -2205,6 +2242,7 @@
 			},
 		},
 	},
+	false,
 	v1.PodPending,
 	"mixed state #2 with restart always",
 },
@@ -2218,6 +2256,7 @@
 			},
 		},
 	},
+	false,
 	v1.PodPending,
 	"mixed state #3 with restart always",
 },
@@ -2231,12 +2270,13 @@
 			},
 		},
 	},
+	false,
 	v1.PodRunning,
 	"backoff crashloop container with restart always",
 },
 	}
 	for _, test := range tests {
-		status := getPhase(&test.pod.Spec, test.pod.Status.ContainerStatuses)
+		status := getPhase(test.pod, test.pod.Status.ContainerStatuses, test.podIsTerminal)
 		assert.Equal(t, test.status, status, "[test %s]", test.test)
 	}
 }
@@ -2339,7 +2379,7 @@ func TestPodPhaseWithRestartAlwaysInitContainers(t *testing.T) {
 	}
 	for _, test := range tests {
 		statusInfo := append(test.pod.Status.InitContainerStatuses[:], test.pod.Status.ContainerStatuses[:]...)
-		status := getPhase(&test.pod.Spec, statusInfo)
+		status := getPhase(test.pod, statusInfo, false)
 		assert.Equal(t, test.status, status, "[test %s]", test.test)
 	}
 }
@@ -2439,7 +2479,7 @@ func TestPodPhaseWithRestartNever(t *testing.T) {
 		},
 	}
 	for _, test := range tests {
-		status := getPhase(&test.pod.Spec, test.pod.Status.ContainerStatuses)
+		status := getPhase(test.pod, test.pod.Status.ContainerStatuses, false)
 		assert.Equal(t, test.status, status, "[test %s]", test.test)
 	}
 }
@@ -2542,7 +2582,7 @@ func TestPodPhaseWithRestartNeverInitContainers(t *testing.T) {
 	}
 	for _, test := range tests {
 		statusInfo := append(test.pod.Status.InitContainerStatuses[:], test.pod.Status.ContainerStatuses[:]...)
-		status := getPhase(&test.pod.Spec, statusInfo)
+		status := getPhase(test.pod, statusInfo, false)
 		assert.Equal(t, test.status, status, "[test %s]", test.test)
 	}
 }
@@ -2655,7 +2695,7 @@ func TestPodPhaseWithRestartOnFailure(t *testing.T) {
 		},
 	}
 	for _, test := range tests {
-		status := getPhase(&test.pod.Spec, test.pod.Status.ContainerStatuses)
+		status := getPhase(test.pod, test.pod.Status.ContainerStatuses, false)
 		assert.Equal(t, test.status, status, "[test %s]", test.test)
 	}
 }
@@ -2785,13 +2825,14 @@ func Test_generateAPIPodStatus(t *testing.T) {
 		currentStatus                  *kubecontainer.PodStatus
 		unreadyContainer               []string
 		previousStatus                 v1.PodStatus
+		isPodTerminal                  bool
 		enablePodDisruptionConditions  bool
 		expected                       v1.PodStatus
 		expectedPodDisruptionCondition v1.PodCondition
 		expectedPodHasNetworkCondition v1.PodCondition
 	}{
 		{
-			name: "pod disruption condition is copied over; PodDisruptionConditions enabled",
+			name: "pod disruption condition is copied over and the phase is set to failed when deleted; PodDisruptionConditions enabled",
 			pod: &v1.Pod{
 				Spec: desiredState,
 				Status: v1.PodStatus{
@@ -2819,15 +2860,16 @@ func Test_generateAPIPodStatus(t *testing.T) {
 				LastTransitionTime: normalized_now,
 			}},
 			},
+			isPodTerminal:                 true,
 			enablePodDisruptionConditions: true,
 			expected: v1.PodStatus{
-				Phase:     v1.PodRunning,
+				Phase:     v1.PodFailed,
 				HostIP:    "127.0.0.1",
 				QOSClass:  v1.PodQOSBestEffort,
 				Conditions: []v1.PodCondition{
 					{Type: v1.PodInitialized, Status: v1.ConditionTrue},
-					{Type: v1.PodReady, Status: v1.ConditionTrue},
-					{Type: v1.ContainersReady, Status: v1.ConditionTrue},
+					{Type: v1.PodReady, Status: v1.ConditionFalse, Reason: "PodFailed"},
+					{Type: v1.ContainersReady, Status: v1.ConditionFalse, Reason: "PodFailed"},
 					{Type: v1.PodScheduled, Status: v1.ConditionTrue},
 				},
 				ContainerStatuses: []v1.ContainerStatus{
@@ -3223,7 +3265,7 @@ func Test_generateAPIPodStatus(t *testing.T) {
 				kl.readinessManager.Set(kubecontainer.BuildContainerID("", findContainerStatusByName(test.expected, name).ContainerID), results.Failure, test.pod)
 			}
 			expected := test.expected.DeepCopy()
-			actual := kl.generateAPIPodStatus(test.pod, test.currentStatus)
+			actual := kl.generateAPIPodStatus(test.pod, test.currentStatus, test.isPodTerminal)
 			if enablePodHasNetworkCondition {
 				expected.Conditions = append([]v1.PodCondition{test.expectedPodHasNetworkCondition}, expected.Conditions...)
 			}
@@ -3761,7 +3803,7 @@ func TestGenerateAPIPodStatusHostNetworkPodIPs(t *testing.T) {
 			IPs: tc.criPodIPs,
 		}
 
-		status := kl.generateAPIPodStatus(pod, criStatus)
+		status := kl.generateAPIPodStatus(pod, criStatus, false)
 		if !reflect.DeepEqual(status.PodIPs, tc.podIPs) {
 			t.Fatalf("Expected PodIPs %#v, got %#v", tc.podIPs, status.PodIPs)
 		}
@@ -3874,7 +3916,7 @@ func TestNodeAddressUpdatesGenerateAPIPodStatusHostNetworkPodIPs(t *testing.T) {
 		}
 
 		podStatus.IPs = tc.nodeIPs
-		status := kl.generateAPIPodStatus(pod, podStatus)
+		status := kl.generateAPIPodStatus(pod, podStatus, false)
 		if !reflect.DeepEqual(status.PodIPs, tc.expectedPodIPs) {
 			t.Fatalf("Expected PodIPs %#v, got %#v", tc.expectedPodIPs, status.PodIPs)
 		}
@@ -4007,7 +4049,7 @@ func TestGenerateAPIPodStatusPodIPs(t *testing.T) {
 			IPs: tc.criPodIPs,
 		}
 
-		status := kl.generateAPIPodStatus(pod, criStatus)
+		status := kl.generateAPIPodStatus(pod, criStatus, false)
 		if !reflect.DeepEqual(status.PodIPs, tc.podIPs) {
 			t.Fatalf("Expected PodIPs %#v, got %#v", tc.podIPs, status.PodIPs)
 		}
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index c0f998177a4..8d50c9ec189 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -1989,7 +1989,7 @@ func TestGenerateAPIPodStatusWithSortedContainers(t *testing.T) {
 		ContainerStatuses: cStatuses,
 	}
 	for i := 0; i < 5; i++ {
-		apiStatus := kubelet.generateAPIPodStatus(pod, status)
+		apiStatus := kubelet.generateAPIPodStatus(pod, status, false)
 		for i, c := range apiStatus.ContainerStatuses {
 			if expectedOrder[i] != c.Name {
 				t.Fatalf("Container status not sorted, expected %v at index %d, but found %v", expectedOrder[i], i, c.Name)
@@ -2203,7 +2203,7 @@ func TestGenerateAPIPodStatusWithReasonCache(t *testing.T) {
 		pod.Spec.Containers = test.containers
 		pod.Status.ContainerStatuses = test.oldStatuses
 		podStatus.ContainerStatuses = test.statuses
-		apiStatus := kubelet.generateAPIPodStatus(pod, podStatus)
+		apiStatus := kubelet.generateAPIPodStatus(pod, podStatus, false)
 		verifyContainerStatuses(t, apiStatus.ContainerStatuses, test.expectedState, test.expectedLastTerminationState, fmt.Sprintf("case %d", i))
 	}
 
@@ -2216,7 +2216,7 @@ func TestGenerateAPIPodStatusWithReasonCache(t *testing.T) {
 		pod.Spec.InitContainers = test.containers
 		pod.Status.InitContainerStatuses = test.oldStatuses
 		podStatus.ContainerStatuses = test.statuses
-		apiStatus := kubelet.generateAPIPodStatus(pod, podStatus)
+		apiStatus := kubelet.generateAPIPodStatus(pod, podStatus, false)
 		expectedState := test.expectedState
 		if test.expectedInitState != nil {
 			expectedState = test.expectedInitState
@@ -2355,14 +2355,14 @@ func TestGenerateAPIPodStatusWithDifferentRestartPolicies(t *testing.T) {
 		pod.Spec.RestartPolicy = test.restartPolicy
 		// Test normal containers
 		pod.Spec.Containers = containers
-		apiStatus := kubelet.generateAPIPodStatus(pod, podStatus)
+		apiStatus := kubelet.generateAPIPodStatus(pod, podStatus, false)
 		expectedState, expectedLastTerminationState := test.expectedState, test.expectedLastTerminationState
 		verifyContainerStatuses(t, apiStatus.ContainerStatuses, expectedState, expectedLastTerminationState, fmt.Sprintf("case %d", c))
 		pod.Spec.Containers = nil
 
 		// Test init containers
 		pod.Spec.InitContainers = containers
-		apiStatus = kubelet.generateAPIPodStatus(pod, podStatus)
+		apiStatus = kubelet.generateAPIPodStatus(pod, podStatus, false)
 		if test.expectedInitState != nil {
 			expectedState = test.expectedInitState
 		}
@@ -2656,7 +2656,7 @@ func TestGenerateAPIPodStatusInvokesPodSyncHandlers(t *testing.T) {
 		Name:      pod.Name,
 		Namespace: pod.Namespace,
 	}
-	apiStatus := kubelet.generateAPIPodStatus(pod, status)
+	apiStatus := kubelet.generateAPIPodStatus(pod, status, false)
 	require.Equal(t, v1.PodFailed, apiStatus.Phase)
 	require.Equal(t, "Evicted", apiStatus.Reason)
 	require.Equal(t, "because", apiStatus.Message)
diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go
index 5c4042a4234..8d72e1ad55e 100644
--- a/pkg/kubelet/pod_workers.go
+++ b/pkg/kubelet/pod_workers.go
@@ -159,7 +159,8 @@ type PodWorkers interface {
 	// returned as knownPods.
 	SyncKnownPods(desiredPods []*v1.Pod) (knownPods map[types.UID]PodWorkerSync)
 
-	// IsPodKnownTerminated returns true if the provided pod UID is known by the pod
+	// IsPodKnownTerminated returns true once SyncTerminatingPod completes
+	// successfully - i.e. the provided pod UID is known by the pod
 	// worker to be terminated. If the pod has been force deleted and the pod worker
 	// has completed termination this method will return false, so this method should
 	// only be used to filter out pods from the desired set such as in admission.
@@ -174,6 +175,19 @@ type PodWorkers interface {
 	// Intended for use by the kubelet config loops, but not subsystems, which should
 	// use ShouldPod*().
 	CouldHaveRunningContainers(uid types.UID) bool
+
+	// ShouldPodBeFinished returns true once SyncTerminatedPod completes
+	// successfully - i.e. the provided pod UID is known to the pod worker to
+	// be terminated and to have had its resources reclaimed. It returns false
+	// before the pod workers have synced (syncPod could be called). Once the
+	// pod workers have synced, it returns false for a pod that still has a
+	// sync status, until SyncTerminatedPod completes successfully. If the pod
+	// workers have synced, but the pod does not have a status, it returns true.
+	//
+	// Intended for use by subsystem sync loops to avoid performing background setup
+	// after termination has been requested for a pod. Callers must ensure that the
+	// syncPod method is non-blocking when their data is absent.
+	ShouldPodBeFinished(uid types.UID) bool
 	// IsPodTerminationRequested returns true when pod termination has been requested
 	// until the termination completes and the pod is removed from config. This should
 	// not be used in cleanup loops because it will return false if the pod has already
@@ -521,7 +535,7 @@ func (s *podSyncStatus) mergeLastUpdate(other UpdatePodOptions) {
 // .                                          ^ other loops can tear down
 // .                                          .
 //
-// ------------------------------------|     |---- = status manager is waiting for PodResourcesAreReclaimed()
+// ------------------------------------|     |---- = status manager is waiting for SyncTerminatedPod() to finish
 //
 // .                                         ^ .
 //
@@ -625,6 +639,17 @@ func (p *podWorkers) CouldHaveRunningContainers(uid types.UID) bool {
 	return !p.podsSynced
 }
 
+func (p *podWorkers) ShouldPodBeFinished(uid types.UID) bool {
+	p.podLock.Lock()
+	defer p.podLock.Unlock()
+	if status, ok := p.podSyncStatuses[uid]; ok {
+		return status.IsFinished()
+	}
+	// once all pods are synced, any pod without sync status is assumed to
+	// have SyncTerminatedPod finished.
+	return p.podsSynced
+}
+
 func (p *podWorkers) IsPodTerminationRequested(uid types.UID) bool {
 	p.podLock.Lock()
 	defer p.podLock.Unlock()
diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go
index 976cd25bdb6..3fcd2465c14 100644
--- a/pkg/kubelet/pod_workers_test.go
+++ b/pkg/kubelet/pod_workers_test.go
@@ -56,6 +56,7 @@ type fakePodWorkers struct {
 	terminating            map[types.UID]bool
 	terminated             map[types.UID]bool
 	terminationRequested   map[types.UID]bool
+	finished               map[types.UID]bool
 	removeRuntime          map[types.UID]bool
 	removeContent          map[types.UID]bool
 	terminatingStaticPods  map[string]bool
@@ -105,6 +106,11 @@ func (f *fakePodWorkers) CouldHaveRunningContainers(uid types.UID) bool {
 	defer f.statusLock.Unlock()
 	return f.running[uid]
 }
+func (f *fakePodWorkers) ShouldPodBeFinished(uid types.UID) bool {
+	f.statusLock.Lock()
+	defer f.statusLock.Unlock()
+	return f.finished[uid]
+}
 func (f *fakePodWorkers) IsPodTerminationRequested(uid types.UID) bool {
 	f.statusLock.Lock()
 	defer f.statusLock.Unlock()
diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go
index fa975c9490d..728b438a465 100644
--- a/pkg/kubelet/status/status_manager.go
+++ b/pkg/kubelet/status/status_manager.go
@@ -60,12 +60,10 @@ type versionedPodStatus struct {
 	// at is the time at which the most recent status update was detected
 	at time.Time
 
-	status v1.PodStatus
-}
+	// True if the status is generated at the end of SyncTerminatedPod, or after it is completed.
+	podIsFinished bool
 
-type podStatusSyncRequest struct {
-	podUID types.UID
-	status versionedPodStatus
+	status v1.PodStatus
 }
 
 // Updates pod statuses in apiserver. Writes only when new status has changed.
@@ -76,7 +74,7 @@ type manager struct {
 	// Map from pod UID to sync status of the corresponding pod.
 	podStatuses      map[types.UID]versionedPodStatus
 	podStatusesLock  sync.RWMutex
-	podStatusChannel chan podStatusSyncRequest
+	podStatusChannel chan struct{}
 	// Map from (mirror) pod UID to latest status version successfully sent to the API server.
 	// apiStatusVersions must only be accessed from the sync thread.
 	apiStatusVersions map[kubetypes.MirrorPodUID]uint64
@@ -99,8 +97,6 @@ type PodStatusProvider interface {
 
 // PodDeletionSafetyProvider provides guarantees that a pod can be safely deleted.
 type PodDeletionSafetyProvider interface {
-	// PodResourcesAreReclaimed returns true if the pod can safely be deleted.
-	PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool
 	// PodCouldHaveRunningContainers returns true if the pod could have running containers.
 	PodCouldHaveRunningContainers(pod *v1.Pod) bool
 }
 
@@ -158,7 +154,7 @@ func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podD
 		kubeClient:              kubeClient,
 		podManager:              podManager,
 		podStatuses:             make(map[types.UID]versionedPodStatus),
-		podStatusChannel:        make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
+		podStatusChannel:        make(chan struct{}, 1),
 		apiStatusVersions:       make(map[kubetypes.MirrorPodUID]uint64),
 		podDeletionSafety:       podDeletionSafety,
 		podStartupLatencyHelper: podStartupLatencyHelper,
@@ -217,19 +213,12 @@ func (m *manager) Start() {
 	go wait.Forever(func() {
 		for {
 			select {
-			case syncRequest := <-m.podStatusChannel:
-				klog.V(5).InfoS("Status Manager: syncing pod with status from podStatusChannel",
-					"podUID", syncRequest.podUID,
-					"statusVersion", syncRequest.status.version,
-					"status", syncRequest.status.status)
-				m.syncPod(syncRequest.podUID, syncRequest.status)
+			case <-m.podStatusChannel:
+				klog.V(4).InfoS("Syncing updated statuses")
+				m.syncBatch(false)
 			case <-syncTicker:
-				klog.V(5).InfoS("Status Manager: syncing batch")
-				// remove any entries in the status channel since the batch will handle them
-				for i := len(m.podStatusChannel); i > 0; i-- {
-					<-m.podStatusChannel
-				}
-				m.syncBatch()
+				klog.V(4).InfoS("Syncing all statuses")
+				m.syncBatch(true)
 			}
 		}
 	}, 0)
@@ -291,7 +280,7 @@ func (m *manager) SetPodStatus(pod *v1.Pod, status v1.PodStatus) {
 	// Force a status update if deletion timestamp is set. This is necessary
 	// because if the pod is in the non-running state, the pod worker still
 	// needs to be able to trigger an update and/or deletion.
-	m.updateStatusInternal(pod, status, pod.DeletionTimestamp != nil)
+	m.updateStatusInternal(pod, status, pod.DeletionTimestamp != nil, false)
 }
 
 func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
@@ -352,7 +341,7 @@ func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontai
 	}
 	updateConditionFunc(v1.PodReady, GeneratePodReadyCondition(&pod.Spec, status.Conditions, status.ContainerStatuses, status.Phase))
 	updateConditionFunc(v1.ContainersReady, GenerateContainersReadyCondition(&pod.Spec, status.ContainerStatuses, status.Phase))
-	m.updateStatusInternal(pod, status, false)
+	m.updateStatusInternal(pod, status, false, false)
 }
 
 func (m *manager) SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool) {
@@ -394,7 +383,7 @@ func (m *manager) SetContainerStartup(podUID types.UID, containerID kubecontaine
 	containerStatus, _, _ = findContainerStatus(&status, containerID.String())
 	containerStatus.Started = &started
 
-	m.updateStatusInternal(pod, status, false)
+	m.updateStatusInternal(pod, status, false, false)
 }
 
 func findContainerStatus(status *v1.PodStatus, containerID string) (containerStatus *v1.ContainerStatus, init bool, ok bool) {
@@ -423,6 +412,8 @@ func findContainerStatus(status *v1.PodStatus, containerID string) (containerSta
 // the pod as successful. If we have not yet initialized the pod in the presence of init containers,
 // the init container failure status is sufficient to describe the pod as failing, and we do not need
 // to override waiting containers (unless there is evidence the pod previously started those containers).
+// It also makes sure that pods are transitioned to a terminal phase (Failed or Succeeded) before
+// their deletion.
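+// Phases that are already terminal are left untouched; any other phase is
+// forced to Failed, since a pod whose containers were stopped without a
+// recorded success cannot be assumed to have succeeded.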
 func (m *manager) TerminatePod(pod *v1.Pod) {
 	m.podStatusesLock.Lock()
 	defer m.podStatusesLock.Unlock()
@@ -430,7 +421,8 @@ func (m *manager) TerminatePod(pod *v1.Pod) {
 	// ensure that all containers have a terminated state - because we do not know whether the container
 	// was successful, always report an error
 	oldStatus := &pod.Status
-	if cachedStatus, ok := m.podStatuses[pod.UID]; ok {
+	cachedStatus, isCached := m.podStatuses[pod.UID]
+	if isCached {
 		oldStatus = &cachedStatus.status
 	}
 	status := *oldStatus.DeepCopy()
@@ -466,8 +458,26 @@ func (m *manager) TerminatePod(pod *v1.Pod) {
 		}
 	}
 
+	// Make sure all pods are transitioned to a terminal phase.
+	// TODO(#116484): Also assign terminal phase to static pods.
+	if !kubetypes.IsStaticPod(pod) {
+		switch status.Phase {
+		case v1.PodSucceeded, v1.PodFailed:
+			// do nothing, already terminal
+		case v1.PodPending, v1.PodRunning:
+			if status.Phase == v1.PodRunning && isCached {
+				klog.InfoS("Terminal running pod should have already been marked as failed, programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
+			}
+			klog.V(3).InfoS("Marking terminal pod as failed", "oldPhase", status.Phase, "pod", klog.KObj(pod), "podUID", pod.UID)
+			status.Phase = v1.PodFailed
+		default:
+			klog.ErrorS(fmt.Errorf("unknown phase: %v", status.Phase), "Unknown phase, programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
+			status.Phase = v1.PodFailed
+		}
+	}
+
 	klog.V(5).InfoS("TerminatePod calling updateStatusInternal", "pod", klog.KObj(pod), "podUID", pod.UID)
-	m.updateStatusInternal(pod, status, true)
+	m.updateStatusInternal(pod, status, true, true)
 }
 
 // hasPodInitialized returns true if the pod has no evidence of ever starting a regular container, which
@@ -540,13 +550,20 @@ func checkContainerStateTransition(oldStatuses, newStatuses []v1.ContainerStatus
 }
 
 // updateStatusInternal updates the internal status cache, and queues an update to the api server if
-// necessary. Returns whether an update was triggered.
+// necessary.
 // This method IS NOT THREAD SAFE and must be called from a locked function.
-func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUpdate bool) bool {
+func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUpdate, podIsFinished bool) {
 	var oldStatus v1.PodStatus
 	cachedStatus, isCached := m.podStatuses[pod.UID]
 	if isCached {
 		oldStatus = cachedStatus.status
+		// TODO(#116484): Also assign terminal phase to static pods.
+		if !kubetypes.IsStaticPod(pod) {
+			if cachedStatus.podIsFinished && !podIsFinished {
+				klog.InfoS("Got unexpected podIsFinished=false, while podIsFinished=true in status cache, programmer error.", "pod", klog.KObj(pod))
+				podIsFinished = true
+			}
+		}
 	} else if mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod); ok {
 		oldStatus = mirrorPod.Status
 	} else {
@@ -556,11 +573,11 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp
 	// Check for illegal state transition in containers
 	if err := checkContainerStateTransition(oldStatus.ContainerStatuses, status.ContainerStatuses, pod.Spec.RestartPolicy); err != nil {
 		klog.ErrorS(err, "Status update on pod aborted", "pod", klog.KObj(pod))
-		return false
+		return
 	}
 	if err := checkContainerStateTransition(oldStatus.InitContainerStatuses, status.InitContainerStatuses, pod.Spec.RestartPolicy); err != nil {
 		klog.ErrorS(err, "Status update on pod aborted", "pod", klog.KObj(pod))
-		return false
+		return
 	}
 
 	// Set ContainersReadyCondition.LastTransitionTime.
@@ -623,21 +640,22 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp
 			containers = append(containers, fmt.Sprintf("(%s state=%s previous=%s)", s.Name, current, previous))
 		}
 		sort.Strings(containers)
-		klogV.InfoS("updateStatusInternal", "version", cachedStatus.version+1, "pod", klog.KObj(pod), "podUID", pod.UID, "containers", strings.Join(containers, " "))
+		klogV.InfoS("updateStatusInternal", "version", cachedStatus.version+1, "podIsFinished", podIsFinished, "pod", klog.KObj(pod), "podUID", pod.UID, "containers", strings.Join(containers, " "))
 	}
 
 	// The intent here is to prevent concurrent updates to a pod's status from
 	// clobbering each other so the phase of a pod progresses monotonically.
 	if isCached && isPodStatusByKubeletEqual(&cachedStatus.status, &status) && !forceUpdate {
 		klog.V(3).InfoS("Ignoring same status for pod", "pod", klog.KObj(pod), "status", status)
-		return false // No new status.
+		return
 	}
 
 	newStatus := versionedPodStatus{
-		status:       status,
-		version:      cachedStatus.version + 1,
-		podName:      pod.Name,
-		podNamespace: pod.Namespace,
+		status:        status,
+		version:       cachedStatus.version + 1,
+		podName:       pod.Name,
+		podNamespace:  pod.Namespace,
+		podIsFinished: podIsFinished,
 	}
 
 	// Multiple status updates can be generated before we update the API server,
@@ -652,20 +670,9 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp
 	m.podStatuses[pod.UID] = newStatus
 
 	select {
-	case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}:
-		klog.V(5).InfoS("Status Manager: adding pod with new status to podStatusChannel",
-			"pod", klog.KObj(pod),
-			"podUID", pod.UID,
-			"statusVersion", newStatus.version,
-			"status", newStatus.status)
-		return true
+	case m.podStatusChannel <- struct{}{}:
 	default:
-		// Let the periodic syncBatch handle the update if the channel is full.
-		// We can't block, since we hold the mutex lock.
-		klog.V(4).InfoS("Skipping the status update for pod for now because the channel is full",
-			"pod", klog.KObj(pod),
-			"status", status)
-		return false
+		// there's already a status update pending
 	}
 }
 
@@ -710,25 +717,38 @@ func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
 	}
 }
 
-// syncBatch syncs pods statuses with the apiserver.
-func (m *manager) syncBatch() {
-	var updatedStatuses []podStatusSyncRequest
+// syncBatch syncs pods statuses with the apiserver. Returns the number of syncs
+// attempted for testing.
+func (m *manager) syncBatch(all bool) int {
+	type podSync struct {
+		podUID    types.UID
+		statusUID kubetypes.MirrorPodUID
+		status    versionedPodStatus
+	}
+
+	var updatedStatuses []podSync
 	podToMirror, mirrorToPod := m.podManager.GetUIDTranslations()
 	func() { // Critical section
 		m.podStatusesLock.RLock()
 		defer m.podStatusesLock.RUnlock()
 
 		// Clean up orphaned versions.
-		for uid := range m.apiStatusVersions {
-			_, hasPod := m.podStatuses[types.UID(uid)]
-			_, hasMirror := mirrorToPod[uid]
-			if !hasPod && !hasMirror {
-				delete(m.apiStatusVersions, uid)
+		if all {
+			for uid := range m.apiStatusVersions {
+				_, hasPod := m.podStatuses[types.UID(uid)]
+				_, hasMirror := mirrorToPod[uid]
+				if !hasPod && !hasMirror {
+					delete(m.apiStatusVersions, uid)
+				}
 			}
 		}
 
+		// Decide which pods need status updates.
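+		// In the event-driven path (all=false) a pod is synced only when its
+		// cached status version is newer than the last version the API server
+		// acknowledged; the periodic path (all=true) also reconciles and
+		// retries failed updates.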
 		for uid, status := range m.podStatuses {
-			syncedUID := kubetypes.MirrorPodUID(uid)
+			// translate the pod UID (source) to the status UID (API pod) -
+			// static pods are identified in source by pod UID but tracked in the
+			// API via the uid of the mirror pod
+			uidOfStatus := kubetypes.MirrorPodUID(uid)
 			if mirrorUID, ok := podToMirror[kubetypes.ResolvedPodUID(uid)]; ok {
 				if mirrorUID == "" {
 					klog.V(5).InfoS("Static pod does not have a corresponding mirror pod; skipping",
@@ -736,34 +756,45 @@ func (m *manager) syncBatch() {
 						"pod", klog.KRef(status.podNamespace, status.podName))
 					continue
 				}
-				syncedUID = mirrorUID
+				uidOfStatus = mirrorUID
 			}
-			if m.needsUpdate(types.UID(syncedUID), status) {
-				updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
+
+			// if a new status update has been delivered, trigger an update, otherwise the
+			// pod can wait for the next bulk check (which performs reconciliation as well)
+			if !all {
+				if m.apiStatusVersions[uidOfStatus] >= status.version {
+					continue
+				}
+				updatedStatuses = append(updatedStatuses, podSync{uid, uidOfStatus, status})
+				continue
+			}
+
+			// Ensure that any new status, or mismatched status, or pod that is ready for
+			// deletion gets updated. If a status update fails we retry the next time any
+			// other pod is updated.
+			if m.needsUpdate(types.UID(uidOfStatus), status) {
+				updatedStatuses = append(updatedStatuses, podSync{uid, uidOfStatus, status})
 			} else if m.needsReconcile(uid, status.status) {
 				// Delete the apiStatusVersions here to force an update on the pod status
 				// In most cases the deleted apiStatusVersions here should be filled
 				// soon after the following syncPod() [If the syncPod() sync an update
 				// successfully].
-				delete(m.apiStatusVersions, syncedUID)
-				updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
+				delete(m.apiStatusVersions, uidOfStatus)
+				updatedStatuses = append(updatedStatuses, podSync{uid, uidOfStatus, status})
 			}
 		}
 	}()
 
 	for _, update := range updatedStatuses {
-		klog.V(5).InfoS("Status Manager: syncPod in syncbatch", "podUID", update.podUID)
+		klog.V(5).InfoS("Sync pod status", "podUID", update.podUID, "statusUID", update.statusUID, "version", update.status.version)
 		m.syncPod(update.podUID, update.status)
 	}
+
+	return len(updatedStatuses)
 }
 
-// syncPod syncs the given status with the API server. The caller must not hold the lock.
+// syncPod syncs the given status with the API server. The caller must not hold the status lock.
 func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
-	if !m.needsUpdate(uid, status) {
-		klog.V(1).InfoS("Status for pod is up-to-date; skipping", "podUID", uid)
-		return
-	}
-
 	// TODO: make me easier to express from client code
 	pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(context.TODO(), status.podName, metav1.GetOptions{})
 	if errors.IsNotFound(err) {
@@ -815,14 +846,14 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
 	if status.at.IsZero() {
 		klog.V(3).InfoS("Pod had no status time set", "pod", klog.KObj(pod), "podUID", uid, "version", status.version)
 	} else {
-		duration := time.Now().Sub(status.at).Truncate(time.Millisecond)
+		duration := time.Since(status.at).Truncate(time.Millisecond)
 		metrics.PodStatusSyncDuration.Observe(duration.Seconds())
 	}
 
 	m.apiStatusVersions[kubetypes.MirrorPodUID(pod.UID)] = status.version
 
 	// We don't handle graceful deletion of mirror pods.
-	if m.canBeDeleted(pod, status.status) {
+	if m.canBeDeleted(pod, status.status, status.podIsFinished) {
 		deleteOptions := metav1.DeleteOptions{
 			GracePeriodSeconds: new(int64),
 			// Use the pod UID as the precondition for deletion to prevent deleting a
@@ -850,14 +881,24 @@ func (m *manager) needsUpdate(uid types.UID, status versionedPodStatus) bool {
 	if !ok {
 		return false
 	}
-	return m.canBeDeleted(pod, status.status)
+	return m.canBeDeleted(pod, status.status, status.podIsFinished)
 }
 
-func (m *manager) canBeDeleted(pod *v1.Pod, status v1.PodStatus) bool {
+func (m *manager) canBeDeleted(pod *v1.Pod, status v1.PodStatus, podIsFinished bool) bool {
 	if pod.DeletionTimestamp == nil || kubetypes.IsMirrorPod(pod) {
 		return false
 	}
-	return m.podDeletionSafety.PodResourcesAreReclaimed(pod, status)
+	// Delay deletion of pods until the phase is terminal.
+	if !podutil.IsPodPhaseTerminal(pod.Status.Phase) {
+		klog.V(3).InfoS("Delaying pod deletion as the phase is non-terminal", "phase", status.Phase, "pod", klog.KObj(pod), "podUID", pod.UID)
+		return false
+	}
+	// If this is an update completing pod termination then we know the pod termination is finished.
+	if podIsFinished {
+		klog.V(3).InfoS("The pod termination is finished as SyncTerminatedPod completes its execution", "phase", status.Phase, "pod", klog.KObj(pod), "podUID", pod.UID)
+		return true
+	}
+	return false
 }
 
 // needsReconcile compares the given status with the status in the pod manager (which
diff --git a/pkg/kubelet/status/status_manager_test.go b/pkg/kubelet/status/status_manager_test.go
index 82b8e9376e3..443c7d829cc 100644
--- a/pkg/kubelet/status/status_manager_test.go
+++ b/pkg/kubelet/status/status_manager_test.go
@@ -27,6 +27,7 @@ import (
 	"time"
 
 	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
 	"github.com/stretchr/testify/assert"
 
 	v1 "k8s.io/api/core/v1"
@@ -69,7 +70,7 @@ func getTestPod() *v1.Pod {
 // After adding reconciliation, if status in pod manager is different from the cached status, a reconciliation
 // will be triggered, which will mess up all the old unit test.
 // To simplify the implementation of unit test, we add testSyncBatch() here, it will make sure the statuses in
-// pod manager the same with cached ones before syncBatch() so as to avoid reconciling.
+// pod manager are the same as the cached ones before syncBatch(true), so as to avoid reconciling.
 func (m *manager) testSyncBatch() {
 	for uid, status := range m.podStatuses {
 		pod, ok := m.podManager.GetPodByUID(uid)
@@ -81,7 +82,7 @@ func (m *manager) testSyncBatch() {
 			pod.Status = status.status
 		}
 	}
-	m.syncBatch()
+	m.syncBatch(true)
 }
 
 func newTestManager(kubeClient clientset.Interface) *manager {
@@ -113,19 +114,19 @@ func verifyActions(t *testing.T, manager *manager, expectedActions []core.Action
 	actions := manager.kubeClient.(*fake.Clientset).Actions()
 	defer manager.kubeClient.(*fake.Clientset).ClearActions()
 	if len(actions) != len(expectedActions) {
-		t.Fatalf("unexpected actions, got: %+v expected: %+v", actions, expectedActions)
-		return
+		t.Fatalf("unexpected actions: %s", cmp.Diff(expectedActions, actions))
 	}
 	for i := 0; i < len(actions); i++ {
 		e := expectedActions[i]
 		a := actions[i]
 		if !a.Matches(e.GetVerb(), e.GetResource().Resource) || a.GetSubresource() != e.GetSubresource() {
-			t.Errorf("unexpected actions, got: %+v expected: %+v", actions, expectedActions)
+			t.Errorf("unexpected actions: %s", cmp.Diff(expectedActions, actions))
 		}
 	}
 }
 
 func verifyUpdates(t *testing.T, manager *manager, expectedUpdates int) {
+	t.Helper()
 	// Consume all updates in the channel.
 	numUpdates := manager.consumeUpdates()
 	if numUpdates != expectedUpdates {
@@ -137,9 +138,8 @@ func (m *manager) consumeUpdates() int {
 	updates := 0
 	for {
 		select {
-		case syncRequest := <-m.podStatusChannel:
-			m.syncPod(syncRequest.podUID, syncRequest.status)
-			updates++
+		case <-m.podStatusChannel:
+			updates += m.syncBatch(false)
 		default:
 			return updates
 		}
@@ -214,8 +214,9 @@ func TestChangedStatus(t *testing.T) {
 	syncer := newTestManager(&fake.Clientset{})
 	testPod := getTestPod()
 	syncer.SetPodStatus(testPod, getRandomPodStatus())
+	verifyUpdates(t, syncer, 1)
 	syncer.SetPodStatus(testPod, getRandomPodStatus())
-	verifyUpdates(t, syncer, 2)
+	verifyUpdates(t, syncer, 1)
 }
 
 func TestChangedStatusKeepsStartTime(t *testing.T) {
@@ -225,8 +226,9 @@ func TestChangedStatusKeepsStartTime(t *testing.T) {
 	firstStatus := getRandomPodStatus()
 	firstStatus.StartTime = &now
 	syncer.SetPodStatus(testPod, firstStatus)
+	verifyUpdates(t, syncer, 1)
 	syncer.SetPodStatus(testPod, getRandomPodStatus())
-	verifyUpdates(t, syncer, 2)
+	verifyUpdates(t, syncer, 1)
 	finalStatus := expectPodStatus(t, syncer, testPod)
 	if finalStatus.StartTime.IsZero() {
 		t.Errorf("StartTime should not be zero")
@@ -407,9 +409,9 @@ func TestStaleUpdates(t *testing.T) {
 	status.Message = "second version bump"
 	m.SetPodStatus(pod, status)
 
-	t.Logf("sync batch before syncPods pushes latest status, so we should see three statuses in the channel, but only one update")
-	m.syncBatch()
-	verifyUpdates(t, m, 3)
+	t.Logf("sync batch before syncPods pushes latest status, resulting in one update during the batch")
+	m.syncBatch(true)
+	verifyUpdates(t, m, 0)
 	verifyActions(t, m, []core.Action{getAction(), patchAction()})
 	t.Logf("Nothing left in the channel to sync")
 	verifyActions(t, m, []core.Action{})
@@ -423,7 +425,7 @@ func TestStaleUpdates(t *testing.T) {
 	m.apiStatusVersions[mirrorPodUID] = m.apiStatusVersions[mirrorPodUID] - 1
 
 	m.SetPodStatus(pod, status)
-	m.syncBatch()
+	m.syncBatch(true)
 	verifyActions(t, m, []core.Action{getAction()})
 
 	t.Logf("Nothing stuck in the pipe.")
@@ -545,7 +547,7 @@ func TestStaticPod(t *testing.T) {
 	assert.True(t, isPodStatusByKubeletEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
 
 	t.Logf("Should not sync pod in syncBatch because there is no corresponding mirror pod for the static pod.")
-	m.syncBatch()
+	m.syncBatch(true)
 	assert.Equal(t, len(m.kubeClient.(*fake.Clientset).Actions()), 0, "Expected no updates after syncBatch, got %+v", m.kubeClient.(*fake.Clientset).Actions())
 
 	t.Logf("Create the mirror pod")
@@ -558,6 +560,7 @@ func TestStaticPod(t *testing.T) {
 	assert.True(t, isPodStatusByKubeletEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
 
 	t.Logf("Should sync pod because the corresponding mirror pod is created")
+	assert.Equal(t, m.syncBatch(true), 1)
 	verifyActions(t, m, []core.Action{getAction(), patchAction()})
 
 	t.Logf("syncBatch should not sync any pods because nothing is changed.")
@@ -571,7 +574,7 @@ func TestStaticPod(t *testing.T) {
 	m.podManager.AddPod(mirrorPod)
 
 	t.Logf("Should not update to mirror pod, because UID has changed.")
-	m.syncBatch()
+	assert.Equal(t, m.syncBatch(true), 1)
 	verifyActions(t, m, []core.Action{getAction()})
 }
 
@@ -745,13 +748,25 @@ func TestTerminatePod_DefaultUnknownStatus(t *testing.T) {
 		expectFn func(t *testing.T, status v1.PodStatus)
 	}{
 		{pod: newPod(0, 1, func(pod *v1.Pod) { pod.Status.Phase = v1.PodFailed })},
-		{pod: newPod(0, 1, func(pod *v1.Pod) { pod.Status.Phase = v1.PodRunning })},
-		{pod: newPod(0, 1, func(pod *v1.Pod) {
-			pod.Status.Phase = v1.PodRunning
-			pod.Status.ContainerStatuses = []v1.ContainerStatus{
-				{Name: "0", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "Test", ExitCode: 2}}},
-			}
-		})},
+		{
+			pod: newPod(0, 1, func(pod *v1.Pod) {
+				pod.Status.Phase = v1.PodRunning
+			}),
+			expectFn: func(t *testing.T, status v1.PodStatus) {
+				status.Phase = v1.PodFailed
+			},
+		},
+		{
+			pod: newPod(0, 1, func(pod *v1.Pod) {
+				pod.Status.Phase = v1.PodRunning
+				pod.Status.ContainerStatuses = []v1.ContainerStatus{
+					{Name: "0", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "Test", ExitCode: 2}}},
+				}
+			}),
+			expectFn: func(t *testing.T, status v1.PodStatus) {
+				status.Phase = v1.PodFailed
+			},
+		},
 		{
 			name: "last termination state set",
 			pod: newPod(0, 1, func(pod *v1.Pod) {
@@ -997,6 +1012,78 @@ func TestTerminatePod_DefaultUnknownStatus(t *testing.T) {
 	}
 }
 
+func TestTerminatePod_EnsurePodPhaseIsTerminal(t *testing.T) {
+	testCases := map[string]struct {
+		enablePodDisruptionConditions bool
+		status                        v1.PodStatus
+		wantStatus                    v1.PodStatus
+	}{
+		"Pending pod": {
+			status: v1.PodStatus{
+				Phase: v1.PodPending,
+			},
+			wantStatus: v1.PodStatus{
+				Phase: v1.PodFailed,
+			},
+		},
+		"Running pod": {
+			status: v1.PodStatus{
+				Phase: v1.PodRunning,
+			},
+			wantStatus: v1.PodStatus{
+				Phase: v1.PodFailed,
+			},
+		},
+		"Succeeded pod": {
+			status: v1.PodStatus{
+				Phase: v1.PodSucceeded,
+			},
+			wantStatus: v1.PodStatus{
+				Phase: v1.PodSucceeded,
+			},
+		},
+		"Failed pod": {
+			status: v1.PodStatus{
+				Phase: v1.PodFailed,
+			},
+			wantStatus: v1.PodStatus{
+				Phase: v1.PodFailed,
+			},
+		},
+		"Unknown pod": {
+			status: v1.PodStatus{
+				Phase: v1.PodUnknown,
+			},
+			wantStatus: v1.PodStatus{
+				Phase: v1.PodFailed,
+			},
+		},
+		"Unknown phase pod": {
+			status: v1.PodStatus{
+				Phase: v1.PodPhase("SomeUnknownPhase"),
+			},
+			wantStatus: v1.PodStatus{
+				Phase: v1.PodFailed,
+			},
+		},
+	}
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient())
+			podStartupLatencyTracker := util.NewPodStartupLatencyTracker()
+			syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, "").(*manager)
+
+			pod := getTestPod()
+			pod.Status = tc.status
+			syncer.TerminatePod(pod)
+			gotStatus := expectPodStatus(t, syncer, pod.DeepCopy())
+			if diff := cmp.Diff(tc.wantStatus, gotStatus, cmpopts.IgnoreFields(v1.PodStatus{}, "StartTime")); diff != "" {
+				t.Fatalf("unexpected status: %s", diff)
+			}
+		})
+	}
+}
+
 func TestSetContainerReadiness(t *testing.T) {
 	cID1 := kubecontainer.ContainerID{Type: "test", ID: "1"}
 	cID2 := kubecontainer.ContainerID{Type: "test", ID: "2"}
@@ -1184,7 +1271,7 @@ func TestSyncBatchCleanupVersions(t *testing.T) {
 	t.Logf("Orphaned pods should be removed.")
 	m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)] = 100
 	m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)] = 200
-	m.syncBatch()
+	m.syncBatch(true)
 	if _, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)]; ok {
 		t.Errorf("Should have cleared status for testPod")
 	}
@@ -1216,7 +1303,7 @@ func TestReconcilePodStatus(t *testing.T) {
 	syncer := newTestManager(client)
 	syncer.SetPodStatus(testPod, getRandomPodStatus())
 	t.Logf("Call syncBatch directly to test reconcile")
	syncer.syncBatch(true) // The apiStatusVersions should be set now
 	client.ClearActions()
 
 	podStatus, ok := syncer.GetPodStatus(testPod.UID)
@@ -1231,7 +1318,7 @@ func TestReconcilePodStatus(t *testing.T) {
 		t.Fatalf("Pod status is the same, a reconciliation is not needed")
 	}
 	syncer.SetPodStatus(testPod, podStatus)
-	syncer.syncBatch()
+	syncer.syncBatch(true)
 	verifyActions(t, syncer, []core.Action{})
 
 	// If the pod status is the same, only the timestamp is in Rfc3339 format (lower precision without nanosecond),
@@ -1246,7 +1333,7 @@ func TestReconcilePodStatus(t *testing.T) {
 		t.Fatalf("Pod status only differs for timestamp format, a reconciliation is not needed")
 	}
 	syncer.SetPodStatus(testPod, podStatus)
-	syncer.syncBatch()
+	syncer.syncBatch(true)
 	verifyActions(t, syncer, []core.Action{})
 
 	t.Logf("If the pod status is different, a reconciliation is needed, syncBatch should trigger an update")
@@ -1256,7 +1343,7 @@ func TestReconcilePodStatus(t *testing.T) {
 		t.Fatalf("Pod status is different, a reconciliation is needed")
 	}
 	syncer.SetPodStatus(testPod, changedPodStatus)
-	syncer.syncBatch()
+	syncer.syncBatch(true)
 	verifyActions(t, syncer, []core.Action{getAction(), patchAction()})
 }
 
@@ -1268,36 +1355,32 @@ func expectPodStatus(t *testing.T, m *manager, pod *v1.Pod) v1.PodStatus {
 	return status
 }
 
-func TestDeletePods(t *testing.T) {
+func TestDeletePodBeforeFinished(t *testing.T) {
 	pod := getTestPod()
 	t.Logf("Set the deletion timestamp.")
 	pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
 	client := fake.NewSimpleClientset(pod)
 	m := newTestManager(client)
-	m.podDeletionSafety.(*statustest.FakePodDeletionSafetyProvider).Reclaimed = true
 	m.podManager.AddPod(pod)
 	status := getRandomPodStatus()
-	now := metav1.Now()
-	status.StartTime = &now
+	status.Phase = v1.PodFailed
 	m.SetPodStatus(pod, status)
-	t.Logf("Expect to see a delete action.")
-	verifyActions(t, m, []core.Action{getAction(), patchAction(), deleteAction()})
+	t.Logf("Expect not to see a delete action as the pod isn't finished yet (TerminatePod isn't called)")
+	verifyActions(t, m, []core.Action{getAction(), patchAction()})
 }
 
-func TestDeletePodWhileReclaiming(t *testing.T) {
+func TestDeletePodFinished(t *testing.T) {
 	pod := getTestPod()
 	t.Logf("Set the deletion timestamp.")
 	pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
 	client := fake.NewSimpleClientset(pod)
 	m := newTestManager(client)
-	m.podDeletionSafety.(*statustest.FakePodDeletionSafetyProvider).Reclaimed = false
 	m.podManager.AddPod(pod)
 	status := getRandomPodStatus()
-	now := metav1.Now()
-	status.StartTime = &now
-	m.SetPodStatus(pod, status)
-	t.Logf("Expect to see a delete action.")
-	verifyActions(t, m, []core.Action{getAction(), patchAction()})
+	status.Phase = v1.PodFailed
+	m.TerminatePod(pod)
+	t.Logf("Expect to see a delete action as the pod is finished (TerminatePod called)")
+	verifyActions(t, m, []core.Action{getAction(), patchAction(), deleteAction()})
 }
 
 func TestDoNotDeleteMirrorPods(t *testing.T) {
diff --git a/pkg/kubelet/status/testing/fake_pod_deletion_safety.go b/pkg/kubelet/status/testing/fake_pod_deletion_safety.go
index 98c3b226c0b..18e8b8fb7e4 100644
--- a/pkg/kubelet/status/testing/fake_pod_deletion_safety.go
+++ b/pkg/kubelet/status/testing/fake_pod_deletion_safety.go
@@ -20,14 +20,9 @@ import v1 "k8s.io/api/core/v1"
 
 // FakePodDeletionSafetyProvider is a fake PodDeletionSafetyProvider for test.
 type FakePodDeletionSafetyProvider struct {
-	Reclaimed  bool
 	HasRunning bool
 }
 
-func (f *FakePodDeletionSafetyProvider) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool {
-	return f.Reclaimed
-}
-
 func (f *FakePodDeletionSafetyProvider) PodCouldHaveRunningContainers(pod *v1.Pod) bool {
 	return f.HasRunning
 }
diff --git a/pkg/kubelet/status/testing/mock_pod_status_provider.go b/pkg/kubelet/status/testing/mock_pod_status_provider.go
index 2de1bee1691..d88464f9746 100644
--- a/pkg/kubelet/status/testing/mock_pod_status_provider.go
+++ b/pkg/kubelet/status/testing/mock_pod_status_provider.go
@@ -104,20 +104,6 @@ func (mr *MockPodDeletionSafetyProviderMockRecorder) PodCouldHaveRunningContaine
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PodCouldHaveRunningContainers", reflect.TypeOf((*MockPodDeletionSafetyProvider)(nil).PodCouldHaveRunningContainers), pod)
 }
 
-// PodResourcesAreReclaimed mocks base method.
-func (m *MockPodDeletionSafetyProvider) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool {
-	m.ctrl.T.Helper()
-	ret := m.ctrl.Call(m, "PodResourcesAreReclaimed", pod, status)
-	ret0, _ := ret[0].(bool)
-	return ret0
-}
-
-// PodResourcesAreReclaimed indicates an expected call of PodResourcesAreReclaimed.
-func (mr *MockPodDeletionSafetyProviderMockRecorder) PodResourcesAreReclaimed(pod, status interface{}) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PodResourcesAreReclaimed", reflect.TypeOf((*MockPodDeletionSafetyProvider)(nil).PodResourcesAreReclaimed), pod, status)
-}
-
 // MockPodStartupLatencyStateHelper is a mock of PodStartupLatencyStateHelper interface.
 type MockPodStartupLatencyStateHelper struct {
 	ctrl *gomock.Controller
diff --git a/test/e2e/framework/pod/wait.go b/test/e2e/framework/pod/wait.go
index a50340375ac..e462cc0bc3b 100644
--- a/test/e2e/framework/pod/wait.go
+++ b/test/e2e/framework/pod/wait.go
@@ -401,7 +401,7 @@ func WaitForPodTerminatingInNamespaceTimeout(ctx context.Context, c clientset.In
 
 // WaitForPodSuccessInNamespaceTimeout returns nil if the pod reached state success, or an error if it reached failure or ran too long.
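+// A pod that carries a deletion timestamp is exempt from the RestartPolicyAlways
+// short-circuit below, because a deleted pod can still legitimately reach a
+// terminal phase.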
 func WaitForPodSuccessInNamespaceTimeout(ctx context.Context, c clientset.Interface, podName, namespace string, timeout time.Duration) error {
 	return WaitForPodCondition(ctx, c, namespace, podName, fmt.Sprintf("%s or %s", v1.PodSucceeded, v1.PodFailed), timeout, func(pod *v1.Pod) (bool, error) {
-		if pod.Spec.RestartPolicy == v1.RestartPolicyAlways {
+		if pod.DeletionTimestamp == nil && pod.Spec.RestartPolicy == v1.RestartPolicyAlways {
 			return true, fmt.Errorf("pod %q will never terminate with a succeeded state since its restart policy is Always", podName)
 		}
 		switch pod.Status.Phase {
@@ -728,6 +728,13 @@ func WaitForPodContainerToFail(ctx context.Context, c clientset.Interface, names
 	})
 }
 
+// WaitForPodScheduled waits for the pod to be scheduled, i.e. the .spec.nodeName is set
+func WaitForPodScheduled(ctx context.Context, c clientset.Interface, namespace, podName string) error {
+	return WaitForPodCondition(ctx, c, namespace, podName, "pod is scheduled", podScheduledBeforeTimeout, func(pod *v1.Pod) (bool, error) {
+		return pod.Spec.NodeName != "", nil
+	})
+}
+
 // WaitForPodContainerStarted waits for the given Pod container to start, after a successful run of the startupProbe.
 func WaitForPodContainerStarted(ctx context.Context, c clientset.Interface, namespace, podName string, containerIndex int, timeout time.Duration) error {
 	conditionDesc := fmt.Sprintf("container %d started", containerIndex)
diff --git a/test/e2e_node/deleted_pods_test.go b/test/e2e_node/deleted_pods_test.go
new file mode 100644
index 00000000000..4541a5b3264
--- /dev/null
+++ b/test/e2e_node/deleted_pods_test.go
@@ -0,0 +1,207 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2enode
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"time"
+
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/uuid"
+	"k8s.io/kubernetes/test/e2e/framework"
+	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
+	imageutils "k8s.io/kubernetes/test/utils/image"
+	admissionapi "k8s.io/pod-security-admission/api"
+
+	"github.com/onsi/ginkgo/v2"
+	"github.com/onsi/gomega"
+)
+
+const (
+	testFinalizer = "example.com/test-finalizer"
+)
+
+var _ = SIGDescribe("Deleted pods handling [NodeConformance]", func() {
+	f := framework.NewDefaultFramework("deleted-pods-test")
+	f.NamespacePodSecurityEnforceLevel = admissionapi.LevelBaseline
+
+	ginkgo.It("Should transition to Failed phase a pod which is deleted while pending", func(ctx context.Context) {
+		podName := "deleted-pending-" + string(uuid.NewUUID())
+		podSpec := &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:       podName,
+				Finalizers: []string{testFinalizer},
+			},
+			Spec: v1.PodSpec{
+				RestartPolicy: v1.RestartPolicyAlways,
+				Containers: []v1.Container{
+					{
+						Name:            podName,
+						Image:           "non-existing-repo/non-existing-image:v1.0",
+						ImagePullPolicy: "Always",
+						Command:         []string{"bash"},
+						Args:            []string{"-c", `echo "Hello world"`},
+					},
+				},
+			},
+		}
+		ginkgo.By("creating the pod with invalid image reference and finalizer")
+		pod := e2epod.NewPodClient(f).Create(ctx, podSpec)
+
+		ginkgo.By("set up cleanup of the finalizer")
+		ginkgo.DeferCleanup(e2epod.NewPodClient(f).RemoveFinalizer, pod.Name, testFinalizer)
+
+		ginkgo.By("Waiting for the pod to be scheduled so that kubelet owns it")
+		err := e2epod.WaitForPodScheduled(ctx, f.ClientSet, pod.Namespace, pod.Name)
+		framework.ExpectNoError(err, "Failed to wait for the pod to be scheduled: %q", pod.Name)
+
+		ginkgo.By(fmt.Sprintf("Deleting the pod (%v/%v) to set a deletion timestamp", pod.Namespace, pod.Name))
+		err = e2epod.NewPodClient(f).Delete(ctx, pod.Name, metav1.DeleteOptions{})
+		framework.ExpectNoError(err, "Failed to delete the pod: %q", pod.Name)
+
+		ginkgo.By(fmt.Sprintf("Waiting for the pod (%v/%v) to be transitioned into the Failed phase", pod.Namespace, pod.Name))
+		err = e2epod.WaitForPodTerminatedInNamespace(ctx, f.ClientSet, pod.Name, "", f.Namespace.Name)
+		framework.ExpectNoError(err, "Failed to wait for the pod to be terminated: %q", pod.Name)
+
+		ginkgo.By(fmt.Sprintf("Fetch the end state of the pod (%v/%v)", pod.Namespace, pod.Name))
+		pod, err = e2epod.NewPodClient(f).Get(ctx, pod.Name, metav1.GetOptions{})
+		framework.ExpectNoError(err, "Failed to fetch the end state of the pod: %q", pod.Name)
+	})
+
+	ginkgo.DescribeTable("Should transition to Failed phase a deleted pod if non-zero exit codes",
+		func(ctx context.Context, policy v1.RestartPolicy) {
+			podName := "deleted-running-" + strings.ToLower(string(policy)) + "-" + string(uuid.NewUUID())
+			podSpec := e2epod.MustMixinRestrictedPodSecurity(&v1.Pod{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:       podName,
+					Finalizers: []string{testFinalizer},
+				},
+				Spec: v1.PodSpec{
+					RestartPolicy: policy,
+					Containers: []v1.Container{
+						{
+							Name:    podName,
+							Image:   imageutils.GetE2EImage(imageutils.BusyBox),
+							Command: []string{"sleep", "1800"},
+						},
+					},
+				},
+			})
+			ginkgo.By(fmt.Sprintf("Creating a pod (%v/%v) with restart policy: %v", f.Namespace.Name, podSpec.Name, podSpec.Spec.RestartPolicy))
+			pod := e2epod.NewPodClient(f).Create(ctx, podSpec)
+
+			ginkgo.By("set up cleanup of the finalizer")
+			ginkgo.DeferCleanup(e2epod.NewPodClient(f).RemoveFinalizer, pod.Name, testFinalizer)
+
+			ginkgo.By("Waiting for the pod to be running")
+			err := e2epod.WaitForPodNameRunningInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name)
+			framework.ExpectNoError(err, "Failed to wait for the pod to be running: %q", pod.Name)
+
+			ginkgo.By(fmt.Sprintf("Deleting the pod (%v/%v) to set a deletion timestamp", pod.Namespace, pod.Name))
+			err = e2epod.NewPodClient(f).Delete(ctx, pod.Name, *metav1.NewDeleteOptions(1))
+			framework.ExpectNoError(err, "Failed to delete the pod: %q", pod.Name)
+
+			ginkgo.By(fmt.Sprintf("Waiting for the pod (%v/%v) to be transitioned to the failed phase", pod.Namespace, pod.Name))
+			err = e2epod.WaitForPodTerminatedInNamespace(ctx, f.ClientSet, pod.Name, "", f.Namespace.Name)
+			framework.ExpectNoError(err, "Failed to wait for the pod to be terminated: %q", pod.Name)
+
+			ginkgo.By(fmt.Sprintf("Fetching the end state of the pod (%v/%v)", pod.Namespace, pod.Name))
+			pod, err = e2epod.NewPodClient(f).Get(ctx, pod.Name, metav1.GetOptions{})
+			framework.ExpectNoError(err, "Failed to fetch the end state of the pod: %q", pod.Name)
+
+			ginkgo.By(fmt.Sprintf("Verify the pod (%v/%v) container is in the terminated state", pod.Namespace, pod.Name))
+			gomega.Expect(pod.Status.ContainerStatuses).Should(gomega.HaveLen(1))
+			containerStatus := pod.Status.ContainerStatuses[0]
+			gomega.Expect(containerStatus.State.Terminated).ToNot(gomega.BeNil(), "The pod container is not in the Terminated state")
+
+			ginkgo.By(fmt.Sprintf("Verify the pod (%v/%v) container exit code is 137", pod.Namespace, pod.Name))
+			gomega.Expect(containerStatus.State.Terminated.ExitCode).Should(gomega.Equal(int32(137)))
+		},
+		ginkgo.Entry("Restart policy Always", v1.RestartPolicyAlways),
+		ginkgo.Entry("Restart policy OnFailure", v1.RestartPolicyOnFailure),
+		ginkgo.Entry("Restart policy Never", v1.RestartPolicyNever),
+	)
+
+	ginkgo.DescribeTable("Should transition to Succeeded phase a deleted pod when containers complete with 0 exit code",
+		func(ctx context.Context, policy v1.RestartPolicy) {
+			podName := "deleted-running-" + strings.ToLower(string(policy)) + "-" + string(uuid.NewUUID())
+			podSpec := e2epod.MustMixinRestrictedPodSecurity(&v1.Pod{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:       podName,
+					Finalizers: []string{testFinalizer},
+				},
+				Spec: v1.PodSpec{
+					RestartPolicy: policy,
+					Containers: []v1.Container{
+						{
+							Name:    podName,
+							Image:   imageutils.GetE2EImage(imageutils.BusyBox),
+							Command: []string{"sh", "-c"},
+							Args: []string{`
+							_term() {
+								echo "Caught SIGTERM signal!"
+								exit 0
+							}
+							trap _term SIGTERM
+							while true; do sleep 5; done
+							`,
+							},
+						},
+					},
+				},
+			})
+			ginkgo.By(fmt.Sprintf("Creating a pod (%v/%v) with restart policy: %v", f.Namespace.Name, podSpec.Name, podSpec.Spec.RestartPolicy))
+			pod := e2epod.NewPodClient(f).Create(ctx, podSpec)
+
+			ginkgo.By("set up cleanup of the finalizer")
+			ginkgo.DeferCleanup(e2epod.NewPodClient(f).RemoveFinalizer, pod.Name, testFinalizer)
+
+			ginkgo.By("Waiting for the pod to be running")
+			err := e2epod.WaitForPodNameRunningInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name)
+			framework.ExpectNoError(err, "Failed to wait for the pod to be running: %q", pod.Name)
+
+			ginkgo.By(fmt.Sprintf("Deleting the pod (%v/%v) to set a deletion timestamp", pod.Namespace, pod.Name))
+			// wait a little bit to make sure we are inside the while loop and that the trap is registered
+			time.Sleep(time.Second)
+			err = e2epod.NewPodClient(f).Delete(ctx, pod.Name, metav1.DeleteOptions{})
+			framework.ExpectNoError(err, "Failed to delete the pod: %q", pod.Name)
+
+			ginkgo.By(fmt.Sprintf("Waiting for the pod (%v/%v) to be transitioned to the succeeded phase", pod.Namespace, pod.Name))
+			err = e2epod.WaitForPodSuccessInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name)
+			framework.ExpectNoError(err, "Failed to wait for the pod to succeed: %q", pod.Name)
+
+			ginkgo.By(fmt.Sprintf("Fetching the end state of the pod (%v/%v)", pod.Namespace, pod.Name))
+			pod, err = e2epod.NewPodClient(f).Get(ctx, pod.Name, metav1.GetOptions{})
+			framework.ExpectNoError(err, "Failed to fetch the end state of the pod: %q", pod.Name)
+
+			ginkgo.By(fmt.Sprintf("Verify the pod (%v/%v) container is in the terminated state", pod.Namespace, pod.Name))
+			gomega.Expect(pod.Status.ContainerStatuses).Should(gomega.HaveLen(1))
+			containerStatus := pod.Status.ContainerStatuses[0]
+			gomega.Expect(containerStatus.State.Terminated).ShouldNot(gomega.BeNil(), "The pod container is not in the Terminated state")
+
+			ginkgo.By(fmt.Sprintf("Verifying the exit code for the terminated container is 0 for pod (%v/%v)", pod.Namespace, pod.Name))
+			gomega.Expect(containerStatus.State.Terminated.ExitCode).Should(gomega.Equal(int32(0)))
+		},
+		ginkgo.Entry("Restart policy Always", v1.RestartPolicyAlways),
+		ginkgo.Entry("Restart policy OnFailure", v1.RestartPolicyOnFailure),
+		ginkgo.Entry("Restart policy Never", v1.RestartPolicyNever),
+	)
+
+})
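
Note on the test pattern used throughout deleted_pods_test.go: the example.com/test-finalizer keeps the deleted Pod object visible in the API after the kubelet finishes termination, so each test can still Get the pod and assert its terminal phase and container exit codes; the deferred RemoveFinalizer call then lets the API server complete the pending deletion. The body of the framework's RemoveFinalizer helper is not part of this diff, so the following is only a minimal sketch of the idea in plain client-go; the helper name and error handling here are illustrative assumptions, not the framework API.

package e2enode

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// removeFinalizer is a hypothetical stand-in for the framework's RemoveFinalizer
// helper: it re-fetches the pod and drops the given finalizer from
// .metadata.finalizers so the API server can finish the pending deletion.
// Production code would typically wrap the Update in retry.RetryOnConflict to
// handle concurrent writers.
func removeFinalizer(ctx context.Context, c kubernetes.Interface, namespace, name, finalizer string) error {
	pod, err := c.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("fetching pod %s/%s: %w", namespace, name, err)
	}
	// Rebuild the finalizer list in place, keeping everything except the test finalizer.
	kept := pod.Finalizers[:0]
	for _, f := range pod.Finalizers {
		if f != finalizer {
			kept = append(kept, f)
		}
	}
	pod.Finalizers = kept
	_, err = c.CoreV1().Pods(namespace).Update(ctx, pod, metav1.UpdateOptions{})
	return err
}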