kubelet: Delay writing a terminal phase until the pod is terminated

Other components must know when the Kubelet has released critical
resources for terminal pods. Do not set the phase in the apiserver
to terminal until all containers are stopped and cannot restart.

As a consequence of this change, the Kubelet must explicitly transition
a terminal pod to the terminating state in the pod worker, which is
handled by returning a new isTerminal boolean from syncPod.

Finally, if a pod with init containers has not been initialized yet,
don't default the statuses of its containers, or of init containers that
have not yet been attempted, to the unknown failure state.
Clayton Coleman 2022-02-26 14:36:51 -05:00
parent 9cb9a29f34
commit 69a3820214
10 changed files with 734 additions and 89 deletions
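To make the new contract concrete, here is a minimal, self-contained sketch (not the kubelet's real types or code; names here are illustrative) of what this commit changes: syncPod now reports isTerminal, and the pod worker uses that result to start termination instead of letting a terminal phase reach the apiserver while containers could still be running.

package main

import "fmt"

type phase string

const (
	running   phase = "Running"
	succeeded phase = "Succeeded"
)

// syncPod converges the pod toward its spec and reports whether it has
// reached a terminal lifecycle phase (Succeeded or Failed).
func syncPod(p phase) (isTerminal bool, err error) {
	if p == succeeded {
		// Terminal: stop setting the pod up; the caller hands off to termination.
		return true, nil
	}
	return false, nil
}

func main() {
	isTerminal, err := syncPod(succeeded)
	if err == nil && isTerminal {
		// In the real pod worker this is the completeSync path: record that
		// termination started and queue terminating work so the next sync
		// is syncTerminatingPod rather than another setup pass.
		fmt.Println("pod is terminal; worker transitions to terminating")
	}
}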


@@ -1452,24 +1452,36 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
}
// syncPod is the transaction script for the sync of a single pod (setting up)
-// a pod. The reverse (teardown) is handled in syncTerminatingPod and
-// syncTerminatedPod. If syncPod exits without error, then the pod runtime
-// state is in sync with the desired configuration state (pod is running).
-// If syncPod exits with a transient error, the next invocation of syncPod
-// is expected to make progress towards reaching the runtime state.
+// a pod. This method is reentrant and expected to converge a pod towards the
+// desired state of the spec. The reverse (teardown) is handled in
+// syncTerminatingPod and syncTerminatedPod. If syncPod exits without error,
+// then the pod runtime state is in sync with the desired configuration state
+// (pod is running). If syncPod exits with a transient error, the next
+// invocation of syncPod is expected to make progress towards reaching the
+// runtime state. syncPod exits with isTerminal when the pod was detected to
+// have reached a terminal lifecycle phase due to container exits (for
+// RestartNever or RestartOnFailure) and the next method invoked will be
+// syncTerminatingPod.
//
// Arguments:
//
-// o - the SyncPodOptions for this invocation
+// updateType - whether this is a create (first time) or an update, should
+// only be used for metrics since this method must be reentrant
+// pod - the pod that is being set up
+// mirrorPod - the mirror pod known to the kubelet for this pod, if any
+// podStatus - the most recent pod status observed for this pod which can
+// be used to determine the set of actions that should be taken during
+// this loop of syncPod
//
// The workflow is:
-// * Kill the pod immediately if update type is SyncPodKill
// * If the pod is being created, record pod worker start latency
// * Call generateAPIPodStatus to prepare an v1.PodStatus for the pod
// * If the pod is being seen as running for the first time, record pod
// start latency
// * Update the status of the pod in the status manager
-// * Kill the pod if it should not be running due to soft admission
+// * Stop the pod's containers if it should not be running due to soft
+// admission
+// * Ensure any background tracking for a runnable pod is started
// * Create a mirror pod if the pod is a static pod, and does not
// already have a mirror pod
// * Create the data directories for the pod if they do not exist
@@ -1483,10 +1495,12 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
//
// This operation writes all events that are dispatched in order to provide
// the most accurate information possible about an error situation to aid debugging.
-// Callers should not throw an event if this operation returns an error.
+// Callers should not write an event if this operation returns an error.
-func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
+func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
klog.V(4).InfoS("syncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
-defer klog.V(4).InfoS("syncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
+defer func() {
+klog.V(4).InfoS("syncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID, "isTerminal", isTerminal)
+}()
// Latency measurements for the main workflow are relative to the
// first time the pod was seen by kubelet.
@@ -1518,11 +1532,17 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType
for _, ipInfo := range apiPodStatus.PodIPs {
podStatus.IPs = append(podStatus.IPs, ipInfo.IP)
}
if len(podStatus.IPs) == 0 && len(apiPodStatus.PodIP) > 0 {
podStatus.IPs = []string{apiPodStatus.PodIP}
}
+// If the pod is terminal, we don't need to continue to setup the pod
+if apiPodStatus.Phase == v1.PodSucceeded || apiPodStatus.Phase == v1.PodFailed {
+kl.statusManager.SetPodStatus(pod, apiPodStatus)
+isTerminal = true
+return isTerminal, nil
+}
// If the pod should not be running, we request the pod's containers be stopped. This is not the same
// as termination (we want to stop the pod, but potentially restart it later if soft admission allows
// it later). Set the status and phase appropriately
@@ -1572,13 +1592,13 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType
// Return an error to signal that the sync loop should back off.
syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
}
-return syncErr
+return false, syncErr
}
// If the network plugin is not ready, only start the pod if it uses the host network
if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, err)
-return fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err)
+return false, fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err)
}
// ensure the kubelet knows about referenced secrets or configmaps used by the pod
@@ -1635,7 +1655,7 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType
}
if err := pcm.EnsureExists(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
-return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
+return false, fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
}
}
}
@@ -1676,7 +1696,7 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType
if err := kl.makePodDataDirs(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
klog.ErrorS(err, "Unable to make pod data directories for pod", "pod", klog.KObj(pod))
-return err
+return false, err
}
// Volume manager will not mount volumes for terminating pods
@@ -1686,7 +1706,7 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType
if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
klog.ErrorS(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod))
-return err
+return false, err
}
}
@@ -1702,14 +1722,14 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType
if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff {
// Do not record an event here, as we keep all event logging for sync pod failures
// local to container runtime, so we get better errors.
-return err
+return false, err
}
}
-return nil
+return false, nil
}
-return nil
+return false, nil
}
// syncTerminatingPod is expected to terminate all running containers in a pod. Once this method


@@ -915,6 +915,12 @@ func countRunningContainerStatus(status v1.PodStatus) int {
return runningContainers
}
+// PodCouldHaveRunningContainers returns true if the pod with the given UID could still have running
+// containers. This returns false if the pod has not yet been started or the pod is unknown.
+func (kl *Kubelet) PodCouldHaveRunningContainers(pod *v1.Pod) bool {
+return kl.podWorkers.CouldHaveRunningContainers(pod.UID)
+}
// PodResourcesAreReclaimed returns true if all required node-level resources that a pod was consuming have
// been reclaimed by the kubelet. Reclaiming resources is a prerequisite to deleting a pod from the API server.
func (kl *Kubelet) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool {
@@ -1424,7 +1430,7 @@ func getPhase(spec *v1.PodSpec, info []v1.ContainerStatus) v1.PodPhase {
}
// generateAPIPodStatus creates the final API pod status for a pod, given the
-// internal pod status.
+// internal pod status. This method should only be called from within sync*Pod methods.
func (kl *Kubelet) generateAPIPodStatus(pod *v1.Pod, podStatus *kubecontainer.PodStatus) v1.PodStatus {
klog.V(3).InfoS("Generating pod status", "pod", klog.KObj(pod))


@@ -505,9 +505,9 @@ func TestDispatchWorkOfCompletedPod(t *testing.T) {
kubelet := testKubelet.kubelet
var got bool
kubelet.podWorkers = &fakePodWorkers{
-syncPodFn: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
+syncPodFn: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) {
got = true
-return nil
+return false, nil
},
cache: kubelet.podCache,
t: t,
@@ -584,9 +584,9 @@ func TestDispatchWorkOfActivePod(t *testing.T) {
kubelet := testKubelet.kubelet
var got bool
kubelet.podWorkers = &fakePodWorkers{
-syncPodFn: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
+syncPodFn: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) {
got = true
-return nil
+return false, nil
},
cache: kubelet.podCache,
t: t,
@@ -1300,8 +1300,11 @@ func TestCreateMirrorPod(t *testing.T) {
pod.Annotations[kubetypes.ConfigSourceAnnotationKey] = "file"
pods := []*v1.Pod{pod}
kl.podManager.SetPods(pods)
-err := kl.syncPod(context.Background(), updateType, pod, nil, &kubecontainer.PodStatus{})
+isTerminal, err := kl.syncPod(context.Background(), updateType, pod, nil, &kubecontainer.PodStatus{})
assert.NoError(t, err)
+if isTerminal {
+t.Fatalf("pod should not be terminal: %#v", pod)
+}
podFullName := kubecontainer.GetPodFullName(pod)
assert.True(t, manager.HasPod(podFullName), "Expected mirror pod %q to be created", podFullName)
assert.Equal(t, 1, manager.NumOfPods(), "Expected only 1 mirror pod %q, got %+v", podFullName, manager.GetPods())
@@ -1332,8 +1335,11 @@ func TestDeleteOutdatedMirrorPod(t *testing.T) {
pods := []*v1.Pod{pod, mirrorPod}
kl.podManager.SetPods(pods)
-err := kl.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, mirrorPod, &kubecontainer.PodStatus{})
+isTerminal, err := kl.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, mirrorPod, &kubecontainer.PodStatus{})
assert.NoError(t, err)
+if isTerminal {
+t.Fatalf("pod should not be terminal: %#v", pod)
+}
name := kubecontainer.GetPodFullName(pod)
creates, deletes := manager.GetCounts(name)
if creates != 1 || deletes != 1 {
@@ -1489,13 +1495,19 @@ func TestNetworkErrorsWithoutHostNetwork(t *testing.T) {
})
kubelet.podManager.SetPods([]*v1.Pod{pod})
-err := kubelet.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{})
+isTerminal, err := kubelet.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{})
assert.Error(t, err, "expected pod with hostNetwork=false to fail when network in error")
+if isTerminal {
+t.Fatalf("pod should not be terminal: %#v", pod)
+}
pod.Annotations[kubetypes.ConfigSourceAnnotationKey] = kubetypes.FileSource
pod.Spec.HostNetwork = true
-err = kubelet.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{})
+isTerminal, err = kubelet.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{})
assert.NoError(t, err, "expected pod with hostNetwork=true to succeed when network in error")
+if isTerminal {
+t.Fatalf("pod should not be terminal: %#v", pod)
+}
}
func TestFilterOutInactivePods(t *testing.T) {


@@ -218,7 +218,7 @@ type PodWorkers interface {
}
// the function to invoke to perform a sync (reconcile the kubelet state to the desired shape of the pod)
-type syncPodFnType func(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error
+type syncPodFnType func(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error)
// the function to invoke to terminate a pod (ensure no running processes are present)
type syncTerminatingPodFnType func(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, runningPod *kubecontainer.Pod, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error
@@ -886,6 +886,7 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) {
}
klog.V(4).InfoS("Processing pod event", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType)
+var isTerminal bool
err := func() error {
// The worker is responsible for ensuring the sync method sees the appropriate
// status updates on resyncs (the result of the last sync), transitions to
@@ -932,13 +933,14 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) {
err = p.syncTerminatingPodFn(ctx, pod, status, update.Options.RunningPod, gracePeriod, podStatusFn)
default:
-err = p.syncPodFn(ctx, update.Options.UpdateType, pod, update.Options.MirrorPod, status)
+isTerminal, err = p.syncPodFn(ctx, update.Options.UpdateType, pod, update.Options.MirrorPod, status)
}
lastSyncTime = time.Now()
return err
}()
+var phaseTransition bool
switch {
case err == context.Canceled:
// when the context is cancelled we expect an update to already be queued
@@ -969,10 +971,17 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) {
}
// otherwise we move to the terminating phase
p.completeTerminating(pod)
+phaseTransition = true
+case isTerminal:
+// if syncPod indicated we are now terminal, set the appropriate pod status to move to terminating
+klog.V(4).InfoS("Pod is terminal", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType)
+p.completeSync(pod)
+phaseTransition = true
}
-// queue a retry for errors if necessary, then put the next event in the channel if any
-p.completeWork(pod, err)
+// queue a retry if necessary, then put the next event in the channel if any
+p.completeWork(pod, phaseTransition, err)
if start := update.Options.StartTime; !start.IsZero() {
metrics.PodWorkerDuration.WithLabelValues(update.Options.UpdateType.String()).Observe(metrics.SinceInSeconds(start))
}
@@ -1003,6 +1012,33 @@ func (p *podWorkers) acknowledgeTerminating(pod *v1.Pod) PodStatusFunc {
return nil
}
// completeSync is invoked when syncPod completes successfully and indicates the pod is now terminal and should
// be terminated. This happens when the natural pod lifecycle completes - any pod which is not RestartAlways
// exits. Unnatural completions, such as evictions, API driven deletion or phase transition, are handled by
// UpdatePod.
func (p *podWorkers) completeSync(pod *v1.Pod) {
p.podLock.Lock()
defer p.podLock.Unlock()
klog.V(4).InfoS("Pod indicated lifecycle completed naturally and should now terminate", "pod", klog.KObj(pod), "podUID", pod.UID)
if status, ok := p.podSyncStatuses[pod.UID]; ok {
if status.terminatingAt.IsZero() {
status.terminatingAt = time.Now()
} else {
klog.V(4).InfoS("Pod worker attempted to set terminatingAt twice, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
}
status.startedTerminating = true
}
p.lastUndeliveredWorkUpdate[pod.UID] = podWork{
WorkType: TerminatingPodWork,
Options: UpdatePodOptions{
Pod: pod,
},
}
}
// completeTerminating is invoked when syncTerminatingPod completes successfully, which means
// no container is running, no container will be started in the future, and we are ready for
// cleanup. This updates the termination state which prevents future syncs and will ensure
@@ -1115,9 +1151,11 @@ func (p *podWorkers) completeUnstartedTerminated(pod *v1.Pod) {
// completeWork requeues on error or the next sync interval and then immediately executes any pending
// work.
-func (p *podWorkers) completeWork(pod *v1.Pod, syncErr error) {
+func (p *podWorkers) completeWork(pod *v1.Pod, phaseTransition bool, syncErr error) {
// Requeue the last update if the last sync returned error.
switch {
+case phaseTransition:
+p.workQueue.Enqueue(pod.UID, 0)
case syncErr == nil:
// No error; requeue at the regular resync interval.
p.workQueue.Enqueue(pod.UID, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor))


@@ -18,7 +18,6 @@ package kubelet
import (
"context"
-"flag"
"reflect"
"strconv"
"sync"
@@ -31,7+30,6 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
-"k8s.io/klog/v2"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
@@ -48,6 +46,7 @@ type fakePodWorkers struct {
t TestingInterface
triggeredDeletion []types.UID
+triggeredTerminal []types.UID
statusLock sync.Mutex
running map[types.UID]bool
@@ -79,9 +78,13 @@ func (f *fakePodWorkers) UpdatePod(options UpdatePodOptions) {
case kubetypes.SyncPodKill:
f.triggeredDeletion = append(f.triggeredDeletion, uid)
default:
-if err := f.syncPodFn(context.Background(), options.UpdateType, options.Pod, options.MirrorPod, status); err != nil {
+isTerminal, err := f.syncPodFn(context.Background(), options.UpdateType, options.Pod, options.MirrorPod, status)
+if err != nil {
f.t.Errorf("Unexpected error: %v", err)
}
+if isTerminal {
+f.triggeredTerminal = append(f.triggeredTerminal, uid)
+}
}
}
@@ -249,7 +252,7 @@ func createPodWorkers() (*podWorkers, map[types.UID][]syncPodRecord) {
fakeCache := containertest.NewFakeCache(fakeRuntime)
fakeQueue := &fakeQueue{}
w := newPodWorkers(
-func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
+func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) {
func() {
lock.Lock()
defer lock.Unlock()
@@ -259,7 +262,7 @@ func createPodWorkers() (*podWorkers, map[types.UID][]syncPodRecord) {
updateType: updateType,
})
}()
-return nil
+return false, nil
},
func(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, runningPod *kubecontainer.Pod, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {
func() {
@@ -530,9 +533,84 @@ func newUIDSet(uids ...types.UID) sets.String {
return set
}
-func init() {
-klog.InitFlags(nil)
-flag.Lookup("v").Value.Set("5")
-}
+type terminalPhaseSync struct {
+lock sync.Mutex
+fn syncPodFnType
+terminal sets.String
+}
func (s *terminalPhaseSync) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) {
isTerminal, err := s.fn(ctx, updateType, pod, mirrorPod, podStatus)
if err != nil {
return false, err
}
if !isTerminal {
s.lock.Lock()
defer s.lock.Unlock()
isTerminal = s.terminal.Has(string(pod.UID))
}
return isTerminal, nil
}
func (s *terminalPhaseSync) SetTerminal(uid types.UID) {
s.lock.Lock()
defer s.lock.Unlock()
s.terminal.Insert(string(uid))
}
func newTerminalPhaseSync(fn syncPodFnType) *terminalPhaseSync {
return &terminalPhaseSync{
fn: fn,
terminal: sets.NewString(),
}
}
func TestTerminalPhaseTransition(t *testing.T) {
podWorkers, _ := createPodWorkers()
var channels WorkChannel
podWorkers.workerChannelFn = channels.Intercept
terminalPhaseSyncer := newTerminalPhaseSync(podWorkers.syncPodFn)
podWorkers.syncPodFn = terminalPhaseSyncer.SyncPod
// start pod
podWorkers.UpdatePod(UpdatePodOptions{
Pod: newNamedPod("1", "test1", "pod1", false),
UpdateType: kubetypes.SyncPodUpdate,
})
drainAllWorkers(podWorkers)
// should observe pod running
pod1 := podWorkers.podSyncStatuses[types.UID("1")]
if pod1.IsTerminated() {
t.Fatalf("unexpected pod state: %#v", pod1)
}
// send another update to the pod
podWorkers.UpdatePod(UpdatePodOptions{
Pod: newNamedPod("1", "test1", "pod1", false),
UpdateType: kubetypes.SyncPodUpdate,
})
drainAllWorkers(podWorkers)
// should observe pod still running
pod1 = podWorkers.podSyncStatuses[types.UID("1")]
if pod1.IsTerminated() {
t.Fatalf("unexpected pod state: %#v", pod1)
}
// the next sync should result in a transition to terminal
terminalPhaseSyncer.SetTerminal(types.UID("1"))
podWorkers.UpdatePod(UpdatePodOptions{
Pod: newNamedPod("1", "test1", "pod1", false),
UpdateType: kubetypes.SyncPodUpdate,
})
drainAllWorkers(podWorkers)
// should observe pod terminating
pod1 = podWorkers.podSyncStatuses[types.UID("1")]
if !pod1.IsTerminationRequested() || !pod1.IsTerminated() {
t.Fatalf("unexpected pod state: %#v", pod1)
}
}
func TestStaticPodExclusion(t *testing.T) {
@@ -1203,15 +1281,15 @@ type simpleFakeKubelet struct {
wg sync.WaitGroup
}
-func (kl *simpleFakeKubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
+func (kl *simpleFakeKubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) {
kl.pod, kl.mirrorPod, kl.podStatus = pod, mirrorPod, podStatus
-return nil
+return false, nil
}
-func (kl *simpleFakeKubelet) syncPodWithWaitGroup(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
+func (kl *simpleFakeKubelet) syncPodWithWaitGroup(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) {
kl.pod, kl.mirrorPod, kl.podStatus = pod, mirrorPod, podStatus
kl.wg.Done()
-return nil
+return false, nil
}
func (kl *simpleFakeKubelet) syncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, runningPod *kubecontainer.Pod, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {


@@ -112,9 +112,10 @@ func (kl *Kubelet) runOnce(pods []*v1.Pod, retryDelay time.Duration) (results []
// runPod runs a single pod and wait until all containers are running.
func (kl *Kubelet) runPod(pod *v1.Pod, retryDelay time.Duration) error {
+var isTerminal bool
delay := retryDelay
retry := 0
-for {
+for !isTerminal {
status, err := kl.containerRuntime.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
if err != nil {
return fmt.Errorf("unable to get status for pod %q: %v", format.Pod(pod), err)
@@ -131,7 +132,7 @@ func (kl *Kubelet) runPod(pod *v1.Pod, retryDelay time.Duration) error {
klog.ErrorS(err, "Failed creating a mirror pod", "pod", klog.KObj(pod))
}
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
-if err = kl.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, mirrorPod, status); err != nil {
+if isTerminal, err = kl.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, mirrorPod, status); err != nil {
return fmt.Errorf("error syncing pod %q: %v", format.Pod(pod), err)
}
if retry >= runOnceMaxRetries {
@@ -143,6 +144,7 @@ func (kl *Kubelet) runPod(pod *v1.Pod, retryDelay time.Duration) error {
retry++
delay *= runOnceRetryDelayBackoff
}
+return nil
}
// isPodRunning returns true if all containers of a manifest are running.


@@ -83,8 +83,10 @@ type PodStatusProvider interface {
// PodDeletionSafetyProvider provides guarantees that a pod can be safely deleted.
type PodDeletionSafetyProvider interface {
-// A function which returns true if the pod can safely be deleted
+// PodResourcesAreReclaimed returns true if the pod can safely be deleted.
PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool
+// PodCouldHaveRunningContainers returns true if the pod could have running containers.
+PodCouldHaveRunningContainers(pod *v1.Pod) bool
}
// Manager is the Source of truth for kubelet pod status, and should be kept up-to-date with
@@ -335,19 +337,26 @@ func (m *manager) TerminatePod(pod *v1.Pod) {
oldStatus = &cachedStatus.status
}
status := *oldStatus.DeepCopy()
-for i := range status.ContainerStatuses {
-if status.ContainerStatuses[i].State.Terminated != nil {
-continue
-}
-status.ContainerStatuses[i].State = v1.ContainerState{
-Terminated: &v1.ContainerStateTerminated{
-Reason: "ContainerStatusUnknown",
-Message: "The container could not be located when the pod was terminated",
-ExitCode: 137,
-},
-}
-}
+// once a pod has initialized, any missing status is treated as a failure
+if hasPodInitialized(pod) {
+for i := range status.ContainerStatuses {
+if status.ContainerStatuses[i].State.Terminated != nil {
+continue
+}
+status.ContainerStatuses[i].State = v1.ContainerState{
+Terminated: &v1.ContainerStateTerminated{
+Reason: "ContainerStatusUnknown",
+Message: "The container could not be located when the pod was terminated",
+ExitCode: 137,
+},
+}
+}
+}
-for i := range status.InitContainerStatuses {
+// all but the final suffix of init containers which have no evidence of a container start are
+// marked as failed containers
+for i := range initializedContainers(status.InitContainerStatuses) {
if status.InitContainerStatuses[i].State.Terminated != nil {
continue
}
@@ -364,6 +373,49 @@ func (m *manager) TerminatePod(pod *v1.Pod) {
m.updateStatusInternal(pod, status, true)
}
// hasPodInitialized returns true if the pod has no evidence of ever starting a regular container, which
// implies those containers should not be transitioned to terminated status.
func hasPodInitialized(pod *v1.Pod) bool {
// a pod without init containers is always initialized
if len(pod.Spec.InitContainers) == 0 {
return true
}
// if any container has ever moved out of waiting state, the pod has initialized
for _, status := range pod.Status.ContainerStatuses {
if status.LastTerminationState.Terminated != nil || status.State.Waiting == nil {
return true
}
}
// if the last init container has ever completed with a zero exit code, the pod is initialized
if l := len(pod.Status.InitContainerStatuses); l > 0 {
container := pod.Status.InitContainerStatuses[l-1]
if state := container.LastTerminationState; state.Terminated != nil && state.Terminated.ExitCode == 0 {
return true
}
if state := container.State; state.Terminated != nil && state.Terminated.ExitCode == 0 {
return true
}
}
// otherwise the pod has no record of being initialized
return false
}
// initializedContainers returns all status except for suffix of containers that are in Waiting
// state, which is the set of containers that have attempted to start at least once. If all containers
// are Waiting, the first container is always returned.
func initializedContainers(containers []v1.ContainerStatus) []v1.ContainerStatus {
for i := len(containers) - 1; i >= 0; i-- {
if containers[i].State.Waiting == nil || containers[i].LastTerminationState.Terminated != nil {
return containers[0 : i+1]
}
}
// always return at least one container
if len(containers) > 0 {
return containers[0:1]
}
return nil
}
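As a quick illustration of the suffix rule implemented by initializedContainers above, here is a standalone sketch (simplified status type, illustrative names, not the kubelet's v1.ContainerStatus): statuses are kept up to and including the last container that shows any evidence of starting, and a trailing run of never-attempted Waiting containers is skipped.

package main

import "fmt"

type status struct {
	Name         string
	NeverStarted bool // still Waiting, with no previous termination recorded
}

// initialized returns every status except the trailing suffix of containers
// that never attempted to start; at least one status is always returned.
func initialized(containers []status) []status {
	for i := len(containers) - 1; i >= 0; i-- {
		if !containers[i].NeverStarted {
			return containers[:i+1]
		}
	}
	if len(containers) > 0 {
		return containers[:1]
	}
	return nil
}

func main() {
	s := []status{{"init-0", true}, {"init-1", false}, {"init-2", true}, {"init-3", true}}
	for _, c := range initialized(s) {
		fmt.Println(c.Name) // prints init-0, init-1; init-2 and init-3 never attempted to start
	}
}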
// checkContainerStateTransition ensures that no container is trying to transition
// from a terminated to non-terminated state, which is illegal and indicates a
// logical error in the kubelet.
@@ -619,8 +671,9 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
return
}
-oldStatus := pod.Status.DeepCopy()
-newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, *oldStatus, mergePodStatus(*oldStatus, status.status))
+mergedStatus := mergePodStatus(pod.Status, status.status, m.podDeletionSafety.PodCouldHaveRunningContainers(pod))
+newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, mergedStatus)
klog.V(3).InfoS("Patch status for pod", "pod", klog.KObj(pod), "patch", string(patchBytes))
if err != nil {
@@ -630,7 +683,7 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
if unchanged {
klog.V(3).InfoS("Status for pod is up-to-date", "pod", klog.KObj(pod), "statusVersion", status.version)
} else {
-klog.V(3).InfoS("Status for pod updated successfully", "pod", klog.KObj(pod), "statusVersion", status.version, "status", status.status)
+klog.V(3).InfoS("Status for pod updated successfully", "pod", klog.KObj(pod), "statusVersion", status.version, "status", mergedStatus)
pod = newPod
}
@@ -771,25 +824,49 @@ func normalizeStatus(pod *v1.Pod, status *v1.PodStatus) *v1.PodStatus {
return status
}
-// mergePodStatus merges oldPodStatus and newPodStatus where pod conditions
-// not owned by kubelet is preserved from oldPodStatus
-func mergePodStatus(oldPodStatus, newPodStatus v1.PodStatus) v1.PodStatus {
-podConditions := []v1.PodCondition{}
+// mergePodStatus merges oldPodStatus and newPodStatus to preserve where pod conditions
+// not owned by kubelet and to ensure terminal phase transition only happens after all
+// running containers have terminated. This method does not modify the old status.
+func mergePodStatus(oldPodStatus, newPodStatus v1.PodStatus, couldHaveRunningContainers bool) v1.PodStatus {
+podConditions := make([]v1.PodCondition, 0, len(oldPodStatus.Conditions)+len(newPodStatus.Conditions))
for _, c := range oldPodStatus.Conditions {
if !kubetypes.PodConditionByKubelet(c.Type) {
podConditions = append(podConditions, c)
}
}
for _, c := range newPodStatus.Conditions {
if kubetypes.PodConditionByKubelet(c.Type) {
podConditions = append(podConditions, c)
}
}
newPodStatus.Conditions = podConditions
// Delay transitioning a pod to a terminal status unless the pod is actually terminal.
// The Kubelet should never transition a pod to terminal status that could have running
// containers and thus actively be leveraging exclusive resources. Note that resources
// like volumes are reconciled by a subsystem in the Kubelet and will converge if a new
// pod reuses an exclusive resource (unmount -> free -> mount), which means we do not
// need wait for those resources to be detached by the Kubelet. In general, resources
// the Kubelet exclusively owns must be released prior to a pod being reported terminal,
// while resources that have participanting components above the API use the pod's
// transition to a terminal phase (or full deletion) to release those resources.
if !isPhaseTerminal(oldPodStatus.Phase) && isPhaseTerminal(newPodStatus.Phase) {
if couldHaveRunningContainers {
newPodStatus.Phase = oldPodStatus.Phase
newPodStatus.Reason = oldPodStatus.Reason
newPodStatus.Message = oldPodStatus.Message
}
}
return newPodStatus
}
// isPhaseTerminal returns true if the pod's phase is terminal.
func isPhaseTerminal(phase v1.PodPhase) bool {
return phase == v1.PodFailed || phase == v1.PodSucceeded
}
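The gating rule above is small but easy to misread, so the following standalone example (simplified status struct and hypothetical names, not the kubelet's v1.PodStatus) shows the intended behavior: a terminal phase reported by a newer status is held back for as long as the pod could still have running containers.

package main

import "fmt"

type podStatus struct {
	Phase, Reason, Message string
}

func isPhaseTerminal(phase string) bool {
	return phase == "Failed" || phase == "Succeeded"
}

// merge keeps the old, non-terminal phase while containers could still be
// running, mirroring the check in mergePodStatus.
func merge(oldStatus, newStatus podStatus, couldHaveRunningContainers bool) podStatus {
	if !isPhaseTerminal(oldStatus.Phase) && isPhaseTerminal(newStatus.Phase) && couldHaveRunningContainers {
		newStatus.Phase, newStatus.Reason, newStatus.Message = oldStatus.Phase, oldStatus.Reason, oldStatus.Message
	}
	return newStatus
}

func main() {
	oldStatus := podStatus{Phase: "Running"}
	newStatus := podStatus{Phase: "Succeeded"}
	fmt.Println(merge(oldStatus, newStatus, true).Phase)  // Running: containers may still be running
	fmt.Println(merge(oldStatus, newStatus, false).Phase) // Succeeded: safe to report terminal
}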
// NeedToReconcilePodReadiness returns if the pod "Ready" condition need to be reconcile // NeedToReconcilePodReadiness returns if the pod "Ready" condition need to be reconcile
func NeedToReconcilePodReadiness(pod *v1.Pod) bool { func NeedToReconcilePodReadiness(pod *v1.Pod) bool {
if len(pod.Spec.ReadinessGates) == 0 { if len(pod.Spec.ReadinessGates) == 0 {


@@ -25,6 +25,7 @@ import (
"testing"
"time"
+"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
@@ -648,11 +649,14 @@ func TestTerminatePodWaiting(t *testing.T) {
t.Logf("we expect the container statuses to have changed to terminated")
newStatus := expectPodStatus(t, syncer, testPod)
-for i := range newStatus.ContainerStatuses {
-assert.False(t, newStatus.ContainerStatuses[i].State.Terminated == nil, "expected containers to be terminated")
+for _, container := range newStatus.ContainerStatuses {
+assert.False(t, container.State.Terminated == nil, "expected containers to be terminated")
}
-for i := range newStatus.InitContainerStatuses {
-assert.False(t, newStatus.InitContainerStatuses[i].State.Terminated == nil, "expected init containers to be terminated")
+for _, container := range newStatus.InitContainerStatuses[:2] {
+assert.False(t, container.State.Terminated == nil, "expected init containers to be terminated")
+}
+for _, container := range newStatus.InitContainerStatuses[2:] {
+assert.False(t, container.State.Waiting == nil, "expected init containers to be waiting")
}
expectUnknownState := v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "ContainerStatusUnknown", Message: "The container could not be located when the pod was terminated", ExitCode: 137}}
@@ -662,8 +666,8 @@ func TestTerminatePodWaiting(t *testing.T) {
if !reflect.DeepEqual(newStatus.InitContainerStatuses[1].State, firstStatus.InitContainerStatuses[1].State) {
t.Errorf("existing terminated container state not preserved: %#v", newStatus.ContainerStatuses)
}
-if !reflect.DeepEqual(newStatus.InitContainerStatuses[2].State, expectUnknownState) {
-t.Errorf("waiting container state not defaulted: %s", diff.ObjectReflectDiff(newStatus.InitContainerStatuses[2].State, expectUnknownState))
+if !reflect.DeepEqual(newStatus.InitContainerStatuses[2].State, firstStatus.InitContainerStatuses[2].State) {
+t.Errorf("waiting container state not defaulted: %s", diff.ObjectReflectDiff(newStatus.InitContainerStatuses[2].State, firstStatus.InitContainerStatuses[2].State))
}
if !reflect.DeepEqual(newStatus.ContainerStatuses[0].State, expectUnknownState) {
t.Errorf("terminated container state not defaulted: %s", diff.ObjectReflectDiff(newStatus.ContainerStatuses[0].State, expectUnknownState))
@@ -680,6 +684,308 @@ func TestTerminatePodWaiting(t *testing.T) {
assert.Equal(t, newStatus.Message, firstStatus.Message)
}
func TestTerminatePod_DefaultUnknownStatus(t *testing.T) {
newPod := func(initContainers, containers int, fns ...func(*v1.Pod)) *v1.Pod {
pod := getTestPod()
for i := 0; i < initContainers; i++ {
pod.Spec.InitContainers = append(pod.Spec.InitContainers, v1.Container{
Name: fmt.Sprintf("init-%d", i),
})
}
for i := 0; i < containers; i++ {
pod.Spec.Containers = append(pod.Spec.Containers, v1.Container{
Name: fmt.Sprintf("%d", i),
})
}
pod.Status.StartTime = &metav1.Time{Time: time.Unix(1, 0).UTC()}
for _, fn := range fns {
fn(pod)
}
return pod
}
expectTerminatedUnknown := func(t *testing.T, state v1.ContainerState) {
t.Helper()
if state.Terminated == nil || state.Running != nil || state.Waiting != nil {
t.Fatalf("unexpected state: %#v", state)
}
if state.Terminated.ExitCode != 137 || state.Terminated.Reason != "ContainerStatusUnknown" || len(state.Terminated.Message) == 0 {
t.Fatalf("unexpected terminated state: %#v", state.Terminated)
}
}
expectTerminated := func(t *testing.T, state v1.ContainerState, exitCode int32) {
t.Helper()
if state.Terminated == nil || state.Running != nil || state.Waiting != nil {
t.Fatalf("unexpected state: %#v", state)
}
if state.Terminated.ExitCode != exitCode {
t.Fatalf("unexpected terminated state: %#v", state.Terminated)
}
}
expectWaiting := func(t *testing.T, state v1.ContainerState) {
t.Helper()
if state.Terminated != nil || state.Running != nil || state.Waiting == nil {
t.Fatalf("unexpected state: %#v", state)
}
}
testCases := []struct {
name string
pod *v1.Pod
updateFn func(*v1.Pod)
expectFn func(t *testing.T, status v1.PodStatus)
}{
{pod: newPod(0, 1, func(pod *v1.Pod) { pod.Status.Phase = v1.PodFailed })},
{pod: newPod(0, 1, func(pod *v1.Pod) { pod.Status.Phase = v1.PodRunning })},
{pod: newPod(0, 1, func(pod *v1.Pod) {
pod.Status.Phase = v1.PodRunning
pod.Status.ContainerStatuses = []v1.ContainerStatus{
{Name: "0", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "Test", ExitCode: 2}}},
}
})},
{
name: "last termination state set",
pod: newPod(0, 1, func(pod *v1.Pod) {
pod.Spec.RestartPolicy = v1.RestartPolicyNever
pod.Status.Phase = v1.PodRunning
pod.Status.ContainerStatuses = []v1.ContainerStatus{
{
Name: "0",
LastTerminationState: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "Test", ExitCode: 2}},
State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}},
},
}
}),
expectFn: func(t *testing.T, status v1.PodStatus) {
container := status.ContainerStatuses[0]
if container.LastTerminationState.Terminated.ExitCode != 2 {
t.Fatalf("unexpected last state: %#v", container.LastTerminationState)
}
expectTerminatedUnknown(t, container.State)
},
},
{
name: "no previous state",
pod: newPod(0, 1, func(pod *v1.Pod) {
pod.Spec.RestartPolicy = v1.RestartPolicyNever
pod.Status.Phase = v1.PodRunning
pod.Status.ContainerStatuses = []v1.ContainerStatus{
{Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
}
}),
expectFn: func(t *testing.T, status v1.PodStatus) {
expectTerminatedUnknown(t, status.ContainerStatuses[0].State)
},
},
{
name: "uninitialized pod defaults the first init container",
pod: newPod(1, 1, func(pod *v1.Pod) {
pod.Spec.RestartPolicy = v1.RestartPolicyNever
pod.Status.Phase = v1.PodRunning
pod.Status.InitContainerStatuses = []v1.ContainerStatus{
{Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
}
pod.Status.ContainerStatuses = []v1.ContainerStatus{
{Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
}
}),
expectFn: func(t *testing.T, status v1.PodStatus) {
expectTerminatedUnknown(t, status.InitContainerStatuses[0].State)
expectWaiting(t, status.ContainerStatuses[0].State)
},
},
{
name: "uninitialized pod defaults only the first init container",
pod: newPod(2, 1, func(pod *v1.Pod) {
pod.Spec.RestartPolicy = v1.RestartPolicyNever
pod.Status.Phase = v1.PodRunning
pod.Status.InitContainerStatuses = []v1.ContainerStatus{
{Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
{Name: "init-1", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
}
pod.Status.ContainerStatuses = []v1.ContainerStatus{
{Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
}
}),
expectFn: func(t *testing.T, status v1.PodStatus) {
expectTerminatedUnknown(t, status.InitContainerStatuses[0].State)
expectWaiting(t, status.InitContainerStatuses[1].State)
expectWaiting(t, status.ContainerStatuses[0].State)
},
},
{
name: "uninitialized pod defaults gaps",
pod: newPod(4, 1, func(pod *v1.Pod) {
pod.Spec.RestartPolicy = v1.RestartPolicyNever
pod.Status.Phase = v1.PodRunning
pod.Status.InitContainerStatuses = []v1.ContainerStatus{
{Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
{Name: "init-1", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
{Name: "init-2", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 1}}},
{Name: "init-3", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
}
pod.Status.ContainerStatuses = []v1.ContainerStatus{
{Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
}
}),
expectFn: func(t *testing.T, status v1.PodStatus) {
expectTerminatedUnknown(t, status.InitContainerStatuses[0].State)
expectTerminatedUnknown(t, status.InitContainerStatuses[1].State)
expectTerminated(t, status.InitContainerStatuses[2].State, 1)
expectWaiting(t, status.InitContainerStatuses[3].State)
expectWaiting(t, status.ContainerStatuses[0].State)
},
},
{
name: "failed last container is uninitialized",
pod: newPod(3, 1, func(pod *v1.Pod) {
pod.Spec.RestartPolicy = v1.RestartPolicyNever
pod.Status.Phase = v1.PodRunning
pod.Status.InitContainerStatuses = []v1.ContainerStatus{
{Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
{Name: "init-1", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
{Name: "init-2", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 1}}},
}
pod.Status.ContainerStatuses = []v1.ContainerStatus{
{Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
}
}),
expectFn: func(t *testing.T, status v1.PodStatus) {
expectTerminatedUnknown(t, status.InitContainerStatuses[0].State)
expectTerminatedUnknown(t, status.InitContainerStatuses[1].State)
expectTerminated(t, status.InitContainerStatuses[2].State, 1)
expectWaiting(t, status.ContainerStatuses[0].State)
},
},
{
name: "successful last container is initialized",
pod: newPod(3, 1, func(pod *v1.Pod) {
pod.Spec.RestartPolicy = v1.RestartPolicyNever
pod.Status.Phase = v1.PodRunning
pod.Status.InitContainerStatuses = []v1.ContainerStatus{
{Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
{Name: "init-1", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
{Name: "init-2", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 0}}},
}
pod.Status.ContainerStatuses = []v1.ContainerStatus{
{Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
}
}),
expectFn: func(t *testing.T, status v1.PodStatus) {
expectTerminatedUnknown(t, status.InitContainerStatuses[0].State)
expectTerminatedUnknown(t, status.InitContainerStatuses[1].State)
expectTerminated(t, status.InitContainerStatuses[2].State, 0)
expectTerminatedUnknown(t, status.ContainerStatuses[0].State)
},
},
{
name: "successful last previous container is initialized, and container state is overwritten",
pod: newPod(3, 1, func(pod *v1.Pod) {
pod.Spec.RestartPolicy = v1.RestartPolicyNever
pod.Status.Phase = v1.PodRunning
pod.Status.InitContainerStatuses = []v1.ContainerStatus{
{Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
{Name: "init-1", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
{
Name: "init-2",
LastTerminationState: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 0}},
State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}},
},
}
pod.Status.ContainerStatuses = []v1.ContainerStatus{
{Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
}
}),
expectFn: func(t *testing.T, status v1.PodStatus) {
expectTerminatedUnknown(t, status.InitContainerStatuses[0].State)
expectTerminatedUnknown(t, status.InitContainerStatuses[1].State)
expectTerminatedUnknown(t, status.InitContainerStatuses[2].State)
expectTerminatedUnknown(t, status.ContainerStatuses[0].State)
},
},
{
name: "running container proves initialization",
pod: newPod(1, 1, func(pod *v1.Pod) {
pod.Spec.RestartPolicy = v1.RestartPolicyNever
pod.Status.Phase = v1.PodRunning
pod.Status.InitContainerStatuses = []v1.ContainerStatus{
{Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
}
pod.Status.ContainerStatuses = []v1.ContainerStatus{
{Name: "0", State: v1.ContainerState{Running: &v1.ContainerStateRunning{}}},
}
}),
expectFn: func(t *testing.T, status v1.PodStatus) {
expectTerminatedUnknown(t, status.InitContainerStatuses[0].State)
expectTerminatedUnknown(t, status.ContainerStatuses[0].State)
},
},
{
name: "evidence of terminated container proves initialization",
pod: newPod(1, 1, func(pod *v1.Pod) {
pod.Spec.RestartPolicy = v1.RestartPolicyNever
pod.Status.Phase = v1.PodRunning
pod.Status.InitContainerStatuses = []v1.ContainerStatus{
{Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
}
pod.Status.ContainerStatuses = []v1.ContainerStatus{
{Name: "0", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 0}}},
}
}),
expectFn: func(t *testing.T, status v1.PodStatus) {
expectTerminatedUnknown(t, status.InitContainerStatuses[0].State)
expectTerminated(t, status.ContainerStatuses[0].State, 0)
},
},
{
name: "evidence of previously terminated container proves initialization",
pod: newPod(1, 1, func(pod *v1.Pod) {
pod.Spec.RestartPolicy = v1.RestartPolicyNever
pod.Status.Phase = v1.PodRunning
pod.Status.InitContainerStatuses = []v1.ContainerStatus{
{Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
}
pod.Status.ContainerStatuses = []v1.ContainerStatus{
{Name: "0", LastTerminationState: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 0}}},
}
}),
expectFn: func(t *testing.T, status v1.PodStatus) {
expectTerminatedUnknown(t, status.InitContainerStatuses[0].State)
expectTerminatedUnknown(t, status.ContainerStatuses[0].State)
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), kubesecret.NewFakeManager(), kubeconfigmap.NewFakeManager())
syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}).(*manager)
original := tc.pod.DeepCopy()
syncer.SetPodStatus(original, original.Status)
copied := tc.pod.DeepCopy()
if tc.updateFn != nil {
tc.updateFn(copied)
}
expected := copied.DeepCopy()
syncer.TerminatePod(copied)
status := expectPodStatus(t, syncer, tc.pod.DeepCopy())
if tc.expectFn != nil {
tc.expectFn(t, status)
return
}
if !reflect.DeepEqual(expected.Status, status) {
diff := cmp.Diff(expected.Status, status)
if len(diff) == 0 {
t.Fatalf("diff returned no results for failed DeepEqual: %#v != %#v", expected.Status, status)
}
t.Fatalf("unexpected status: %s", diff)
}
})
}
}
func TestSetContainerReadiness(t *testing.T) {
cID1 := kubecontainer.ContainerID{Type: "test", ID: "1"}
cID2 := kubecontainer.ContainerID{Type: "test", ID: "2"}
@@ -957,6 +1263,7 @@ func TestDeletePods(t *testing.T) {
pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
client := fake.NewSimpleClientset(pod)
m := newTestManager(client)
+m.podDeletionSafety.(*statustest.FakePodDeletionSafetyProvider).Reclaimed = true
m.podManager.AddPod(pod)
status := getRandomPodStatus()
now := metav1.Now()
@@ -966,6 +1273,22 @@ func TestDeletePods(t *testing.T) {
verifyActions(t, m, []core.Action{getAction(), patchAction(), deleteAction()})
}
func TestDeletePodWhileReclaiming(t *testing.T) {
pod := getTestPod()
t.Logf("Set the deletion timestamp.")
pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
client := fake.NewSimpleClientset(pod)
m := newTestManager(client)
m.podDeletionSafety.(*statustest.FakePodDeletionSafetyProvider).Reclaimed = false
m.podManager.AddPod(pod)
status := getRandomPodStatus()
now := metav1.Now()
status.StartTime = &now
m.SetPodStatus(pod, status)
t.Logf("Expect to see a delete action.")
verifyActions(t, m, []core.Action{getAction(), patchAction()})
}
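Taken together, the two delete tests pin down the gate this change introduces: the status manager only issues the apiserver delete once the safety provider reports that the pod's resources have been reclaimed. A rough sketch of that gate is below; the function and parameter names are illustrative, not the exact status manager code.
// canBeDeleted sketches the gating decision the tests above exercise: the
// pod object is only removed from the apiserver after the kubelet reports
// that its resources are reclaimed, and mirror pods are never deleted here.
func canBeDeleted(pod *v1.Pod, status v1.PodStatus, safety PodDeletionSafetyProvider) bool {
	if pod.DeletionTimestamp == nil || kubetypes.IsMirrorPod(pod) {
		return false // nothing to finalize, or not this component's job
	}
	return safety.PodResourcesAreReclaimed(pod, status)
}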
func TestDoNotDeleteMirrorPods(t *testing.T) {
staticPod := getTestPod()
staticPod.Annotations = map[string]string{kubetypes.ConfigSourceAnnotationKey: "file"}
@@ -1070,19 +1393,22 @@ func deleteAction() core.DeleteAction {
func TestMergePodStatus(t *testing.T) {
useCases := []struct {
desc string
hasRunningContainers bool
oldPodStatus func(input v1.PodStatus) v1.PodStatus
newPodStatus func(input v1.PodStatus) v1.PodStatus
expectPodStatus v1.PodStatus
}{
{
"no change",
false,
func(input v1.PodStatus) v1.PodStatus { return input },
func(input v1.PodStatus) v1.PodStatus { return input },
getPodStatus(),
},
{
"readiness changes",
false,
func(input v1.PodStatus) v1.PodStatus { return input },
func(input v1.PodStatus) v1.PodStatus {
input.Conditions[0].Status = v1.ConditionFalse
@@ -1105,6 +1431,7 @@ func TestMergePodStatus(t *testing.T) {
},
{
"additional pod condition",
false,
func(input v1.PodStatus) v1.PodStatus {
input.Conditions = append(input.Conditions, v1.PodCondition{
Type: v1.PodConditionType("example.com/feature"),
@@ -1134,6 +1461,7 @@ func TestMergePodStatus(t *testing.T) {
},
{
"additional pod condition and readiness changes",
false,
func(input v1.PodStatus) v1.PodStatus {
input.Conditions = append(input.Conditions, v1.PodCondition{
Type: v1.PodConditionType("example.com/feature"),
@@ -1166,6 +1494,7 @@ func TestMergePodStatus(t *testing.T) {
},
{
"additional pod condition changes",
false,
func(input v1.PodStatus) v1.PodStatus {
input.Conditions = append(input.Conditions, v1.PodCondition{
Type: v1.PodConditionType("example.com/feature"),
@@ -1199,13 +1528,77 @@ func TestMergePodStatus(t *testing.T) {
Message: "Message",
},
},
{
"phase is transitioning to failed and no containers running",
false,
func(input v1.PodStatus) v1.PodStatus {
input.Phase = v1.PodRunning
input.Reason = "Unknown"
input.Message = "Message"
return input
},
func(input v1.PodStatus) v1.PodStatus {
input.Phase = v1.PodFailed
input.Reason = "Evicted"
input.Message = "Was Evicted"
return input
},
v1.PodStatus{
Phase: v1.PodFailed,
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
{
Type: v1.PodScheduled,
Status: v1.ConditionTrue,
},
},
Reason: "Evicted",
Message: "Was Evicted",
},
},
{
"phase is transitioning to failed and containers running",
true,
func(input v1.PodStatus) v1.PodStatus {
input.Phase = v1.PodRunning
input.Reason = "Unknown"
input.Message = "Message"
return input
},
func(input v1.PodStatus) v1.PodStatus {
input.Phase = v1.PodFailed
input.Reason = "Evicted"
input.Message = "Was Evicted"
return input
},
v1.PodStatus{
Phase: v1.PodRunning,
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
{
Type: v1.PodScheduled,
Status: v1.ConditionTrue,
},
},
Reason: "Unknown",
Message: "Message",
},
},
}
for _, tc := range useCases {
t.Run(tc.desc, func(t *testing.T) {
output := mergePodStatus(tc.oldPodStatus(getPodStatus()), tc.newPodStatus(getPodStatus()), tc.hasRunningContainers)
if !conditionsEqual(output.Conditions, tc.expectPodStatus.Conditions) || !statusEqual(output, tc.expectPodStatus) {
t.Fatalf("unexpected status: %s", cmp.Diff(tc.expectPodStatus, output))
}
})
}
}
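The last two cases are the heart of the change: a caller may hand the status manager a terminal phase, but if the kubelet could still have running containers the merged status must keep reporting the previously observed phase. A simplified sketch of that guard follows; the function name is hypothetical and the real mergePodStatus also reconciles kubelet-owned conditions, which is omitted here.
// mergeTerminalGuard illustrates only the phase handling asserted above: a
// terminal phase from the caller is held back while containers may still be
// running, along with its reason and message.
func mergeTerminalGuard(oldStatus, newStatus v1.PodStatus, couldHaveRunningContainers bool) v1.PodStatus {
	out := newStatus
	terminal := newStatus.Phase == v1.PodFailed || newStatus.Phase == v1.PodSucceeded
	if terminal && couldHaveRunningContainers {
		// Containers may still be running: keep the previously reported
		// phase, reason, and message until teardown completes.
		out.Phase = oldStatus.Phase
		out.Reason = oldStatus.Reason
		out.Message = oldStatus.Message
	}
	return out
}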

View File

@@ -16,13 +16,18 @@ limitations under the License.
package testing
import v1 "k8s.io/api/core/v1"
// FakePodDeletionSafetyProvider is a fake PodDeletionSafetyProvider for test.
type FakePodDeletionSafetyProvider struct {
Reclaimed bool
HasRunning bool
}
func (f *FakePodDeletionSafetyProvider) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool {
return f.Reclaimed
}
func (f *FakePodDeletionSafetyProvider) PodCouldHaveRunningContainers(pod *v1.Pod) bool {
return f.HasRunning
}
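A test that wants to exercise both sides of the gate can flip the fake's fields directly, as TestDeletePods and TestDeletePodWhileReclaiming do above. A short usage sketch, mirroring that test setup:
// Illustrative only: pretend resources are still in use, so the status
// manager may patch the status but must not delete the pod yet.
m := newTestManager(fake.NewSimpleClientset())
fakeSafety := m.podDeletionSafety.(*statustest.FakePodDeletionSafetyProvider)
fakeSafety.Reclaimed = false
fakeSafety.HasRunning = true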

View File

@@ -103,6 +103,20 @@ func (mr *MockPodDeletionSafetyProviderMockRecorder) PodResourcesAreReclaimed(po
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PodResourcesAreReclaimed", reflect.TypeOf((*MockPodDeletionSafetyProvider)(nil).PodResourcesAreReclaimed), pod, status)
}
// PodCouldHaveRunningContainers mocks base method
func (m *MockPodDeletionSafetyProvider) PodCouldHaveRunningContainers(pod *v1.Pod) bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "PodCouldHaveRunningContainers", pod)
ret0, _ := ret[0].(bool)
return ret0
}
// PodCouldHaveRunningContainers indicates an expected call of PodCouldHaveRunningContainers
func (mr *MockPodDeletionSafetyProviderMockRecorder) PodCouldHaveRunningContainers(pod interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PodCouldHaveRunningContainers", reflect.TypeOf((*MockPodDeletionSafetyProvider)(nil).PodCouldHaveRunningContainers), pod)
}
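For callers that use the generated mock instead of the fake, an expectation on the new method follows the usual gomock pattern; a hypothetical example, assuming the standard generated constructor and illustrative variable names:
// Report that the pod could still have running containers, so a terminal
// phase written by a caller should not be persisted yet.
ctrl := gomock.NewController(t)
defer ctrl.Finish()
safety := statustest.NewMockPodDeletionSafetyProvider(ctrl)
safety.EXPECT().PodCouldHaveRunningContainers(gomock.Any()).Return(true).AnyTimes()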
// MockManager is a mock of Manager interface
type MockManager struct {
ctrl *gomock.Controller