Merge pull request #115331 from mimowo/kubelet-fail-pending-deleted-pods

Give terminal phase correctly to all pods that will not be restarted
Kubernetes Prow Robot, 2023-03-16 16:47:17 -07:00, committed by GitHub
commit e1c2af399a
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 604 additions and 233 deletions


@@ -1479,7 +1479,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
os.Exit(1)
}
// eviction manager must start after cadvisor because it needs to know if the container runtime has a dedicated imagefs
- kl.evictionManager.Start(kl.StatsProvider, kl.GetActivePods, kl.podResourcesAreReclaimed, evictionMonitoringPeriod)
+ kl.evictionManager.Start(kl.StatsProvider, kl.GetActivePods, kl.PodIsFinished, evictionMonitoringPeriod)
// container log manager must start after container runtime is up to retrieve information from container runtime
// and inform container to reopen log file after log rotation.
@@ -1689,7 +1689,7 @@ func (kl *Kubelet) SyncPod(_ context.Context, updateType kubetypes.SyncPodType,
}
// Generate final API pod status with pod and status manager status
- apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
+ apiPodStatus := kl.generateAPIPodStatus(pod, podStatus, false)
// The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
// TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
// set pod IP to hostIP directly in runtime.GetPodStatus
@@ -1941,7 +1941,7 @@ func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus
klog.V(4).InfoS("SyncTerminatingPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
defer klog.V(4).InfoS("SyncTerminatingPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
- apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
+ apiPodStatus := kl.generateAPIPodStatus(pod, podStatus, false)
if podStatusFn != nil {
podStatusFn(&apiPodStatus)
}
@@ -2014,6 +2014,13 @@ func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus
}
}
+ // Compute and update the status in cache once the pods are no longer running.
+ // The computation is done here to ensure the pod status used for it contains
+ // information about the container end states (including exit codes) - when
+ // SyncTerminatedPod is called the containers may already be removed.
+ apiPodStatus = kl.generateAPIPodStatus(pod, podStatus, true)
+ kl.statusManager.SetPodStatus(pod, apiPodStatus)
// we have successfully stopped all containers, the pod is terminating, our status is "done"
klog.V(4).InfoS("Pod termination stopped all running containers", "pod", klog.KObj(pod), "podUID", pod.UID)
@@ -2050,8 +2057,8 @@ func (kl *Kubelet) SyncTerminatingRuntimePod(_ context.Context, runningPod *kube
}
// SyncTerminatedPod cleans up a pod that has terminated (has no running containers).
- // The invocations in this call are expected to tear down what PodResourcesAreReclaimed checks (which
- // gates pod deletion). When this method exits the pod is expected to be ready for cleanup. This method
+ // The invocations in this call are expected to tear down all pod resources.
+ // When this method exits the pod is expected to be ready for cleanup. This method
// reduces the latency of pod cleanup but is not guaranteed to get called in all scenarios.
//
// Because the kubelet has no local store of information, all actions in this method that modify
@@ -2071,7 +2078,7 @@ func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus
// generate the final status of the pod
// TODO: should we simply fold this into TerminatePod? that would give a single pod update
- apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
+ apiPodStatus := kl.generateAPIPodStatus(pod, podStatus, true)
kl.statusManager.SetPodStatus(pod, apiPodStatus)
@@ -2082,6 +2089,21 @@ func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus
}
klog.V(4).InfoS("Pod termination unmounted volumes", "pod", klog.KObj(pod), "podUID", pod.UID)
+ if !kl.keepTerminatedPodVolumes {
+ // This waiting loop relies on the background cleanup which starts after pod workers respond
+ // true for ShouldPodRuntimeBeRemoved, which happens after `SyncTerminatingPod` is completed.
+ if err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) {
+ volumesExist := kl.podVolumesExist(pod.UID)
+ if volumesExist {
+ klog.V(3).InfoS("Pod is terminated, but some volumes have not been cleaned up", "pod", klog.KObj(pod), "podUID", pod.UID)
+ }
+ return !volumesExist, nil
+ }); err != nil {
+ return err
+ }
+ klog.V(3).InfoS("Pod termination cleaned up volume paths", "pod", klog.KObj(pod), "podUID", pod.UID)
+ }
// After volume unmount is complete, let the secret and configmap managers know we're done with this pod
if kl.secretManager != nil {
kl.secretManager.UnregisterPod(pod)
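
The SyncTerminatingPod hunk above enforces an ordering: the terminal status has to be generated while the stopped containers (and their exit codes) are still visible, because SyncTerminatedPod may run after the containers have been removed. A simplified sketch of that ordering — the helper names and signatures below are hypothetical, not the kubelet's actual API:

package sketch

import v1 "k8s.io/api/core/v1"

type statusFuncs struct {
	killPod   func(*v1.Pod) error
	generate  func(pod *v1.Pod, podIsTerminal bool) v1.PodStatus
	setStatus func(*v1.Pod, v1.PodStatus)
}

// syncTerminating is a sketch, not the real SyncTerminatingPod: stop the
// containers first, then immediately record the terminal status while the
// container end states (exit codes) can still be read.
func (s statusFuncs) syncTerminating(pod *v1.Pod) error {
	if err := s.killPod(pod); err != nil {
		return err
	}
	// Last safe point to compute the Failed/Succeeded status; by the time
	// SyncTerminatedPod runs the container records may already be gone.
	s.setStatus(pod, s.generate(pod, true))
	return nil
}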


@@ -901,26 +901,6 @@ func (kl *Kubelet) getPullSecretsForPod(pod *v1.Pod) []v1.Secret {
return pullSecrets
}
- func countRunningContainerStatus(status v1.PodStatus) int {
- var runningContainers int
- for _, c := range status.InitContainerStatuses {
- if c.State.Running != nil {
- runningContainers++
- }
- }
- for _, c := range status.ContainerStatuses {
- if c.State.Running != nil {
- runningContainers++
- }
- }
- for _, c := range status.EphemeralContainerStatuses {
- if c.State.Running != nil {
- runningContainers++
- }
- }
- return runningContainers
- }
// PodCouldHaveRunningContainers returns true if the pod with the given UID could still have running
// containers. This returns false if the pod has not yet been started or the pod is unknown.
func (kl *Kubelet) PodCouldHaveRunningContainers(pod *v1.Pod) bool {
@@ -941,48 +921,11 @@ func (kl *Kubelet) PodCouldHaveRunningContainers(pod *v1.Pod) bool {
return false
}
- // PodResourcesAreReclaimed returns true if all required node-level resources that a pod was consuming have
- // been reclaimed by the kubelet. Reclaiming resources is a prerequisite to deleting a pod from the API server.
- func (kl *Kubelet) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool {
- if kl.podWorkers.CouldHaveRunningContainers(pod.UID) {
- // We shouldn't delete pods that still have running containers
- klog.V(3).InfoS("Pod is terminated, but some containers are still running", "pod", klog.KObj(pod))
- return false
- }
- if count := countRunningContainerStatus(status); count > 0 {
- // We shouldn't delete pods until the reported pod status contains no more running containers (the previous
- // check ensures no more status can be generated, this check verifies we have seen enough of the status)
- klog.V(3).InfoS("Pod is terminated, but some container status has not yet been reported", "pod", klog.KObj(pod), "running", count)
- return false
- }
- if kl.podVolumesExist(pod.UID) && !kl.keepTerminatedPodVolumes {
- // We shouldn't delete pods whose volumes have not been cleaned up if we are not keeping terminated pod volumes
- klog.V(3).InfoS("Pod is terminated, but some volumes have not been cleaned up", "pod", klog.KObj(pod))
- return false
- }
- if kl.kubeletConfiguration.CgroupsPerQOS {
- pcm := kl.containerManager.NewPodContainerManager()
- if pcm.Exists(pod) {
- klog.V(3).InfoS("Pod is terminated, but pod cgroup sandbox has not been cleaned up", "pod", klog.KObj(pod))
- return false
- }
- }
- // Note: we leave pod containers to be reclaimed in the background since dockershim requires the
- // container for retrieving logs and we want to make sure logs are available until the pod is
- // physically deleted.
- klog.V(3).InfoS("Pod is terminated and all resources are reclaimed", "pod", klog.KObj(pod))
- return true
- }
- // podResourcesAreReclaimed simply calls PodResourcesAreReclaimed with the most up-to-date status.
- func (kl *Kubelet) podResourcesAreReclaimed(pod *v1.Pod) bool {
- status, ok := kl.statusManager.GetPodStatus(pod.UID)
- if !ok {
- status = pod.Status
- }
- return kl.PodResourcesAreReclaimed(pod, status)
+ // PodIsFinished returns true if SyncTerminatedPod is finished, ie.
+ // all required node-level resources that a pod was consuming have
+ // been reclaimed by the kubelet.
+ func (kl *Kubelet) PodIsFinished(pod *v1.Pod) bool {
+ return kl.podWorkers.ShouldPodBeFinished(pod.UID)
}
// filterOutInactivePods returns pods that are not in a terminal phase
@@ -1440,7 +1383,8 @@ func (kl *Kubelet) GetKubeletContainerLogs(ctx context.Context, podFullName, con
}
// getPhase returns the phase of a pod given its container info.
- func getPhase(spec *v1.PodSpec, info []v1.ContainerStatus) v1.PodPhase {
+ func getPhase(pod *v1.Pod, info []v1.ContainerStatus, podIsTerminal bool) v1.PodPhase {
+ spec := pod.Spec
pendingInitialization := 0
failedInitialization := 0
for _, container := range spec.InitContainers {
@@ -1517,6 +1461,19 @@ func getPhase(spec *v1.PodSpec, info []v1.ContainerStatus) v1.PodPhase {
// one container is running
return v1.PodRunning
case running == 0 && stopped > 0 && unknown == 0:
+ // The pod is terminal so its containers won't be restarted regardless
+ // of the restart policy.
+ if podIsTerminal {
+ // TODO(#116484): Also assign terminal phase to static pods.
+ if !kubetypes.IsStaticPod(pod) {
+ // All containers are terminated in success
+ if stopped == succeeded {
+ return v1.PodSucceeded
+ }
+ // There is at least one failure
+ return v1.PodFailed
+ }
+ }
// All containers are terminated
if spec.RestartPolicy == v1.RestartPolicyAlways {
// All containers are in the process of restarting
@@ -1567,8 +1524,8 @@ func (kl *Kubelet) determinePodResizeStatus(pod *v1.Pod, podStatus *v1.PodStatus
// generateAPIPodStatus creates the final API pod status for a pod, given the
// internal pod status. This method should only be called from within sync*Pod methods.
- func (kl *Kubelet) generateAPIPodStatus(pod *v1.Pod, podStatus *kubecontainer.PodStatus) v1.PodStatus {
- klog.V(3).InfoS("Generating pod status", "pod", klog.KObj(pod))
+ func (kl *Kubelet) generateAPIPodStatus(pod *v1.Pod, podStatus *kubecontainer.PodStatus, podIsTerminal bool) v1.PodStatus {
+ klog.V(3).InfoS("Generating pod status", "podIsTerminal", podIsTerminal, "pod", klog.KObj(pod))
// use the previous pod status, or the api status, as the basis for this pod
oldPodStatus, found := kl.statusManager.GetPodStatus(pod.UID)
if !found {
@@ -1580,7 +1537,7 @@ func (kl *Kubelet) generateAPIPodStatus(pod *v1.Pod, podStatus *kubecontainer.Po
}
// calculate the next phase and preserve reason
allStatus := append(append([]v1.ContainerStatus{}, s.ContainerStatuses...), s.InitContainerStatuses...)
- s.Phase = getPhase(&pod.Spec, allStatus)
+ s.Phase = getPhase(pod, allStatus, podIsTerminal)
klog.V(4).InfoS("Got phase for pod", "pod", klog.KObj(pod), "oldPhase", oldPodStatus.Phase, "phase", s.Phase)
// Perform a three-way merge between the statuses from the status manager,
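
The new podIsTerminal branch in getPhase is the core behavioral change of this PR: once a pod is known to be terminal, its phase is derived purely from the container end states and the restart policy is ignored. A standalone sketch that approximates that rule (this is illustrative only, not the kubelet's getPhase):

package sketch

import v1 "k8s.io/api/core/v1"

// terminalPhase approximates the rule added above: for a terminal pod, every
// container having exited with code 0 yields Succeeded; any other outcome
// (non-zero exit, or no terminated state recorded) yields Failed.
func terminalPhase(statuses []v1.ContainerStatus) v1.PodPhase {
	for _, s := range statuses {
		t := s.State.Terminated
		if t == nil || t.ExitCode != 0 {
			return v1.PodFailed
		}
	}
	return v1.PodSucceeded
}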


@@ -2152,11 +2152,17 @@ func TestPodPhaseWithRestartAlways(t *testing.T) {
}
tests := []struct {
- pod *v1.Pod
- status v1.PodPhase
- test string
+ pod *v1.Pod
+ podIsTerminal bool
+ status v1.PodPhase
+ test string
}{
- {&v1.Pod{Spec: desiredState, Status: v1.PodStatus{}}, v1.PodPending, "waiting"},
+ {
+ &v1.Pod{Spec: desiredState, Status: v1.PodStatus{}},
+ false,
+ v1.PodPending,
+ "waiting",
+ },
{
&v1.Pod{
Spec: desiredState,
@@ -2167,6 +2173,7 @@ func TestPodPhaseWithRestartAlways(t *testing.T) {
},
},
},
+ false,
v1.PodRunning,
"all running",
},
@@ -2180,9 +2187,38 @@ func TestPodPhaseWithRestartAlways(t *testing.T) {
},
},
},
+ false,
v1.PodRunning,
"all stopped with restart always",
},
+ {
+ &v1.Pod{
+ Spec: desiredState,
+ Status: v1.PodStatus{
+ ContainerStatuses: []v1.ContainerStatus{
+ succeededState("containerA"),
+ succeededState("containerB"),
+ },
+ },
+ },
+ true,
+ v1.PodSucceeded,
+ "all succeeded with restart always, but the pod is terminal",
+ },
+ {
+ &v1.Pod{
+ Spec: desiredState,
+ Status: v1.PodStatus{
+ ContainerStatuses: []v1.ContainerStatus{
+ succeededState("containerA"),
+ failedState("containerB"),
+ },
+ },
+ },
+ true,
+ v1.PodFailed,
+ "all stopped with restart always, but the pod is terminal",
+ },
{
&v1.Pod{
Spec: desiredState,
@@ -2193,6 +2229,7 @@ func TestPodPhaseWithRestartAlways(t *testing.T) {
},
},
},
+ false,
v1.PodRunning,
"mixed state #1 with restart always",
},
@@ -2205,6 +2242,7 @@ func TestPodPhaseWithRestartAlways(t *testing.T) {
},
},
},
+ false,
v1.PodPending,
"mixed state #2 with restart always",
},
@@ -2218,6 +2256,7 @@ func TestPodPhaseWithRestartAlways(t *testing.T) {
},
},
},
+ false,
v1.PodPending,
"mixed state #3 with restart always",
},
@@ -2231,12 +2270,13 @@ func TestPodPhaseWithRestartAlways(t *testing.T) {
},
},
},
+ false,
v1.PodRunning,
"backoff crashloop container with restart always",
},
}
for _, test := range tests {
- status := getPhase(&test.pod.Spec, test.pod.Status.ContainerStatuses)
+ status := getPhase(test.pod, test.pod.Status.ContainerStatuses, test.podIsTerminal)
assert.Equal(t, test.status, status, "[test %s]", test.test)
}
}
@@ -2339,7 +2379,7 @@ func TestPodPhaseWithRestartAlwaysInitContainers(t *testing.T) {
}
for _, test := range tests {
statusInfo := append(test.pod.Status.InitContainerStatuses[:], test.pod.Status.ContainerStatuses[:]...)
- status := getPhase(&test.pod.Spec, statusInfo)
+ status := getPhase(test.pod, statusInfo, false)
assert.Equal(t, test.status, status, "[test %s]", test.test)
}
}
@@ -2439,7 +2479,7 @@ func TestPodPhaseWithRestartNever(t *testing.T) {
},
}
for _, test := range tests {
- status := getPhase(&test.pod.Spec, test.pod.Status.ContainerStatuses)
+ status := getPhase(test.pod, test.pod.Status.ContainerStatuses, false)
assert.Equal(t, test.status, status, "[test %s]", test.test)
}
}
@@ -2542,7 +2582,7 @@ func TestPodPhaseWithRestartNeverInitContainers(t *testing.T) {
}
for _, test := range tests {
statusInfo := append(test.pod.Status.InitContainerStatuses[:], test.pod.Status.ContainerStatuses[:]...)
- status := getPhase(&test.pod.Spec, statusInfo)
+ status := getPhase(test.pod, statusInfo, false)
assert.Equal(t, test.status, status, "[test %s]", test.test)
}
}
@@ -2655,7 +2695,7 @@ func TestPodPhaseWithRestartOnFailure(t *testing.T) {
},
}
for _, test := range tests {
- status := getPhase(&test.pod.Spec, test.pod.Status.ContainerStatuses)
+ status := getPhase(test.pod, test.pod.Status.ContainerStatuses, false)
assert.Equal(t, test.status, status, "[test %s]", test.test)
}
}
@@ -2785,13 +2825,14 @@ func Test_generateAPIPodStatus(t *testing.T) {
currentStatus *kubecontainer.PodStatus
unreadyContainer []string
previousStatus v1.PodStatus
+ isPodTerminal bool
enablePodDisruptionConditions bool
expected v1.PodStatus
expectedPodDisruptionCondition v1.PodCondition
expectedPodHasNetworkCondition v1.PodCondition
}{
{
- name: "pod disruption condition is copied over; PodDisruptionConditions enabled",
+ name: "pod disruption condition is copied over and the phase is set to failed when deleted; PodDisruptionConditions enabled",
pod: &v1.Pod{
Spec: desiredState,
Status: v1.PodStatus{
@@ -2819,15 +2860,16 @@ func Test_generateAPIPodStatus(t *testing.T) {
LastTransitionTime: normalized_now,
}},
},
+ isPodTerminal: true,
enablePodDisruptionConditions: true,
expected: v1.PodStatus{
- Phase: v1.PodRunning,
+ Phase: v1.PodFailed,
HostIP: "127.0.0.1",
QOSClass: v1.PodQOSBestEffort,
Conditions: []v1.PodCondition{
{Type: v1.PodInitialized, Status: v1.ConditionTrue},
- {Type: v1.PodReady, Status: v1.ConditionTrue},
- {Type: v1.ContainersReady, Status: v1.ConditionTrue},
+ {Type: v1.PodReady, Status: v1.ConditionFalse, Reason: "PodFailed"},
+ {Type: v1.ContainersReady, Status: v1.ConditionFalse, Reason: "PodFailed"},
{Type: v1.PodScheduled, Status: v1.ConditionTrue},
},
ContainerStatuses: []v1.ContainerStatus{
@@ -3223,7 +3265,7 @@ func Test_generateAPIPodStatus(t *testing.T) {
kl.readinessManager.Set(kubecontainer.BuildContainerID("", findContainerStatusByName(test.expected, name).ContainerID), results.Failure, test.pod)
}
expected := test.expected.DeepCopy()
- actual := kl.generateAPIPodStatus(test.pod, test.currentStatus)
+ actual := kl.generateAPIPodStatus(test.pod, test.currentStatus, test.isPodTerminal)
if enablePodHasNetworkCondition {
expected.Conditions = append([]v1.PodCondition{test.expectedPodHasNetworkCondition}, expected.Conditions...)
}
@@ -3761,7 +3803,7 @@ func TestGenerateAPIPodStatusHostNetworkPodIPs(t *testing.T) {
IPs: tc.criPodIPs,
}
- status := kl.generateAPIPodStatus(pod, criStatus)
+ status := kl.generateAPIPodStatus(pod, criStatus, false)
if !reflect.DeepEqual(status.PodIPs, tc.podIPs) {
t.Fatalf("Expected PodIPs %#v, got %#v", tc.podIPs, status.PodIPs)
}
@@ -3874,7 +3916,7 @@ func TestNodeAddressUpdatesGenerateAPIPodStatusHostNetworkPodIPs(t *testing.T) {
}
podStatus.IPs = tc.nodeIPs
- status := kl.generateAPIPodStatus(pod, podStatus)
+ status := kl.generateAPIPodStatus(pod, podStatus, false)
if !reflect.DeepEqual(status.PodIPs, tc.expectedPodIPs) {
t.Fatalf("Expected PodIPs %#v, got %#v", tc.expectedPodIPs, status.PodIPs)
}
@@ -4007,7 +4049,7 @@ func TestGenerateAPIPodStatusPodIPs(t *testing.T) {
IPs: tc.criPodIPs,
}
- status := kl.generateAPIPodStatus(pod, criStatus)
+ status := kl.generateAPIPodStatus(pod, criStatus, false)
if !reflect.DeepEqual(status.PodIPs, tc.podIPs) {
t.Fatalf("Expected PodIPs %#v, got %#v", tc.podIPs, status.PodIPs)
}


@@ -1989,7 +1989,7 @@ func TestGenerateAPIPodStatusWithSortedContainers(t *testing.T) {
ContainerStatuses: cStatuses,
}
for i := 0; i < 5; i++ {
- apiStatus := kubelet.generateAPIPodStatus(pod, status)
+ apiStatus := kubelet.generateAPIPodStatus(pod, status, false)
for i, c := range apiStatus.ContainerStatuses {
if expectedOrder[i] != c.Name {
t.Fatalf("Container status not sorted, expected %v at index %d, but found %v", expectedOrder[i], i, c.Name)
@@ -2203,7 +2203,7 @@ func TestGenerateAPIPodStatusWithReasonCache(t *testing.T) {
pod.Spec.Containers = test.containers
pod.Status.ContainerStatuses = test.oldStatuses
podStatus.ContainerStatuses = test.statuses
- apiStatus := kubelet.generateAPIPodStatus(pod, podStatus)
+ apiStatus := kubelet.generateAPIPodStatus(pod, podStatus, false)
verifyContainerStatuses(t, apiStatus.ContainerStatuses, test.expectedState, test.expectedLastTerminationState, fmt.Sprintf("case %d", i))
}
@@ -2216,7 +2216,7 @@ func TestGenerateAPIPodStatusWithReasonCache(t *testing.T) {
pod.Spec.InitContainers = test.containers
pod.Status.InitContainerStatuses = test.oldStatuses
podStatus.ContainerStatuses = test.statuses
- apiStatus := kubelet.generateAPIPodStatus(pod, podStatus)
+ apiStatus := kubelet.generateAPIPodStatus(pod, podStatus, false)
expectedState := test.expectedState
if test.expectedInitState != nil {
expectedState = test.expectedInitState
@@ -2355,14 +2355,14 @@ func TestGenerateAPIPodStatusWithDifferentRestartPolicies(t *testing.T) {
pod.Spec.RestartPolicy = test.restartPolicy
// Test normal containers
pod.Spec.Containers = containers
- apiStatus := kubelet.generateAPIPodStatus(pod, podStatus)
+ apiStatus := kubelet.generateAPIPodStatus(pod, podStatus, false)
expectedState, expectedLastTerminationState := test.expectedState, test.expectedLastTerminationState
verifyContainerStatuses(t, apiStatus.ContainerStatuses, expectedState, expectedLastTerminationState, fmt.Sprintf("case %d", c))
pod.Spec.Containers = nil
// Test init containers
pod.Spec.InitContainers = containers
- apiStatus = kubelet.generateAPIPodStatus(pod, podStatus)
+ apiStatus = kubelet.generateAPIPodStatus(pod, podStatus, false)
if test.expectedInitState != nil {
expectedState = test.expectedInitState
}
@@ -2656,7 +2656,7 @@ func TestGenerateAPIPodStatusInvokesPodSyncHandlers(t *testing.T) {
Name: pod.Name,
Namespace: pod.Namespace,
}
- apiStatus := kubelet.generateAPIPodStatus(pod, status)
+ apiStatus := kubelet.generateAPIPodStatus(pod, status, false)
require.Equal(t, v1.PodFailed, apiStatus.Phase)
require.Equal(t, "Evicted", apiStatus.Reason)
require.Equal(t, "because", apiStatus.Message)


@@ -159,7 +159,8 @@ type PodWorkers interface {
// returned as knownPods.
SyncKnownPods(desiredPods []*v1.Pod) (knownPods map[types.UID]PodWorkerSync)
- // IsPodKnownTerminated returns true if the provided pod UID is known by the pod
+ // IsPodKnownTerminated returns true once SyncTerminatingPod completes
+ // successfully - the provided pod UID it is known by the pod
// worker to be terminated. If the pod has been force deleted and the pod worker
// has completed termination this method will return false, so this method should
// only be used to filter out pods from the desired set such as in admission.
@@ -174,6 +175,19 @@ type PodWorkers interface {
// Intended for use by the kubelet config loops, but not subsystems, which should
// use ShouldPod*().
CouldHaveRunningContainers(uid types.UID) bool
+ // ShouldPodBeFinished returns true once SyncTerminatedPod completes
+ // successfully - the provided pod UID it is known to the pod worker to
+ // be terminated and have resources reclaimed. It returns false before the
+ // pod workers have synced (syncPod could be called). Once the pod workers
+ // have synced it returns false if the pod has a sync status until
+ // SyncTerminatedPod completes successfully. If the pod workers have synced,
+ // but the pod does not have a status it returns true.
+ //
+ // Intended for use by subsystem sync loops to avoid performing background setup
+ // after termination has been requested for a pod. Callers must ensure that the
+ // syncPod method is non-blocking when their data is absent.
+ ShouldPodBeFinished(uid types.UID) bool
// IsPodTerminationRequested returns true when pod termination has been requested
// until the termination completes and the pod is removed from config. This should
// not be used in cleanup loops because it will return false if the pod has already
@@ -521,7 +535,7 @@ func (s *podSyncStatus) mergeLastUpdate(other UpdatePodOptions) {
// . ^ other loops can tear down
// . .
//
- // ------------------------------------| |---- = status manager is waiting for PodResourcesAreReclaimed()
+ // ------------------------------------| |---- = status manager is waiting for SyncTerminatedPod() finished
//
// . ^ .
//
@@ -625,6 +639,17 @@ func (p *podWorkers) CouldHaveRunningContainers(uid types.UID) bool {
return !p.podsSynced
}
+ func (p *podWorkers) ShouldPodBeFinished(uid types.UID) bool {
+ p.podLock.Lock()
+ defer p.podLock.Unlock()
+ if status, ok := p.podSyncStatuses[uid]; ok {
+ return status.IsFinished()
+ }
+ // once all pods are synced, any pod without sync status is assumed to
+ // have SyncTerminatedPod finished.
+ return p.podsSynced
+ }
func (p *podWorkers) IsPodTerminationRequested(uid types.UID) bool {
p.podLock.Lock()
defer p.podLock.Unlock()
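
ShouldPodBeFinished gives other kubelet subsystems a single predicate for "SyncTerminatedPod has completed for this UID". A hypothetical consumer sketch — the interface and struct names below are illustrative, not the real wiring — showing how cleanup decisions can defer entirely to the pod worker state machine instead of re-inspecting volumes or cgroups:

package sketch

import "k8s.io/apimachinery/pkg/types"

type podWorkersView interface {
	ShouldPodBeFinished(uid types.UID) bool
}

type deletionSafety struct {
	workers podWorkersView
}

// podIsFinished mirrors the kubelet's PodIsFinished shown earlier: whether a
// pod's node-level resources are reclaimed is answered by the pod workers,
// which only report true after SyncTerminatedPod has run to completion.
func (d deletionSafety) podIsFinished(uid types.UID) bool {
	return d.workers.ShouldPodBeFinished(uid)
}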


@@ -56,6 +56,7 @@ type fakePodWorkers struct {
terminating map[types.UID]bool
terminated map[types.UID]bool
terminationRequested map[types.UID]bool
+ finished map[types.UID]bool
removeRuntime map[types.UID]bool
removeContent map[types.UID]bool
terminatingStaticPods map[string]bool
@@ -105,6 +106,11 @@ func (f *fakePodWorkers) CouldHaveRunningContainers(uid types.UID) bool {
defer f.statusLock.Unlock()
return f.running[uid]
}
+ func (f *fakePodWorkers) ShouldPodBeFinished(uid types.UID) bool {
+ f.statusLock.Lock()
+ defer f.statusLock.Unlock()
+ return f.finished[uid]
+ }
func (f *fakePodWorkers) IsPodTerminationRequested(uid types.UID) bool {
f.statusLock.Lock()
defer f.statusLock.Unlock()


@@ -60,12 +60,10 @@ type versionedPodStatus struct {
// at is the time at which the most recent status update was detected
at time.Time
- status v1.PodStatus
- }
- type podStatusSyncRequest struct {
- podUID types.UID
- status versionedPodStatus
+ // True if the status is generated at the end of SyncTerminatedPod, or after it is completed.
+ podIsFinished bool
+ status v1.PodStatus
}
// Updates pod statuses in apiserver. Writes only when new status has changed.
@@ -76,7 +74,7 @@ type manager struct {
// Map from pod UID to sync status of the corresponding pod.
podStatuses map[types.UID]versionedPodStatus
podStatusesLock sync.RWMutex
- podStatusChannel chan podStatusSyncRequest
+ podStatusChannel chan struct{}
// Map from (mirror) pod UID to latest status version successfully sent to the API server.
// apiStatusVersions must only be accessed from the sync thread.
apiStatusVersions map[kubetypes.MirrorPodUID]uint64
@@ -99,8 +97,6 @@ type PodStatusProvider interface {
// PodDeletionSafetyProvider provides guarantees that a pod can be safely deleted.
type PodDeletionSafetyProvider interface {
- // PodResourcesAreReclaimed returns true if the pod can safely be deleted.
- PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool
// PodCouldHaveRunningContainers returns true if the pod could have running containers.
PodCouldHaveRunningContainers(pod *v1.Pod) bool
}
@@ -158,7 +154,7 @@ func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podD
kubeClient: kubeClient,
podManager: podManager,
podStatuses: make(map[types.UID]versionedPodStatus),
- podStatusChannel: make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
+ podStatusChannel: make(chan struct{}, 1),
apiStatusVersions: make(map[kubetypes.MirrorPodUID]uint64),
podDeletionSafety: podDeletionSafety,
podStartupLatencyHelper: podStartupLatencyHelper,
@@ -217,19 +213,12 @@ func (m *manager) Start() {
go wait.Forever(func() {
for {
select {
- case syncRequest := <-m.podStatusChannel:
- klog.V(5).InfoS("Status Manager: syncing pod with status from podStatusChannel",
- "podUID", syncRequest.podUID,
- "statusVersion", syncRequest.status.version,
- "status", syncRequest.status.status)
- m.syncPod(syncRequest.podUID, syncRequest.status)
+ case <-m.podStatusChannel:
+ klog.V(4).InfoS("Syncing updated statuses")
+ m.syncBatch(false)
case <-syncTicker:
- klog.V(5).InfoS("Status Manager: syncing batch")
- // remove any entries in the status channel since the batch will handle them
- for i := len(m.podStatusChannel); i > 0; i-- {
- <-m.podStatusChannel
- }
- m.syncBatch()
+ klog.V(4).InfoS("Syncing all statuses")
+ m.syncBatch(true)
}
}
}, 0)
@@ -291,7 +280,7 @@ func (m *manager) SetPodStatus(pod *v1.Pod, status v1.PodStatus) {
// Force a status update if deletion timestamp is set. This is necessary
// because if the pod is in the non-running state, the pod worker still
// needs to be able to trigger an update and/or deletion.
- m.updateStatusInternal(pod, status, pod.DeletionTimestamp != nil)
+ m.updateStatusInternal(pod, status, pod.DeletionTimestamp != nil, false)
}
func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
@@ -352,7 +341,7 @@ func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontai
}
updateConditionFunc(v1.PodReady, GeneratePodReadyCondition(&pod.Spec, status.Conditions, status.ContainerStatuses, status.Phase))
updateConditionFunc(v1.ContainersReady, GenerateContainersReadyCondition(&pod.Spec, status.ContainerStatuses, status.Phase))
- m.updateStatusInternal(pod, status, false)
+ m.updateStatusInternal(pod, status, false, false)
}
func (m *manager) SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool) {
@@ -394,7 +383,7 @@ func (m *manager) SetContainerStartup(podUID types.UID, containerID kubecontaine
containerStatus, _, _ = findContainerStatus(&status, containerID.String())
containerStatus.Started = &started
- m.updateStatusInternal(pod, status, false)
+ m.updateStatusInternal(pod, status, false, false)
}
func findContainerStatus(status *v1.PodStatus, containerID string) (containerStatus *v1.ContainerStatus, init bool, ok bool) {
@@ -423,6 +412,8 @@ func findContainerStatus(status *v1.PodStatus, containerID string) (containerSta
// the pod as successful. If we have not yet initialized the pod in the presence of init containers,
// the init container failure status is sufficient to describe the pod as failing, and we do not need
// to override waiting containers (unless there is evidence the pod previously started those containers).
+ // It also makes sure that pods are transitioned to a terminal phase (Failed or Succeeded) before
+ // their deletion.
func (m *manager) TerminatePod(pod *v1.Pod) {
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()
@@ -430,7 +421,8 @@ func (m *manager) TerminatePod(pod *v1.Pod) {
// ensure that all containers have a terminated state - because we do not know whether the container
// was successful, always report an error
oldStatus := &pod.Status
- if cachedStatus, ok := m.podStatuses[pod.UID]; ok {
+ cachedStatus, isCached := m.podStatuses[pod.UID]
+ if isCached {
oldStatus = &cachedStatus.status
}
status := *oldStatus.DeepCopy()
@@ -466,8 +458,26 @@ func (m *manager) TerminatePod(pod *v1.Pod) {
}
}
+ // Make sure all pods are transitioned to a terminal phase.
+ // TODO(#116484): Also assign terminal phase to static an pods.
+ if !kubetypes.IsStaticPod(pod) {
+ switch status.Phase {
+ case v1.PodSucceeded, v1.PodFailed:
+ // do nothing, already terminal
+ case v1.PodPending, v1.PodRunning:
+ if status.Phase == v1.PodRunning && isCached {
+ klog.InfoS("Terminal running pod should have already been marked as failed, programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
+ }
+ klog.V(3).InfoS("Marking terminal pod as failed", "oldPhase", status.Phase, "pod", klog.KObj(pod), "podUID", pod.UID)
+ status.Phase = v1.PodFailed
+ default:
+ klog.ErrorS(fmt.Errorf("unknown phase: %v", status.Phase), "Unknown phase, programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
+ status.Phase = v1.PodFailed
+ }
+ }
klog.V(5).InfoS("TerminatePod calling updateStatusInternal", "pod", klog.KObj(pod), "podUID", pod.UID)
- m.updateStatusInternal(pod, status, true)
+ m.updateStatusInternal(pod, status, true, true)
}
// hasPodInitialized returns true if the pod has no evidence of ever starting a regular container, which
@@ -540,13 +550,20 @@ func checkContainerStateTransition(oldStatuses, newStatuses []v1.ContainerStatus
}
// updateStatusInternal updates the internal status cache, and queues an update to the api server if
- // necessary. Returns whether an update was triggered.
+ // necessary.
// This method IS NOT THREAD SAFE and must be called from a locked function.
- func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUpdate bool) bool {
+ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUpdate, podIsFinished bool) {
var oldStatus v1.PodStatus
cachedStatus, isCached := m.podStatuses[pod.UID]
if isCached {
oldStatus = cachedStatus.status
+ // TODO(#116484): Also assign terminal phase to static pods.
+ if !kubetypes.IsStaticPod(pod) {
+ if cachedStatus.podIsFinished && !podIsFinished {
+ klog.InfoS("Got unexpected podIsFinished=false, while podIsFinished=true in status cache, programmer error.", "pod", klog.KObj(pod))
+ podIsFinished = true
+ }
+ }
} else if mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod); ok {
oldStatus = mirrorPod.Status
} else {
@@ -556,11 +573,11 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp
// Check for illegal state transition in containers
if err := checkContainerStateTransition(oldStatus.ContainerStatuses, status.ContainerStatuses, pod.Spec.RestartPolicy); err != nil {
klog.ErrorS(err, "Status update on pod aborted", "pod", klog.KObj(pod))
- return false
+ return
}
if err := checkContainerStateTransition(oldStatus.InitContainerStatuses, status.InitContainerStatuses, pod.Spec.RestartPolicy); err != nil {
klog.ErrorS(err, "Status update on pod aborted", "pod", klog.KObj(pod))
- return false
+ return
}
// Set ContainersReadyCondition.LastTransitionTime.
@@ -623,21 +640,22 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp
containers = append(containers, fmt.Sprintf("(%s state=%s previous=%s)", s.Name, current, previous))
}
sort.Strings(containers)
- klogV.InfoS("updateStatusInternal", "version", cachedStatus.version+1, "pod", klog.KObj(pod), "podUID", pod.UID, "containers", strings.Join(containers, " "))
+ klogV.InfoS("updateStatusInternal", "version", cachedStatus.version+1, "podIsFinished", podIsFinished, "pod", klog.KObj(pod), "podUID", pod.UID, "containers", strings.Join(containers, " "))
}
// The intent here is to prevent concurrent updates to a pod's status from
// clobbering each other so the phase of a pod progresses monotonically.
if isCached && isPodStatusByKubeletEqual(&cachedStatus.status, &status) && !forceUpdate {
klog.V(3).InfoS("Ignoring same status for pod", "pod", klog.KObj(pod), "status", status)
- return false // No new status.
+ return
}
newStatus := versionedPodStatus{
status: status,
version: cachedStatus.version + 1,
podName: pod.Name,
podNamespace: pod.Namespace,
+ podIsFinished: podIsFinished,
}
// Multiple status updates can be generated before we update the API server,
@@ -652,20 +670,9 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp
m.podStatuses[pod.UID] = newStatus
select {
- case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}:
- klog.V(5).InfoS("Status Manager: adding pod with new status to podStatusChannel",
- "pod", klog.KObj(pod),
- "podUID", pod.UID,
- "statusVersion", newStatus.version,
- "status", newStatus.status)
- return true
+ case m.podStatusChannel <- struct{}{}:
default:
- // Let the periodic syncBatch handle the update if the channel is full.
- // We can't block, since we hold the mutex lock.
- klog.V(4).InfoS("Skipping the status update for pod for now because the channel is full",
- "pod", klog.KObj(pod),
- "status", status)
- return false
+ // there's already a status update pending
}
}
@@ -710,25 +717,38 @@ func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
}
}
- // syncBatch syncs pods statuses with the apiserver.
- func (m *manager) syncBatch() {
- var updatedStatuses []podStatusSyncRequest
+ // syncBatch syncs pods statuses with the apiserver. Returns the number of syncs
+ // attempted for testing.
+ func (m *manager) syncBatch(all bool) int {
+ type podSync struct {
+ podUID types.UID
+ statusUID kubetypes.MirrorPodUID
+ status versionedPodStatus
+ }
+ var updatedStatuses []podSync
podToMirror, mirrorToPod := m.podManager.GetUIDTranslations()
func() { // Critical section
m.podStatusesLock.RLock()
defer m.podStatusesLock.RUnlock()
// Clean up orphaned versions.
- for uid := range m.apiStatusVersions {
- _, hasPod := m.podStatuses[types.UID(uid)]
- _, hasMirror := mirrorToPod[uid]
- if !hasPod && !hasMirror {
- delete(m.apiStatusVersions, uid)
+ if all {
+ for uid := range m.apiStatusVersions {
+ _, hasPod := m.podStatuses[types.UID(uid)]
+ _, hasMirror := mirrorToPod[uid]
+ if !hasPod && !hasMirror {
+ delete(m.apiStatusVersions, uid)
+ }
}
}
+ // Decide which pods need status updates.
for uid, status := range m.podStatuses {
- syncedUID := kubetypes.MirrorPodUID(uid)
+ // translate the pod UID (source) to the status UID (API pod) -
+ // static pods are identified in source by pod UID but tracked in the
+ // API via the uid of the mirror pod
+ uidOfStatus := kubetypes.MirrorPodUID(uid)
if mirrorUID, ok := podToMirror[kubetypes.ResolvedPodUID(uid)]; ok {
if mirrorUID == "" {
klog.V(5).InfoS("Static pod does not have a corresponding mirror pod; skipping",
@@ -736,34 +756,45 @@ func (m *manager) syncBatch() {
"pod", klog.KRef(status.podNamespace, status.podName))
continue
}
- syncedUID = mirrorUID
+ uidOfStatus = mirrorUID
}
- if m.needsUpdate(types.UID(syncedUID), status) {
- updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
+ // if a new status update has been delivered, trigger an update, otherwise the
+ // pod can wait for the next bulk check (which performs reconciliation as well)
+ if !all {
+ if m.apiStatusVersions[uidOfStatus] >= status.version {
+ continue
+ }
+ updatedStatuses = append(updatedStatuses, podSync{uid, uidOfStatus, status})
+ continue
+ }
+ // Ensure that any new status, or mismatched status, or pod that is ready for
+ // deletion gets updated. If a status update fails we retry the next time any
+ // other pod is updated.
+ if m.needsUpdate(types.UID(uidOfStatus), status) {
+ updatedStatuses = append(updatedStatuses, podSync{uid, uidOfStatus, status})
} else if m.needsReconcile(uid, status.status) {
// Delete the apiStatusVersions here to force an update on the pod status
// In most cases the deleted apiStatusVersions here should be filled
// soon after the following syncPod() [If the syncPod() sync an update
// successfully].
- delete(m.apiStatusVersions, syncedUID)
- updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
+ delete(m.apiStatusVersions, uidOfStatus)
+ updatedStatuses = append(updatedStatuses, podSync{uid, uidOfStatus, status})
}
}
}()
for _, update := range updatedStatuses {
- klog.V(5).InfoS("Status Manager: syncPod in syncbatch", "podUID", update.podUID)
+ klog.V(5).InfoS("Sync pod status", "podUID", update.podUID, "statusUID", update.statusUID, "version", update.status.version)
m.syncPod(update.podUID, update.status)
}
+ return len(updatedStatuses)
}
- // syncPod syncs the given status with the API server. The caller must not hold the lock.
+ // syncPod syncs the given status with the API server. The caller must not hold the status lock.
func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
- if !m.needsUpdate(uid, status) {
- klog.V(1).InfoS("Status for pod is up-to-date; skipping", "podUID", uid)
- return
- }
// TODO: make me easier to express from client code
pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(context.TODO(), status.podName, metav1.GetOptions{})
if errors.IsNotFound(err) {
@@ -815,14 +846,14 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
if status.at.IsZero() {
klog.V(3).InfoS("Pod had no status time set", "pod", klog.KObj(pod), "podUID", uid, "version", status.version)
} else {
- duration := time.Now().Sub(status.at).Truncate(time.Millisecond)
+ duration := time.Since(status.at).Truncate(time.Millisecond)
metrics.PodStatusSyncDuration.Observe(duration.Seconds())
}
m.apiStatusVersions[kubetypes.MirrorPodUID(pod.UID)] = status.version
// We don't handle graceful deletion of mirror pods.
- if m.canBeDeleted(pod, status.status) {
+ if m.canBeDeleted(pod, status.status, status.podIsFinished) {
deleteOptions := metav1.DeleteOptions{
GracePeriodSeconds: new(int64),
// Use the pod UID as the precondition for deletion to prevent deleting a
@@ -850,14 +881,24 @@ func (m *manager) needsUpdate(uid types.UID, status versionedPodStatus) bool {
if !ok {
return false
}
- return m.canBeDeleted(pod, status.status)
+ return m.canBeDeleted(pod, status.status, status.podIsFinished)
}
- func (m *manager) canBeDeleted(pod *v1.Pod, status v1.PodStatus) bool {
+ func (m *manager) canBeDeleted(pod *v1.Pod, status v1.PodStatus, podIsFinished bool) bool {
if pod.DeletionTimestamp == nil || kubetypes.IsMirrorPod(pod) {
return false
}
- return m.podDeletionSafety.PodResourcesAreReclaimed(pod, status)
+ // Delay deletion of pods until the phase is terminal.
+ if !podutil.IsPodPhaseTerminal(pod.Status.Phase) {
+ klog.V(3).InfoS("Delaying pod deletion as the phase is non-terminal", "phase", status.Phase, "pod", klog.KObj(pod), "podUID", pod.UID)
+ return false
+ }
+ // If this is an update completing pod termination then we know the pod termination is finished.
+ if podIsFinished {
+ klog.V(3).InfoS("The pod termination is finished as SyncTerminatedPod completes its execution", "phase", status.Phase, "pod", klog.KObj(pod), "podUID", pod.UID)
+ return true
+ }
+ return false
}
// needsReconcile compares the given status with the status in the pod manager (which
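
The effect of the canBeDeleted change above is that the status manager only issues the final force-delete once two independent facts hold: the pod's API phase is terminal, and the status being synced was produced after SyncTerminatedPod finished. A simplified restatement of that gate (a sketch, not the actual method):

package sketch

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func canDelete(deletionTimestamp *metav1.Time, isMirror bool, phase v1.PodPhase, podIsFinished bool) bool {
	if deletionTimestamp == nil || isMirror {
		return false
	}
	// Delay deletion until the pod has reached a terminal phase...
	if phase != v1.PodSucceeded && phase != v1.PodFailed {
		return false
	}
	// ...and until the status was generated at (or after) the end of
	// SyncTerminatedPod, i.e. node-level resources have been reclaimed.
	return podIsFinished
}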


@ -27,6 +27,7 @@ import (
"time" "time"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
@ -69,7 +70,7 @@ func getTestPod() *v1.Pod {
// After adding reconciliation, if status in pod manager is different from the cached status, a reconciliation // After adding reconciliation, if status in pod manager is different from the cached status, a reconciliation
// will be triggered, which will mess up all the old unit test. // will be triggered, which will mess up all the old unit test.
// To simplify the implementation of unit test, we add testSyncBatch() here, it will make sure the statuses in // To simplify the implementation of unit test, we add testSyncBatch() here, it will make sure the statuses in
// pod manager the same with cached ones before syncBatch() so as to avoid reconciling. // pod manager the same with cached ones before syncBatch(true) so as to avoid reconciling.
func (m *manager) testSyncBatch() { func (m *manager) testSyncBatch() {
for uid, status := range m.podStatuses { for uid, status := range m.podStatuses {
pod, ok := m.podManager.GetPodByUID(uid) pod, ok := m.podManager.GetPodByUID(uid)
@ -81,7 +82,7 @@ func (m *manager) testSyncBatch() {
pod.Status = status.status pod.Status = status.status
} }
} }
m.syncBatch() m.syncBatch(true)
} }
func newTestManager(kubeClient clientset.Interface) *manager { func newTestManager(kubeClient clientset.Interface) *manager {
@ -113,19 +114,19 @@ func verifyActions(t *testing.T, manager *manager, expectedActions []core.Action
actions := manager.kubeClient.(*fake.Clientset).Actions() actions := manager.kubeClient.(*fake.Clientset).Actions()
defer manager.kubeClient.(*fake.Clientset).ClearActions() defer manager.kubeClient.(*fake.Clientset).ClearActions()
if len(actions) != len(expectedActions) { if len(actions) != len(expectedActions) {
t.Fatalf("unexpected actions, got: %+v expected: %+v", actions, expectedActions) t.Fatalf("unexpected actions: %s", cmp.Diff(expectedActions, actions))
return
} }
for i := 0; i < len(actions); i++ { for i := 0; i < len(actions); i++ {
e := expectedActions[i] e := expectedActions[i]
a := actions[i] a := actions[i]
if !a.Matches(e.GetVerb(), e.GetResource().Resource) || a.GetSubresource() != e.GetSubresource() { if !a.Matches(e.GetVerb(), e.GetResource().Resource) || a.GetSubresource() != e.GetSubresource() {
t.Errorf("unexpected actions, got: %+v expected: %+v", actions, expectedActions) t.Errorf("unexpected actions: %s", cmp.Diff(expectedActions, actions))
} }
} }
} }
func verifyUpdates(t *testing.T, manager *manager, expectedUpdates int) { func verifyUpdates(t *testing.T, manager *manager, expectedUpdates int) {
t.Helper()
// Consume all updates in the channel. // Consume all updates in the channel.
numUpdates := manager.consumeUpdates() numUpdates := manager.consumeUpdates()
if numUpdates != expectedUpdates { if numUpdates != expectedUpdates {
@ -137,9 +138,8 @@ func (m *manager) consumeUpdates() int {
updates := 0 updates := 0
for { for {
select { select {
case syncRequest := <-m.podStatusChannel: case <-m.podStatusChannel:
m.syncPod(syncRequest.podUID, syncRequest.status) updates += m.syncBatch(false)
updates++
default: default:
return updates return updates
} }
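For context on the updates += m.syncBatch(false) change above: the channel now only signals that work is pending, and syncBatch reports how many status updates it actually sent. Below is a small standalone model of that drain-and-batch pattern; drainAndSync and the function-typed syncBatch parameter are illustrative stand-ins, assuming the syncBatch(all bool) int shape used in these tests.

package main

import "fmt"

// drainAndSync drains pending wake-up signals and runs a batch sync for each,
// summing how many updates the batches actually performed.
func drainAndSync(notify <-chan struct{}, syncBatch func(all bool) int) int {
	updates := 0
	for {
		select {
		case <-notify: // the channel carries no payload, only "something changed"
			updates += syncBatch(false) // partial batch: only pods with pending work
		default:
			return updates
		}
	}
}

func main() {
	notify := make(chan struct{}, 2)
	notify <- struct{}{}
	notify <- struct{}{}
	fmt.Println(drainAndSync(notify, func(all bool) int { return 1 })) // 2
}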
@ -214,8 +214,9 @@ func TestChangedStatus(t *testing.T) {
syncer := newTestManager(&fake.Clientset{}) syncer := newTestManager(&fake.Clientset{})
testPod := getTestPod() testPod := getTestPod()
syncer.SetPodStatus(testPod, getRandomPodStatus()) syncer.SetPodStatus(testPod, getRandomPodStatus())
verifyUpdates(t, syncer, 1)
syncer.SetPodStatus(testPod, getRandomPodStatus()) syncer.SetPodStatus(testPod, getRandomPodStatus())
verifyUpdates(t, syncer, 2) verifyUpdates(t, syncer, 1)
} }
func TestChangedStatusKeepsStartTime(t *testing.T) { func TestChangedStatusKeepsStartTime(t *testing.T) {
@ -225,8 +226,9 @@ func TestChangedStatusKeepsStartTime(t *testing.T) {
firstStatus := getRandomPodStatus() firstStatus := getRandomPodStatus()
firstStatus.StartTime = &now firstStatus.StartTime = &now
syncer.SetPodStatus(testPod, firstStatus) syncer.SetPodStatus(testPod, firstStatus)
verifyUpdates(t, syncer, 1)
syncer.SetPodStatus(testPod, getRandomPodStatus()) syncer.SetPodStatus(testPod, getRandomPodStatus())
verifyUpdates(t, syncer, 2) verifyUpdates(t, syncer, 1)
finalStatus := expectPodStatus(t, syncer, testPod) finalStatus := expectPodStatus(t, syncer, testPod)
if finalStatus.StartTime.IsZero() { if finalStatus.StartTime.IsZero() {
t.Errorf("StartTime should not be zero") t.Errorf("StartTime should not be zero")
@ -407,9 +409,9 @@ func TestStaleUpdates(t *testing.T) {
status.Message = "second version bump" status.Message = "second version bump"
m.SetPodStatus(pod, status) m.SetPodStatus(pod, status)
t.Logf("sync batch before syncPods pushes latest status, so we should see three statuses in the channel, but only one update") t.Logf("sync batch before syncPods pushes latest status, resulting in one update during the batch")
m.syncBatch() m.syncBatch(true)
verifyUpdates(t, m, 3) verifyUpdates(t, m, 0)
verifyActions(t, m, []core.Action{getAction(), patchAction()}) verifyActions(t, m, []core.Action{getAction(), patchAction()})
t.Logf("Nothing left in the channel to sync") t.Logf("Nothing left in the channel to sync")
verifyActions(t, m, []core.Action{}) verifyActions(t, m, []core.Action{})
@ -423,7 +425,7 @@ func TestStaleUpdates(t *testing.T) {
m.apiStatusVersions[mirrorPodUID] = m.apiStatusVersions[mirrorPodUID] - 1 m.apiStatusVersions[mirrorPodUID] = m.apiStatusVersions[mirrorPodUID] - 1
m.SetPodStatus(pod, status) m.SetPodStatus(pod, status)
m.syncBatch() m.syncBatch(true)
verifyActions(t, m, []core.Action{getAction()}) verifyActions(t, m, []core.Action{getAction()})
t.Logf("Nothing stuck in the pipe.") t.Logf("Nothing stuck in the pipe.")
@ -545,7 +547,7 @@ func TestStaticPod(t *testing.T) {
assert.True(t, isPodStatusByKubeletEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus) assert.True(t, isPodStatusByKubeletEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
t.Logf("Should not sync pod in syncBatch because there is no corresponding mirror pod for the static pod.") t.Logf("Should not sync pod in syncBatch because there is no corresponding mirror pod for the static pod.")
m.syncBatch() m.syncBatch(true)
assert.Equal(t, len(m.kubeClient.(*fake.Clientset).Actions()), 0, "Expected no updates after syncBatch, got %+v", m.kubeClient.(*fake.Clientset).Actions()) assert.Equal(t, len(m.kubeClient.(*fake.Clientset).Actions()), 0, "Expected no updates after syncBatch, got %+v", m.kubeClient.(*fake.Clientset).Actions())
t.Logf("Create the mirror pod") t.Logf("Create the mirror pod")
@ -558,6 +560,7 @@ func TestStaticPod(t *testing.T) {
assert.True(t, isPodStatusByKubeletEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus) assert.True(t, isPodStatusByKubeletEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
t.Logf("Should sync pod because the corresponding mirror pod is created") t.Logf("Should sync pod because the corresponding mirror pod is created")
assert.Equal(t, m.syncBatch(true), 1)
verifyActions(t, m, []core.Action{getAction(), patchAction()}) verifyActions(t, m, []core.Action{getAction(), patchAction()})
t.Logf("syncBatch should not sync any pods because nothing is changed.") t.Logf("syncBatch should not sync any pods because nothing is changed.")
@ -571,7 +574,7 @@ func TestStaticPod(t *testing.T) {
m.podManager.AddPod(mirrorPod) m.podManager.AddPod(mirrorPod)
t.Logf("Should not update to mirror pod, because UID has changed.") t.Logf("Should not update to mirror pod, because UID has changed.")
m.syncBatch() assert.Equal(t, m.syncBatch(true), 1)
verifyActions(t, m, []core.Action{getAction()}) verifyActions(t, m, []core.Action{getAction()})
} }
@ -745,13 +748,25 @@ func TestTerminatePod_DefaultUnknownStatus(t *testing.T) {
expectFn func(t *testing.T, status v1.PodStatus) expectFn func(t *testing.T, status v1.PodStatus)
}{ }{
{pod: newPod(0, 1, func(pod *v1.Pod) { pod.Status.Phase = v1.PodFailed })}, {pod: newPod(0, 1, func(pod *v1.Pod) { pod.Status.Phase = v1.PodFailed })},
{pod: newPod(0, 1, func(pod *v1.Pod) { pod.Status.Phase = v1.PodRunning })},
{pod: newPod(0, 1, func(pod *v1.Pod) {
	pod.Status.Phase = v1.PodRunning
	pod.Status.ContainerStatuses = []v1.ContainerStatus{
		{Name: "0", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "Test", ExitCode: 2}}},
	}
})},

{
	pod: newPod(0, 1, func(pod *v1.Pod) {
		pod.Status.Phase = v1.PodRunning
	}),
	expectFn: func(t *testing.T, status v1.PodStatus) {
		status.Phase = v1.PodFailed
	},
},
{
	pod: newPod(0, 1, func(pod *v1.Pod) {
		pod.Status.Phase = v1.PodRunning
		pod.Status.ContainerStatuses = []v1.ContainerStatus{
			{Name: "0", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "Test", ExitCode: 2}}},
		}
	}),
	expectFn: func(t *testing.T, status v1.PodStatus) {
		status.Phase = v1.PodFailed
	},
},
{ {
name: "last termination state set", name: "last termination state set",
pod: newPod(0, 1, func(pod *v1.Pod) { pod: newPod(0, 1, func(pod *v1.Pod) {
@ -997,6 +1012,78 @@ func TestTerminatePod_DefaultUnknownStatus(t *testing.T) {
} }
} }
func TestTerminatePod_EnsurePodPhaseIsTerminal(t *testing.T) {
testCases := map[string]struct {
enablePodDisruptionConditions bool
status v1.PodStatus
wantStatus v1.PodStatus
}{
"Pending pod": {
status: v1.PodStatus{
Phase: v1.PodPending,
},
wantStatus: v1.PodStatus{
Phase: v1.PodFailed,
},
},
"Running pod": {
status: v1.PodStatus{
Phase: v1.PodRunning,
},
wantStatus: v1.PodStatus{
Phase: v1.PodFailed,
},
},
"Succeeded pod": {
status: v1.PodStatus{
Phase: v1.PodSucceeded,
},
wantStatus: v1.PodStatus{
Phase: v1.PodSucceeded,
},
},
"Failed pod": {
status: v1.PodStatus{
Phase: v1.PodFailed,
},
wantStatus: v1.PodStatus{
Phase: v1.PodFailed,
},
},
"Unknown pod": {
status: v1.PodStatus{
Phase: v1.PodUnknown,
},
wantStatus: v1.PodStatus{
Phase: v1.PodFailed,
},
},
"Unknown phase pod": {
status: v1.PodStatus{
Phase: v1.PodPhase("SomeUnknownPhase"),
},
wantStatus: v1.PodStatus{
Phase: v1.PodFailed,
},
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient())
podStartupLatencyTracker := util.NewPodStartupLatencyTracker()
syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, "").(*manager)
pod := getTestPod()
pod.Status = tc.status
syncer.TerminatePod(pod)
gotStatus := expectPodStatus(t, syncer, pod.DeepCopy())
if diff := cmp.Diff(tc.wantStatus, gotStatus, cmpopts.IgnoreFields(v1.PodStatus{}, "StartTime")); diff != "" {
t.Fatalf("unexpected status: %s", diff)
}
})
}
}
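The table above encodes the invariant TerminatePod is expected to enforce: whatever phase the pod had, the stored status must end up terminal. A compact sketch of that mapping follows (toTerminalPhase is an illustrative helper, not the actual implementation):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// toTerminalPhase mirrors the expectations in the table above: Succeeded and
// Failed are preserved, every other phase (Pending, Running, Unknown, or an
// unrecognized value) becomes Failed.
func toTerminalPhase(phase v1.PodPhase) v1.PodPhase {
	switch phase {
	case v1.PodSucceeded, v1.PodFailed:
		return phase
	default:
		return v1.PodFailed
	}
}

func main() {
	for _, p := range []v1.PodPhase{v1.PodPending, v1.PodRunning, v1.PodSucceeded, v1.PodUnknown, "SomeUnknownPhase"} {
		fmt.Printf("%s -> %s\n", p, toTerminalPhase(p))
	}
}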
func TestSetContainerReadiness(t *testing.T) { func TestSetContainerReadiness(t *testing.T) {
cID1 := kubecontainer.ContainerID{Type: "test", ID: "1"} cID1 := kubecontainer.ContainerID{Type: "test", ID: "1"}
cID2 := kubecontainer.ContainerID{Type: "test", ID: "2"} cID2 := kubecontainer.ContainerID{Type: "test", ID: "2"}
@ -1184,7 +1271,7 @@ func TestSyncBatchCleanupVersions(t *testing.T) {
t.Logf("Orphaned pods should be removed.") t.Logf("Orphaned pods should be removed.")
m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)] = 100 m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)] = 100
m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)] = 200 m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)] = 200
m.syncBatch() m.syncBatch(true)
if _, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)]; ok { if _, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)]; ok {
t.Errorf("Should have cleared status for testPod") t.Errorf("Should have cleared status for testPod")
} }
@ -1216,7 +1303,7 @@ func TestReconcilePodStatus(t *testing.T) {
syncer := newTestManager(client) syncer := newTestManager(client)
syncer.SetPodStatus(testPod, getRandomPodStatus()) syncer.SetPodStatus(testPod, getRandomPodStatus())
t.Logf("Call syncBatch directly to test reconcile") t.Logf("Call syncBatch directly to test reconcile")
syncer.syncBatch() // The apiStatusVersions should be set now syncer.syncBatch(true) // The apiStatusVersions should be set now
client.ClearActions() client.ClearActions()
podStatus, ok := syncer.GetPodStatus(testPod.UID) podStatus, ok := syncer.GetPodStatus(testPod.UID)
@ -1231,7 +1318,7 @@ func TestReconcilePodStatus(t *testing.T) {
t.Fatalf("Pod status is the same, a reconciliation is not needed") t.Fatalf("Pod status is the same, a reconciliation is not needed")
} }
syncer.SetPodStatus(testPod, podStatus) syncer.SetPodStatus(testPod, podStatus)
syncer.syncBatch() syncer.syncBatch(true)
verifyActions(t, syncer, []core.Action{}) verifyActions(t, syncer, []core.Action{})
// If the pod status is the same, only the timestamp is in Rfc3339 format (lower precision without nanosecond), // If the pod status is the same, only the timestamp is in Rfc3339 format (lower precision without nanosecond),
@ -1246,7 +1333,7 @@ func TestReconcilePodStatus(t *testing.T) {
t.Fatalf("Pod status only differs for timestamp format, a reconciliation is not needed") t.Fatalf("Pod status only differs for timestamp format, a reconciliation is not needed")
} }
syncer.SetPodStatus(testPod, podStatus) syncer.SetPodStatus(testPod, podStatus)
syncer.syncBatch() syncer.syncBatch(true)
verifyActions(t, syncer, []core.Action{}) verifyActions(t, syncer, []core.Action{})
t.Logf("If the pod status is different, a reconciliation is needed, syncBatch should trigger an update") t.Logf("If the pod status is different, a reconciliation is needed, syncBatch should trigger an update")
@ -1256,7 +1343,7 @@ func TestReconcilePodStatus(t *testing.T) {
t.Fatalf("Pod status is different, a reconciliation is needed") t.Fatalf("Pod status is different, a reconciliation is needed")
} }
syncer.SetPodStatus(testPod, changedPodStatus) syncer.SetPodStatus(testPod, changedPodStatus)
syncer.syncBatch() syncer.syncBatch(true)
verifyActions(t, syncer, []core.Action{getAction(), patchAction()}) verifyActions(t, syncer, []core.Action{getAction(), patchAction()})
} }
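The two "reconciliation is not needed" cases above hinge on timestamps being normalized to RFC3339 (second) precision before statuses are compared. A hedged sketch of that comparison is below; statusEqualIgnoringPrecision is a made-up name, and the real manager has its own normalization and needsReconcile logic.

package status

import (
	v1 "k8s.io/api/core/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
)

// statusEqualIgnoringPrecision compares two pod statuses after rounding
// StartTime to RFC3339 (second) precision, so a difference that exists only
// in sub-second precision does not count as a change worth PATCHing.
func statusEqualIgnoringPrecision(a, b v1.PodStatus) bool {
	a, b = *a.DeepCopy(), *b.DeepCopy()
	if a.StartTime != nil {
		t := a.StartTime.Rfc3339Copy()
		a.StartTime = &t
	}
	if b.StartTime != nil {
		t := b.StartTime.Rfc3339Copy()
		b.StartTime = &t
	}
	return apiequality.Semantic.DeepEqual(&a, &b)
}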
@ -1268,36 +1355,32 @@ func expectPodStatus(t *testing.T, m *manager, pod *v1.Pod) v1.PodStatus {
return status return status
} }
func TestDeletePods(t *testing.T) { func TestDeletePodBeforeFinished(t *testing.T) {
pod := getTestPod() pod := getTestPod()
t.Logf("Set the deletion timestamp.") t.Logf("Set the deletion timestamp.")
pod.DeletionTimestamp = &metav1.Time{Time: time.Now()} pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
client := fake.NewSimpleClientset(pod) client := fake.NewSimpleClientset(pod)
m := newTestManager(client) m := newTestManager(client)
m.podDeletionSafety.(*statustest.FakePodDeletionSafetyProvider).Reclaimed = true
m.podManager.AddPod(pod) m.podManager.AddPod(pod)
status := getRandomPodStatus() status := getRandomPodStatus()
now := metav1.Now() status.Phase = v1.PodFailed
status.StartTime = &now
m.SetPodStatus(pod, status) m.SetPodStatus(pod, status)
t.Logf("Expect to see a delete action.") t.Logf("Expect not to see a delete action as the pod isn't finished yet (TerminatePod isn't called)")
verifyActions(t, m, []core.Action{getAction(), patchAction(), deleteAction()}) verifyActions(t, m, []core.Action{getAction(), patchAction()})
} }
func TestDeletePodWhileReclaiming(t *testing.T) { func TestDeletePodFinished(t *testing.T) {
pod := getTestPod() pod := getTestPod()
t.Logf("Set the deletion timestamp.") t.Logf("Set the deletion timestamp.")
pod.DeletionTimestamp = &metav1.Time{Time: time.Now()} pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
client := fake.NewSimpleClientset(pod) client := fake.NewSimpleClientset(pod)
m := newTestManager(client) m := newTestManager(client)
m.podDeletionSafety.(*statustest.FakePodDeletionSafetyProvider).Reclaimed = false
m.podManager.AddPod(pod) m.podManager.AddPod(pod)
status := getRandomPodStatus() status := getRandomPodStatus()
now := metav1.Now() status.Phase = v1.PodFailed
status.StartTime = &now m.TerminatePod(pod)
m.SetPodStatus(pod, status) t.Logf("Expect to see a delete action as the pod is finished (TerminatePod called)")
t.Logf("Expect to see a delete action.") verifyActions(t, m, []core.Action{getAction(), patchAction(), deleteAction()})
verifyActions(t, m, []core.Action{getAction(), patchAction()})
} }
func TestDoNotDeleteMirrorPods(t *testing.T) { func TestDoNotDeleteMirrorPods(t *testing.T) {

View File

@ -20,14 +20,9 @@ import v1 "k8s.io/api/core/v1"
// FakePodDeletionSafetyProvider is a fake PodDeletionSafetyProvider for test. // FakePodDeletionSafetyProvider is a fake PodDeletionSafetyProvider for test.
type FakePodDeletionSafetyProvider struct { type FakePodDeletionSafetyProvider struct {
Reclaimed bool
HasRunning bool HasRunning bool
} }
func (f *FakePodDeletionSafetyProvider) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool {
return f.Reclaimed
}
func (f *FakePodDeletionSafetyProvider) PodCouldHaveRunningContainers(pod *v1.Pod) bool { func (f *FakePodDeletionSafetyProvider) PodCouldHaveRunningContainers(pod *v1.Pod) bool {
return f.HasRunning return f.HasRunning
} }

View File

@ -104,20 +104,6 @@ func (mr *MockPodDeletionSafetyProviderMockRecorder) PodCouldHaveRunningContaine
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PodCouldHaveRunningContainers", reflect.TypeOf((*MockPodDeletionSafetyProvider)(nil).PodCouldHaveRunningContainers), pod) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PodCouldHaveRunningContainers", reflect.TypeOf((*MockPodDeletionSafetyProvider)(nil).PodCouldHaveRunningContainers), pod)
} }
// PodResourcesAreReclaimed mocks base method.
func (m *MockPodDeletionSafetyProvider) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "PodResourcesAreReclaimed", pod, status)
ret0, _ := ret[0].(bool)
return ret0
}
// PodResourcesAreReclaimed indicates an expected call of PodResourcesAreReclaimed.
func (mr *MockPodDeletionSafetyProviderMockRecorder) PodResourcesAreReclaimed(pod, status interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PodResourcesAreReclaimed", reflect.TypeOf((*MockPodDeletionSafetyProvider)(nil).PodResourcesAreReclaimed), pod, status)
}
// MockPodStartupLatencyStateHelper is a mock of PodStartupLatencyStateHelper interface. // MockPodStartupLatencyStateHelper is a mock of PodStartupLatencyStateHelper interface.
type MockPodStartupLatencyStateHelper struct { type MockPodStartupLatencyStateHelper struct {
ctrl *gomock.Controller ctrl *gomock.Controller

View File

@ -401,7 +401,7 @@ func WaitForPodTerminatingInNamespaceTimeout(ctx context.Context, c clientset.In
// WaitForPodSuccessInNamespaceTimeout returns nil if the pod reached state success, or an error if it reached failure or ran too long. // WaitForPodSuccessInNamespaceTimeout returns nil if the pod reached state success, or an error if it reached failure or ran too long.
func WaitForPodSuccessInNamespaceTimeout(ctx context.Context, c clientset.Interface, podName, namespace string, timeout time.Duration) error { func WaitForPodSuccessInNamespaceTimeout(ctx context.Context, c clientset.Interface, podName, namespace string, timeout time.Duration) error {
return WaitForPodCondition(ctx, c, namespace, podName, fmt.Sprintf("%s or %s", v1.PodSucceeded, v1.PodFailed), timeout, func(pod *v1.Pod) (bool, error) { return WaitForPodCondition(ctx, c, namespace, podName, fmt.Sprintf("%s or %s", v1.PodSucceeded, v1.PodFailed), timeout, func(pod *v1.Pod) (bool, error) {
if pod.Spec.RestartPolicy == v1.RestartPolicyAlways { if pod.DeletionTimestamp == nil && pod.Spec.RestartPolicy == v1.RestartPolicyAlways {
return true, fmt.Errorf("pod %q will never terminate with a succeeded state since its restart policy is Always", podName) return true, fmt.Errorf("pod %q will never terminate with a succeeded state since its restart policy is Always", podName)
} }
switch pod.Status.Phase { switch pod.Status.Phase {
@ -728,6 +728,13 @@ func WaitForPodContainerToFail(ctx context.Context, c clientset.Interface, names
}) })
} }
// WaitForPodScheduled waits for the pod to be scheduled, i.e. its .spec.nodeName is set
func WaitForPodScheduled(ctx context.Context, c clientset.Interface, namespace, podName string) error {
return WaitForPodCondition(ctx, c, namespace, podName, "pod is scheduled", podScheduledBeforeTimeout, func(pod *v1.Pod) (bool, error) {
return pod.Spec.NodeName != "", nil
})
}
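WaitForPodScheduled is used later in this PR's node e2e test to ensure the kubelet already owns the pod before the test deletes it. A minimal usage sketch (waitUntilOwnedByKubelet is an illustrative wrapper, not part of the PR):

package e2enode

import (
	"context"

	v1 "k8s.io/api/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
)

// waitUntilOwnedByKubelet blocks until the scheduler has assigned a node to
// the pod (spec.nodeName is set), mirroring how the e2e test below calls
// e2epod.WaitForPodScheduled before deleting the pod.
func waitUntilOwnedByKubelet(ctx context.Context, c clientset.Interface, pod *v1.Pod) error {
	return e2epod.WaitForPodScheduled(ctx, c, pod.Namespace, pod.Name)
}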
// WaitForPodContainerStarted waits for the given Pod container to start, after a successful run of the startupProbe. // WaitForPodContainerStarted waits for the given Pod container to start, after a successful run of the startupProbe.
func WaitForPodContainerStarted(ctx context.Context, c clientset.Interface, namespace, podName string, containerIndex int, timeout time.Duration) error { func WaitForPodContainerStarted(ctx context.Context, c clientset.Interface, namespace, podName string, containerIndex int, timeout time.Duration) error {
conditionDesc := fmt.Sprintf("container %d started", containerIndex) conditionDesc := fmt.Sprintf("container %d started", containerIndex)

View File

@ -0,0 +1,207 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2enode
import (
"context"
"fmt"
"strings"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/kubernetes/test/e2e/framework"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
imageutils "k8s.io/kubernetes/test/utils/image"
admissionapi "k8s.io/pod-security-admission/api"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
)
const (
testFinalizer = "example.com/test-finalizer"
)
var _ = SIGDescribe("Deleted pods handling [NodeConformance]", func() {
f := framework.NewDefaultFramework("deleted-pods-test")
f.NamespacePodSecurityEnforceLevel = admissionapi.LevelBaseline
ginkgo.It("Should transition to Failed phase a pod which is deleted while pending", func(ctx context.Context) {
podName := "deleted-pending-" + string(uuid.NewUUID())
podSpec := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Finalizers: []string{testFinalizer},
},
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyAlways,
Containers: []v1.Container{
{
Name: podName,
Image: "non-existing-repo/non-existing-image:v1.0",
ImagePullPolicy: "Always",
Command: []string{"bash"},
Args: []string{"-c", `echo "Hello world"`},
},
},
},
}
ginkgo.By("creating the pod with invalid image reference and finalizer")
pod := e2epod.NewPodClient(f).Create(ctx, podSpec)
ginkgo.By("set up cleanup of the finalizer")
ginkgo.DeferCleanup(e2epod.NewPodClient(f).RemoveFinalizer, pod.Name, testFinalizer)
ginkgo.By("Waiting for the pod to be scheduled so that kubelet owns it")
err := e2epod.WaitForPodScheduled(ctx, f.ClientSet, pod.Namespace, pod.Name)
framework.ExpectNoError(err, "Failed to await for the pod to be scheduled: %q", pod.Name)
ginkgo.By(fmt.Sprintf("Deleting the pod (%v/%v) to set a deletion timestamp", pod.Namespace, pod.Name))
err = e2epod.NewPodClient(f).Delete(ctx, pod.Name, metav1.DeleteOptions{})
framework.ExpectNoError(err, "Failed to delete the pod: %q", pod.Name)
ginkgo.By(fmt.Sprintf("Waiting for the pod (%v/%v) to be transitioned into the Failed phase", pod.Namespace, pod.Name))
err = e2epod.WaitForPodTerminatedInNamespace(ctx, f.ClientSet, pod.Name, "", f.Namespace.Name)
framework.ExpectNoError(err, "Failed to await for the pod to be terminated: %q", pod.Name)
ginkgo.By(fmt.Sprintf("Fetch the end state of the pod (%v/%v)", pod.Namespace, pod.Name))
pod, err = e2epod.NewPodClient(f).Get(ctx, pod.Name, metav1.GetOptions{})
framework.ExpectNoError(err, "Failed to fetch the end state of the pod: %q", pod.Name)
})
ginkgo.DescribeTable("Should transition to Failed phase a deleted pod if non-zero exit codes",
func(ctx context.Context, policy v1.RestartPolicy) {
podName := "deleted-running-" + strings.ToLower(string(policy)) + "-" + string(uuid.NewUUID())
podSpec := e2epod.MustMixinRestrictedPodSecurity(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Finalizers: []string{testFinalizer},
},
Spec: v1.PodSpec{
RestartPolicy: policy,
Containers: []v1.Container{
{
Name: podName,
Image: imageutils.GetE2EImage(imageutils.BusyBox),
Command: []string{"sleep", "1800"},
},
},
},
})
ginkgo.By(fmt.Sprintf("Creating a pod (%v/%v) with restart policy: %v", f.Namespace.Name, podSpec.Name, podSpec.Spec.RestartPolicy))
pod := e2epod.NewPodClient(f).Create(ctx, podSpec)
ginkgo.By("set up cleanup of the finalizer")
ginkgo.DeferCleanup(e2epod.NewPodClient(f).RemoveFinalizer, pod.Name, testFinalizer)
ginkgo.By("Waiting for the pod to be running")
err := e2epod.WaitForPodNameRunningInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name)
framework.ExpectNoError(err, "Failed to await for the pod to be running: %q", pod.Name)
ginkgo.By(fmt.Sprintf("Deleting the pod (%v/%v) to set a deletion timestamp", pod.Namespace, pod.Name))
err = e2epod.NewPodClient(f).Delete(ctx, pod.Name, *metav1.NewDeleteOptions(1))
framework.ExpectNoError(err, "Failed to delete the pod: %q", pod.Name)
ginkgo.By(fmt.Sprintf("Waiting for the pod (%v/%v) to be transitioned to the failed phase", pod.Namespace, pod.Name))
err = e2epod.WaitForPodTerminatedInNamespace(ctx, f.ClientSet, pod.Name, "", f.Namespace.Name)
framework.ExpectNoError(err, "Failed to await for the pod to be terminated: %q", pod.Name)
ginkgo.By(fmt.Sprintf("Fetching the end state of the pod (%v/%v)", pod.Namespace, pod.Name))
pod, err = e2epod.NewPodClient(f).Get(ctx, pod.Name, metav1.GetOptions{})
framework.ExpectNoError(err, "Failed to fetch the end state of the pod: %q", pod.Name)
ginkgo.By(fmt.Sprintf("Verify the pod (%v/%v) container is in the terminated state", pod.Namespace, pod.Name))
gomega.Expect(pod.Status.ContainerStatuses).Should(gomega.HaveLen(1))
containerStatus := pod.Status.ContainerStatuses[0]
gomega.Expect(containerStatus.State.Terminated).ToNot(gomega.BeNil(), "The pod container is not in the Terminated state")
ginkgo.By(fmt.Sprintf("Verify the pod (%v/%v) container exit code is 137", pod.Namespace, pod.Name))
gomega.Expect(containerStatus.State.Terminated.ExitCode).Should(gomega.Equal(int32(137)))
},
ginkgo.Entry("Restart policy Always", v1.RestartPolicyAlways),
ginkgo.Entry("Restart policy OnFailure", v1.RestartPolicyOnFailure),
ginkgo.Entry("Restart policy Never", v1.RestartPolicyNever),
)
ginkgo.DescribeTable("Should transition to Succeeded phase a deleted pod when containers complete with 0 exit code",
func(ctx context.Context, policy v1.RestartPolicy) {
podName := "deleted-running-" + strings.ToLower(string(policy)) + "-" + string(uuid.NewUUID())
podSpec := e2epod.MustMixinRestrictedPodSecurity(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Finalizers: []string{testFinalizer},
},
Spec: v1.PodSpec{
RestartPolicy: policy,
Containers: []v1.Container{
{
Name: podName,
Image: imageutils.GetE2EImage(imageutils.BusyBox),
Command: []string{"sh", "-c"},
Args: []string{`
_term() {
echo "Caught SIGTERM signal!"
exit 0
}
trap _term SIGTERM
while true; do sleep 5; done
`,
},
},
},
},
})
ginkgo.By(fmt.Sprintf("Creating a pod (%v/%v) with restart policy: %v", f.Namespace.Name, podSpec.Name, podSpec.Spec.RestartPolicy))
pod := e2epod.NewPodClient(f).Create(ctx, podSpec)
ginkgo.By("set up cleanup of the finalizer")
ginkgo.DeferCleanup(e2epod.NewPodClient(f).RemoveFinalizer, pod.Name, testFinalizer)
ginkgo.By("Waiting for the pod to be running")
err := e2epod.WaitForPodNameRunningInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name)
framework.ExpectNoError(err, "Failed to await for the pod to be running: %q", pod.Name)
ginkgo.By(fmt.Sprintf("Deleting the pod (%v/%v) to set a deletion timestamp", pod.Namespace, pod.Name))
// wait a little bit to make sure that we are inside the while loop and that the trap is registered
time.Sleep(time.Second)
err = e2epod.NewPodClient(f).Delete(ctx, pod.Name, metav1.DeleteOptions{})
framework.ExpectNoError(err, "Failed to delete the pod: %q", pod.Name)
ginkgo.By(fmt.Sprintf("Waiting for the pod (%v/%v) to be transitioned to the succeeded phase", pod.Namespace, pod.Name))
err = e2epod.WaitForPodSuccessInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name)
framework.ExpectNoError(err, "Failed to await for the pod to be succeeded: %q", pod.Name)
ginkgo.By(fmt.Sprintf("Fetching the end state of the pod (%v/%v)", pod.Namespace, pod.Name))
pod, err = e2epod.NewPodClient(f).Get(ctx, pod.Name, metav1.GetOptions{})
framework.ExpectNoError(err, "Failed to fetch the end state of the pod: %q", pod.Name)
ginkgo.By(fmt.Sprintf("Verify the pod (%v/%v) container is in the terminated state", pod.Namespace, pod.Name))
gomega.Expect(pod.Status.ContainerStatuses).Should(gomega.HaveLen(1))
containerStatus := pod.Status.ContainerStatuses[0]
gomega.Expect(containerStatus.State.Terminated).ShouldNot(gomega.BeNil(), "The pod container is not in the Terminated state")
ginkgo.By(fmt.Sprintf("Verifying the exit code for the terminated container is 0 for pod (%v/%v)", pod.Namespace, pod.Name))
gomega.Expect(containerStatus.State.Terminated.ExitCode).Should(gomega.Equal(int32(0)))
},
ginkgo.Entry("Restart policy Always", v1.RestartPolicyAlways),
ginkgo.Entry("Restart policy OnFailure", v1.RestartPolicyOnFailure),
ginkgo.Entry("Restart policy Never", v1.RestartPolicyNever),
)
})