Merge pull request #108366 from smarterclayton/terminating_not_terminated

Delay writing a terminal phase until the pod is terminated
Kubernetes Prow Robot 2022-03-17 08:29:21 -07:00 committed by GitHub
commit 9e50a332d8
13 changed files with 1241 additions and 356 deletions


@ -1452,24 +1452,36 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
}
// syncPod is the transaction script for the sync of a single pod (setting up)
// a pod. The reverse (teardown) is handled in syncTerminatingPod and
// syncTerminatedPod. If syncPod exits without error, then the pod runtime
// state is in sync with the desired configuration state (pod is running).
// If syncPod exits with a transient error, the next invocation of syncPod
// is expected to make progress towards reaching the runtime state.
// a pod. This method is reentrant and expected to converge a pod towards the
// desired state of the spec. The reverse (teardown) is handled in
// syncTerminatingPod and syncTerminatedPod. If syncPod exits without error,
// then the pod runtime state is in sync with the desired configuration state
// (pod is running). If syncPod exits with a transient error, the next
// invocation of syncPod is expected to make progress towards reaching the
// runtime state. syncPod exits with isTerminal when the pod was detected to
// have reached a terminal lifecycle phase due to container exits (for
// RestartNever or RestartOnFailure) and the next method invoked will be
// syncTerminatingPod.
//
// Arguments:
//
// o - the SyncPodOptions for this invocation
// updateType - whether this is a create (first time) or an update, should
// only be used for metrics since this method must be reentrant
// pod - the pod that is being set up
// mirrorPod - the mirror pod known to the kubelet for this pod, if any
// podStatus - the most recent pod status observed for this pod which can
// be used to determine the set of actions that should be taken during
// this loop of syncPod
//
// The workflow is:
// * Kill the pod immediately if update type is SyncPodKill
// * If the pod is being created, record pod worker start latency
// * Call generateAPIPodStatus to prepare a v1.PodStatus for the pod
// * If the pod is being seen as running for the first time, record pod
// start latency
// * Update the status of the pod in the status manager
// * Kill the pod if it should not be running due to soft admission
// * Stop the pod's containers if it should not be running due to soft
// admission
// * Ensure any background tracking for a runnable pod is started
// * Create a mirror pod if the pod is a static pod, and does not
// already have a mirror pod
// * Create the data directories for the pod if they do not exist
@ -1483,10 +1495,12 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
//
// This operation writes all events that are dispatched in order to provide
// the most accurate information possible about an error situation to aid debugging.
// Callers should not throw an event if this operation returns an error.
func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
// Callers should not write an event if this operation returns an error.
func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
klog.V(4).InfoS("syncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
defer klog.V(4).InfoS("syncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
defer func() {
klog.V(4).InfoS("syncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID, "isTerminal", isTerminal)
}()
// Latency measurements for the main workflow are relative to the
// first time the pod was seen by kubelet.
@ -1518,11 +1532,17 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType
for _, ipInfo := range apiPodStatus.PodIPs {
podStatus.IPs = append(podStatus.IPs, ipInfo.IP)
}
if len(podStatus.IPs) == 0 && len(apiPodStatus.PodIP) > 0 {
podStatus.IPs = []string{apiPodStatus.PodIP}
}
// If the pod is terminal, we don't need to continue to setup the pod
if apiPodStatus.Phase == v1.PodSucceeded || apiPodStatus.Phase == v1.PodFailed {
kl.statusManager.SetPodStatus(pod, apiPodStatus)
isTerminal = true
return isTerminal, nil
}
// If the pod should not be running, we request the pod's containers be stopped. This is not the same
// as termination (we want to stop the pod, but potentially restart it later if soft admission
// allows it). Set the status and phase appropriately
@ -1572,13 +1592,13 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType
// Return an error to signal that the sync loop should back off.
syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
}
return syncErr
return false, syncErr
}
// If the network plugin is not ready, only start the pod if it uses the host network
if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, err)
return fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err)
return false, fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err)
}
// ensure the kubelet knows about referenced secrets or configmaps used by the pod
@ -1635,7 +1655,7 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType
}
if err := pcm.EnsureExists(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
return false, fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
}
}
}
@ -1676,7 +1696,7 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType
if err := kl.makePodDataDirs(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
klog.ErrorS(err, "Unable to make pod data directories for pod", "pod", klog.KObj(pod))
return err
return false, err
}
// Volume manager will not mount volumes for terminating pods
@ -1686,7 +1706,7 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType
if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
klog.ErrorS(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod))
return err
return false, err
}
}
@ -1702,14 +1722,14 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType
if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff {
// Do not record an event here, as we keep all event logging for sync pod failures
// local to container runtime, so we get better errors.
return err
return false, err
}
}
return nil
return false, nil
}
return nil
return false, nil
}
// syncTerminatingPod is expected to terminate all running containers in a pod. Once this method

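For context, a condensed sketch of the new syncPod contract from the caller's side. This is illustrative only - the real handling lives in the pod worker changes below, and the surrounding variables (kl, ctx, updateType, pod, mirrorPod, podStatus) are assumed to be those of the calling sync loop.

isTerminal, err := kl.syncPod(ctx, updateType, pod, mirrorPod, podStatus)
switch {
case err != nil:
	// Transient failure: the worker requeues the pod, and a later syncPod
	// invocation is expected to make progress toward the desired state.
case isTerminal:
	// The pod naturally reached Succeeded or Failed (RestartNever or
	// RestartOnFailure); the worker records the transition and the next
	// sync invoked for this pod is syncTerminatingPod.
default:
	// The pod is running (or converging toward running); nothing more to do here.
}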

@ -915,6 +915,12 @@ func countRunningContainerStatus(status v1.PodStatus) int {
return runningContainers
}
// PodCouldHaveRunningContainers returns true if the pod with the given UID could still have running
// containers. This returns false if the pod has not yet been started or the pod is unknown.
func (kl *Kubelet) PodCouldHaveRunningContainers(pod *v1.Pod) bool {
return kl.podWorkers.CouldHaveRunningContainers(pod.UID)
}
// PodResourcesAreReclaimed returns true if all required node-level resources that a pod was consuming have
// been reclaimed by the kubelet. Reclaiming resources is a prerequisite to deleting a pod from the API server.
func (kl *Kubelet) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool {
@ -1424,7 +1430,7 @@ func getPhase(spec *v1.PodSpec, info []v1.ContainerStatus) v1.PodPhase {
}
// generateAPIPodStatus creates the final API pod status for a pod, given the
// internal pod status.
// internal pod status. This method should only be called from within sync*Pod methods.
func (kl *Kubelet) generateAPIPodStatus(pod *v1.Pod, podStatus *kubecontainer.PodStatus) v1.PodStatus {
klog.V(3).InfoS("Generating pod status", "pod", klog.KObj(pod))


@ -505,9 +505,9 @@ func TestDispatchWorkOfCompletedPod(t *testing.T) {
kubelet := testKubelet.kubelet
var got bool
kubelet.podWorkers = &fakePodWorkers{
syncPodFn: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
syncPodFn: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) {
got = true
return nil
return false, nil
},
cache: kubelet.podCache,
t: t,
@ -584,9 +584,9 @@ func TestDispatchWorkOfActivePod(t *testing.T) {
kubelet := testKubelet.kubelet
var got bool
kubelet.podWorkers = &fakePodWorkers{
syncPodFn: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
syncPodFn: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) {
got = true
return nil
return false, nil
},
cache: kubelet.podCache,
t: t,
@ -1300,8 +1300,11 @@ func TestCreateMirrorPod(t *testing.T) {
pod.Annotations[kubetypes.ConfigSourceAnnotationKey] = "file"
pods := []*v1.Pod{pod}
kl.podManager.SetPods(pods)
err := kl.syncPod(context.Background(), updateType, pod, nil, &kubecontainer.PodStatus{})
isTerminal, err := kl.syncPod(context.Background(), updateType, pod, nil, &kubecontainer.PodStatus{})
assert.NoError(t, err)
if isTerminal {
t.Fatalf("pod should not be terminal: %#v", pod)
}
podFullName := kubecontainer.GetPodFullName(pod)
assert.True(t, manager.HasPod(podFullName), "Expected mirror pod %q to be created", podFullName)
assert.Equal(t, 1, manager.NumOfPods(), "Expected only 1 mirror pod %q, got %+v", podFullName, manager.GetPods())
@ -1332,8 +1335,11 @@ func TestDeleteOutdatedMirrorPod(t *testing.T) {
pods := []*v1.Pod{pod, mirrorPod}
kl.podManager.SetPods(pods)
err := kl.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, mirrorPod, &kubecontainer.PodStatus{})
isTerminal, err := kl.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, mirrorPod, &kubecontainer.PodStatus{})
assert.NoError(t, err)
if isTerminal {
t.Fatalf("pod should not be terminal: %#v", pod)
}
name := kubecontainer.GetPodFullName(pod)
creates, deletes := manager.GetCounts(name)
if creates != 1 || deletes != 1 {
@ -1489,13 +1495,19 @@ func TestNetworkErrorsWithoutHostNetwork(t *testing.T) {
})
kubelet.podManager.SetPods([]*v1.Pod{pod})
err := kubelet.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{})
isTerminal, err := kubelet.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{})
assert.Error(t, err, "expected pod with hostNetwork=false to fail when network in error")
if isTerminal {
t.Fatalf("pod should not be terminal: %#v", pod)
}
pod.Annotations[kubetypes.ConfigSourceAnnotationKey] = kubetypes.FileSource
pod.Spec.HostNetwork = true
err = kubelet.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{})
isTerminal, err = kubelet.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{})
assert.NoError(t, err, "expected pod with hostNetwork=true to succeed when network in error")
if isTerminal {
t.Fatalf("pod should not be terminal: %#v", pod)
}
}
func TestFilterOutInactivePods(t *testing.T) {


@ -218,7 +218,7 @@ type PodWorkers interface {
}
// the function to invoke to perform a sync (reconcile the kubelet state to the desired shape of the pod)
type syncPodFnType func(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error
type syncPodFnType func(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error)
// the function to invoke to terminate a pod (ensure no running processes are present)
type syncTerminatingPodFnType func(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, runningPod *kubecontainer.Pod, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error
@ -886,6 +886,7 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) {
}
klog.V(4).InfoS("Processing pod event", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType)
var isTerminal bool
err := func() error {
// The worker is responsible for ensuring the sync method sees the appropriate
// status updates on resyncs (the result of the last sync), transitions to
@ -932,13 +933,14 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) {
err = p.syncTerminatingPodFn(ctx, pod, status, update.Options.RunningPod, gracePeriod, podStatusFn)
default:
err = p.syncPodFn(ctx, update.Options.UpdateType, pod, update.Options.MirrorPod, status)
isTerminal, err = p.syncPodFn(ctx, update.Options.UpdateType, pod, update.Options.MirrorPod, status)
}
lastSyncTime = time.Now()
return err
}()
var phaseTransition bool
switch {
case err == context.Canceled:
// when the context is cancelled we expect an update to already be queued
@ -969,10 +971,17 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) {
}
// otherwise we move to the terminating phase
p.completeTerminating(pod)
phaseTransition = true
case isTerminal:
// if syncPod indicated we are now terminal, set the appropriate pod status to move to terminating
klog.V(4).InfoS("Pod is terminal", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType)
p.completeSync(pod)
phaseTransition = true
}
// queue a retry for errors if necessary, then put the next event in the channel if any
p.completeWork(pod, err)
// queue a retry if necessary, then put the next event in the channel if any
p.completeWork(pod, phaseTransition, err)
if start := update.Options.StartTime; !start.IsZero() {
metrics.PodWorkerDuration.WithLabelValues(update.Options.UpdateType.String()).Observe(metrics.SinceInSeconds(start))
}
@ -1003,6 +1012,33 @@ func (p *podWorkers) acknowledgeTerminating(pod *v1.Pod) PodStatusFunc {
return nil
}
// completeSync is invoked when syncPod completes successfully and indicates the pod is now terminal and should
// be terminated. This happens when the pod lifecycle completes naturally - a pod whose restart policy is not
// Always reaches a terminal phase once its containers exit. Unnatural completions, such as evictions, API-driven
// deletion, or phase transitions, are handled by UpdatePod.
func (p *podWorkers) completeSync(pod *v1.Pod) {
p.podLock.Lock()
defer p.podLock.Unlock()
klog.V(4).InfoS("Pod indicated lifecycle completed naturally and should now terminate", "pod", klog.KObj(pod), "podUID", pod.UID)
if status, ok := p.podSyncStatuses[pod.UID]; ok {
if status.terminatingAt.IsZero() {
status.terminatingAt = time.Now()
} else {
klog.V(4).InfoS("Pod worker attempted to set terminatingAt twice, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
}
status.startedTerminating = true
}
p.lastUndeliveredWorkUpdate[pod.UID] = podWork{
WorkType: TerminatingPodWork,
Options: UpdatePodOptions{
Pod: pod,
},
}
}
// completeTerminating is invoked when syncTerminatingPod completes successfully, which means
// no container is running, no container will be started in the future, and we are ready for
// cleanup. This updates the termination state which prevents future syncs and will ensure
@ -1115,9 +1151,11 @@ func (p *podWorkers) completeUnstartedTerminated(pod *v1.Pod) {
// completeWork requeues the pod on error or at the next sync interval, and then immediately executes any pending
// work.
func (p *podWorkers) completeWork(pod *v1.Pod, syncErr error) {
func (p *podWorkers) completeWork(pod *v1.Pod, phaseTransition bool, syncErr error) {
// Requeue the last update if the last sync returned error.
switch {
case phaseTransition:
p.workQueue.Enqueue(pod.UID, 0)
case syncErr == nil:
// No error; requeue at the regular resync interval.
p.workQueue.Enqueue(pod.UID, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor))

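Taken together, the worker now drives each pod through up to three sync functions. A condensed, illustrative view of the dispatch follows; the TerminatedPodWork branch is not visible in the hunks above, so its signature is an assumption from surrounding context.

// Simplified per-update dispatch inside managePodLoop after this change.
switch update.WorkType {
case TerminatedPodWork:
	// cleanup once no containers are running (signature assumed)
	err = p.syncTerminatedPodFn(ctx, pod, status)
case TerminatingPodWork:
	// stop all running containers, honoring the grace period
	err = p.syncTerminatingPodFn(ctx, pod, status, update.Options.RunningPod, gracePeriod, podStatusFn)
default:
	// reconcile toward the spec; may now report the pod as terminal
	isTerminal, err = p.syncPodFn(ctx, update.Options.UpdateType, pod, update.Options.MirrorPod, status)
}
// When isTerminal is true, completeSync records terminatingAt and queues a
// TerminatingPodWork update, and completeWork requeues with zero delay so
// termination begins immediately rather than after the jittered resync interval.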

@ -18,7 +18,6 @@ package kubelet
import (
"context"
"flag"
"reflect"
"strconv"
"sync"
@ -31,7 +30,6 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
@ -48,6 +46,7 @@ type fakePodWorkers struct {
t TestingInterface
triggeredDeletion []types.UID
triggeredTerminal []types.UID
statusLock sync.Mutex
running map[types.UID]bool
@ -79,9 +78,13 @@ func (f *fakePodWorkers) UpdatePod(options UpdatePodOptions) {
case kubetypes.SyncPodKill:
f.triggeredDeletion = append(f.triggeredDeletion, uid)
default:
if err := f.syncPodFn(context.Background(), options.UpdateType, options.Pod, options.MirrorPod, status); err != nil {
isTerminal, err := f.syncPodFn(context.Background(), options.UpdateType, options.Pod, options.MirrorPod, status)
if err != nil {
f.t.Errorf("Unexpected error: %v", err)
}
if isTerminal {
f.triggeredTerminal = append(f.triggeredTerminal, uid)
}
}
}
@ -249,7 +252,7 @@ func createPodWorkers() (*podWorkers, map[types.UID][]syncPodRecord) {
fakeCache := containertest.NewFakeCache(fakeRuntime)
fakeQueue := &fakeQueue{}
w := newPodWorkers(
func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) {
func() {
lock.Lock()
defer lock.Unlock()
@ -259,7 +262,7 @@ func createPodWorkers() (*podWorkers, map[types.UID][]syncPodRecord) {
updateType: updateType,
})
}()
return nil
return false, nil
},
func(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, runningPod *kubecontainer.Pod, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {
func() {
@ -530,9 +533,84 @@ func newUIDSet(uids ...types.UID) sets.String {
return set
}
func init() {
klog.InitFlags(nil)
flag.Lookup("v").Value.Set("5")
type terminalPhaseSync struct {
lock sync.Mutex
fn syncPodFnType
terminal sets.String
}
func (s *terminalPhaseSync) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) {
isTerminal, err := s.fn(ctx, updateType, pod, mirrorPod, podStatus)
if err != nil {
return false, err
}
if !isTerminal {
s.lock.Lock()
defer s.lock.Unlock()
isTerminal = s.terminal.Has(string(pod.UID))
}
return isTerminal, nil
}
func (s *terminalPhaseSync) SetTerminal(uid types.UID) {
s.lock.Lock()
defer s.lock.Unlock()
s.terminal.Insert(string(uid))
}
func newTerminalPhaseSync(fn syncPodFnType) *terminalPhaseSync {
return &terminalPhaseSync{
fn: fn,
terminal: sets.NewString(),
}
}
func TestTerminalPhaseTransition(t *testing.T) {
podWorkers, _ := createPodWorkers()
var channels WorkChannel
podWorkers.workerChannelFn = channels.Intercept
terminalPhaseSyncer := newTerminalPhaseSync(podWorkers.syncPodFn)
podWorkers.syncPodFn = terminalPhaseSyncer.SyncPod
// start pod
podWorkers.UpdatePod(UpdatePodOptions{
Pod: newNamedPod("1", "test1", "pod1", false),
UpdateType: kubetypes.SyncPodUpdate,
})
drainAllWorkers(podWorkers)
// should observe pod running
pod1 := podWorkers.podSyncStatuses[types.UID("1")]
if pod1.IsTerminated() {
t.Fatalf("unexpected pod state: %#v", pod1)
}
// send another update to the pod
podWorkers.UpdatePod(UpdatePodOptions{
Pod: newNamedPod("1", "test1", "pod1", false),
UpdateType: kubetypes.SyncPodUpdate,
})
drainAllWorkers(podWorkers)
// should observe pod still running
pod1 = podWorkers.podSyncStatuses[types.UID("1")]
if pod1.IsTerminated() {
t.Fatalf("unexpected pod state: %#v", pod1)
}
// the next sync should result in a transition to terminal
terminalPhaseSyncer.SetTerminal(types.UID("1"))
podWorkers.UpdatePod(UpdatePodOptions{
Pod: newNamedPod("1", "test1", "pod1", false),
UpdateType: kubetypes.SyncPodUpdate,
})
drainAllWorkers(podWorkers)
// should observe pod terminating
pod1 = podWorkers.podSyncStatuses[types.UID("1")]
if !pod1.IsTerminationRequested() || !pod1.IsTerminated() {
t.Fatalf("unexpected pod state: %#v", pod1)
}
}
func TestStaticPodExclusion(t *testing.T) {
@ -1203,15 +1281,15 @@ type simpleFakeKubelet struct {
wg sync.WaitGroup
}
func (kl *simpleFakeKubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
func (kl *simpleFakeKubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) {
kl.pod, kl.mirrorPod, kl.podStatus = pod, mirrorPod, podStatus
return nil
return false, nil
}
func (kl *simpleFakeKubelet) syncPodWithWaitGroup(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
func (kl *simpleFakeKubelet) syncPodWithWaitGroup(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) {
kl.pod, kl.mirrorPod, kl.podStatus = pod, mirrorPod, podStatus
kl.wg.Done()
return nil
return false, nil
}
func (kl *simpleFakeKubelet) syncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, runningPod *kubecontainer.Pod, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {


@ -112,9 +112,10 @@ func (kl *Kubelet) runOnce(pods []*v1.Pod, retryDelay time.Duration) (results []
// runPod runs a single pod and wait until all containers are running.
func (kl *Kubelet) runPod(pod *v1.Pod, retryDelay time.Duration) error {
var isTerminal bool
delay := retryDelay
retry := 0
for {
for !isTerminal {
status, err := kl.containerRuntime.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
if err != nil {
return fmt.Errorf("unable to get status for pod %q: %v", format.Pod(pod), err)
@ -131,7 +132,7 @@ func (kl *Kubelet) runPod(pod *v1.Pod, retryDelay time.Duration) error {
klog.ErrorS(err, "Failed creating a mirror pod", "pod", klog.KObj(pod))
}
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
if err = kl.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, mirrorPod, status); err != nil {
if isTerminal, err = kl.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, mirrorPod, status); err != nil {
return fmt.Errorf("error syncing pod %q: %v", format.Pod(pod), err)
}
if retry >= runOnceMaxRetries {
@ -143,6 +144,7 @@ func (kl *Kubelet) runPod(pod *v1.Pod, retryDelay time.Duration) error {
retry++
delay *= runOnceRetryDelayBackoff
}
return nil
}
// isPodRunning returns true if all containers of a manifest are running.


@ -83,8 +83,10 @@ type PodStatusProvider interface {
// PodDeletionSafetyProvider provides guarantees that a pod can be safely deleted.
type PodDeletionSafetyProvider interface {
// A function which returns true if the pod can safely be deleted
// PodResourcesAreReclaimed returns true if the pod can safely be deleted.
PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool
// PodCouldHaveRunningContainers returns true if the pod could have running containers.
PodCouldHaveRunningContainers(pod *v1.Pod) bool
}
// Manager is the Source of truth for kubelet pod status, and should be kept up-to-date with
@ -324,6 +326,14 @@ func findContainerStatus(status *v1.PodStatus, containerID string) (containerSta
}
// TerminatePod ensures that the status of containers is properly defaulted at the end of the pod
// lifecycle. Because the Kubelet must reconcile with the container runtime to observe container status,
// there is always the possibility that we are unable to retrieve one or more container statuses due to
// garbage collection, admin action, or loss of temporary data on a restart. This method ensures
// that any absent container status is treated as a failure so that we do not incorrectly describe
// the pod as successful. If a pod has init containers and has not yet been initialized,
// the init container failure status is sufficient to describe the pod as failing, and we do not need
// to override waiting containers (unless there is evidence the pod previously started those containers).
func (m *manager) TerminatePod(pod *v1.Pod) {
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()
@ -335,19 +345,26 @@ func (m *manager) TerminatePod(pod *v1.Pod) {
oldStatus = &cachedStatus.status
}
status := *oldStatus.DeepCopy()
for i := range status.ContainerStatuses {
if status.ContainerStatuses[i].State.Terminated != nil {
continue
}
status.ContainerStatuses[i].State = v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
Reason: "ContainerStatusUnknown",
Message: "The container could not be located when the pod was terminated",
ExitCode: 137,
},
// once a pod has initialized, any missing status is treated as a failure
if hasPodInitialized(pod) {
for i := range status.ContainerStatuses {
if status.ContainerStatuses[i].State.Terminated != nil {
continue
}
status.ContainerStatuses[i].State = v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
Reason: "ContainerStatusUnknown",
Message: "The container could not be located when the pod was terminated",
ExitCode: 137,
},
}
}
}
for i := range status.InitContainerStatuses {
// all init containers, except for the trailing suffix of containers with no evidence of ever starting,
// are marked as failed containers
for i := range initializedContainers(status.InitContainerStatuses) {
if status.InitContainerStatuses[i].State.Terminated != nil {
continue
}
@ -364,6 +381,49 @@ func (m *manager) TerminatePod(pod *v1.Pod) {
m.updateStatusInternal(pod, status, true)
}
// hasPodInitialized returns true if the pod shows evidence of having completed initialization - it has no init
// containers, a regular container has progressed beyond the waiting state, or the final init container has
// completed successfully - which implies any missing regular container status should be transitioned to terminated.
func hasPodInitialized(pod *v1.Pod) bool {
// a pod without init containers is always initialized
if len(pod.Spec.InitContainers) == 0 {
return true
}
// if any container has ever moved out of waiting state, the pod has initialized
for _, status := range pod.Status.ContainerStatuses {
if status.LastTerminationState.Terminated != nil || status.State.Waiting == nil {
return true
}
}
// if the last init container has ever completed with a zero exit code, the pod is initialized
if l := len(pod.Status.InitContainerStatuses); l > 0 {
container := pod.Status.InitContainerStatuses[l-1]
if state := container.LastTerminationState; state.Terminated != nil && state.Terminated.ExitCode == 0 {
return true
}
if state := container.State; state.Terminated != nil && state.Terminated.ExitCode == 0 {
return true
}
}
// otherwise the pod has no record of being initialized
return false
}
// initializedContainers returns all container statuses except for the trailing suffix of containers that are in the
// Waiting state, which is the set of containers that have attempted to start at least once. If all containers
// are Waiting, the first container is always returned.
func initializedContainers(containers []v1.ContainerStatus) []v1.ContainerStatus {
for i := len(containers) - 1; i >= 0; i-- {
if containers[i].State.Waiting == nil || containers[i].LastTerminationState.Terminated != nil {
return containers[0 : i+1]
}
}
// always return at least one container
if len(containers) > 0 {
return containers[0:1]
}
return nil
}
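The trailing-suffix rule above can be seen with a small worked example (illustrative; it mirrors the "uninitialized pod defaults gaps" test case added below):

statuses := []v1.ContainerStatus{
	{Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
	{Name: "init-1", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
	{Name: "init-2", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 1}}},
	{Name: "init-3", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
}
// initializedContainers(statuses) returns init-0 through init-2. init-3 is the
// trailing Waiting suffix with no evidence of ever starting, so TerminatePod leaves
// it Waiting, while init-0 and init-1 are defaulted to ContainerStatusUnknown and
// init-2 keeps its recorded exit code.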
// checkContainerStateTransition ensures that no container is trying to transition
// from a terminated to non-terminated state, which is illegal and indicates a
// logical error in the kubelet.
@ -619,8 +679,9 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
return
}
oldStatus := pod.Status.DeepCopy()
newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, *oldStatus, mergePodStatus(*oldStatus, status.status))
mergedStatus := mergePodStatus(pod.Status, status.status, m.podDeletionSafety.PodCouldHaveRunningContainers(pod))
newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, mergedStatus)
klog.V(3).InfoS("Patch status for pod", "pod", klog.KObj(pod), "patch", string(patchBytes))
if err != nil {
@ -630,7 +691,7 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
if unchanged {
klog.V(3).InfoS("Status for pod is up-to-date", "pod", klog.KObj(pod), "statusVersion", status.version)
} else {
klog.V(3).InfoS("Status for pod updated successfully", "pod", klog.KObj(pod), "statusVersion", status.version, "status", status.status)
klog.V(3).InfoS("Status for pod updated successfully", "pod", klog.KObj(pod), "statusVersion", status.version, "status", mergedStatus)
pod = newPod
}
@ -771,25 +832,49 @@ func normalizeStatus(pod *v1.Pod, status *v1.PodStatus) *v1.PodStatus {
return status
}
// mergePodStatus merges oldPodStatus and newPodStatus where pod conditions
// not owned by kubelet is preserved from oldPodStatus
func mergePodStatus(oldPodStatus, newPodStatus v1.PodStatus) v1.PodStatus {
podConditions := []v1.PodCondition{}
// mergePodStatus merges oldPodStatus and newPodStatus, preserving pod conditions that are not owned by
// the kubelet and ensuring that the transition to a terminal phase only happens after all running
// containers have terminated. This method does not modify the old status.
func mergePodStatus(oldPodStatus, newPodStatus v1.PodStatus, couldHaveRunningContainers bool) v1.PodStatus {
podConditions := make([]v1.PodCondition, 0, len(oldPodStatus.Conditions)+len(newPodStatus.Conditions))
for _, c := range oldPodStatus.Conditions {
if !kubetypes.PodConditionByKubelet(c.Type) {
podConditions = append(podConditions, c)
}
}
for _, c := range newPodStatus.Conditions {
if kubetypes.PodConditionByKubelet(c.Type) {
podConditions = append(podConditions, c)
}
}
newPodStatus.Conditions = podConditions
// Delay transitioning a pod to a terminal status unless the pod is actually terminal.
// The Kubelet should never transition a pod to terminal status that could have running
// containers and thus actively be leveraging exclusive resources. Note that resources
// like volumes are reconciled by a subsystem in the Kubelet and will converge if a new
// pod reuses an exclusive resource (unmount -> free -> mount), which means we do not
// need to wait for those resources to be detached by the Kubelet. In general, resources
// the Kubelet exclusively owns must be released prior to a pod being reported terminal,
// while resources that have participating components above the API use the pod's
// transition to a terminal phase (or full deletion) to release those resources.
if !isPhaseTerminal(oldPodStatus.Phase) && isPhaseTerminal(newPodStatus.Phase) {
if couldHaveRunningContainers {
newPodStatus.Phase = oldPodStatus.Phase
newPodStatus.Reason = oldPodStatus.Reason
newPodStatus.Message = oldPodStatus.Message
}
}
return newPodStatus
}
// isPhaseTerminal returns true if the pod's phase is terminal.
func isPhaseTerminal(phase v1.PodPhase) bool {
return phase == v1.PodFailed || phase == v1.PodSucceeded
}
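A minimal usage sketch of the terminal-phase gating above (illustrative; it matches the eviction test cases added to TestMergePodStatus below):

oldStatus := v1.PodStatus{Phase: v1.PodRunning, Reason: "Unknown", Message: "Message"}
newStatus := v1.PodStatus{Phase: v1.PodFailed, Reason: "Evicted", Message: "Was Evicted"}

// Containers could still be running: the terminal phase is held back and the pod
// keeps reporting Running (with the old Reason/Message) until termination completes.
merged := mergePodStatus(oldStatus, newStatus, true)
// merged.Phase == v1.PodRunning, merged.Reason == "Unknown"

// No containers can be running: the terminal phase and its Reason/Message are written through.
merged = mergePodStatus(oldStatus, newStatus, false)
// merged.Phase == v1.PodFailed, merged.Reason == "Evicted"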
// NeedToReconcilePodReadiness returns whether the pod "Ready" condition needs to be reconciled
func NeedToReconcilePodReadiness(pod *v1.Pod) bool {
if len(pod.Spec.ReadinessGates) == 0 {


@ -25,6 +25,7 @@ import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
@ -648,11 +649,14 @@ func TestTerminatePodWaiting(t *testing.T) {
t.Logf("we expect the container statuses to have changed to terminated")
newStatus := expectPodStatus(t, syncer, testPod)
for i := range newStatus.ContainerStatuses {
assert.False(t, newStatus.ContainerStatuses[i].State.Terminated == nil, "expected containers to be terminated")
for _, container := range newStatus.ContainerStatuses {
assert.False(t, container.State.Terminated == nil, "expected containers to be terminated")
}
for i := range newStatus.InitContainerStatuses {
assert.False(t, newStatus.InitContainerStatuses[i].State.Terminated == nil, "expected init containers to be terminated")
for _, container := range newStatus.InitContainerStatuses[:2] {
assert.False(t, container.State.Terminated == nil, "expected init containers to be terminated")
}
for _, container := range newStatus.InitContainerStatuses[2:] {
assert.False(t, container.State.Waiting == nil, "expected init containers to be waiting")
}
expectUnknownState := v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "ContainerStatusUnknown", Message: "The container could not be located when the pod was terminated", ExitCode: 137}}
@ -662,8 +666,8 @@ func TestTerminatePodWaiting(t *testing.T) {
if !reflect.DeepEqual(newStatus.InitContainerStatuses[1].State, firstStatus.InitContainerStatuses[1].State) {
t.Errorf("existing terminated container state not preserved: %#v", newStatus.ContainerStatuses)
}
if !reflect.DeepEqual(newStatus.InitContainerStatuses[2].State, expectUnknownState) {
t.Errorf("waiting container state not defaulted: %s", diff.ObjectReflectDiff(newStatus.InitContainerStatuses[2].State, expectUnknownState))
if !reflect.DeepEqual(newStatus.InitContainerStatuses[2].State, firstStatus.InitContainerStatuses[2].State) {
t.Errorf("waiting container state not defaulted: %s", diff.ObjectReflectDiff(newStatus.InitContainerStatuses[2].State, firstStatus.InitContainerStatuses[2].State))
}
if !reflect.DeepEqual(newStatus.ContainerStatuses[0].State, expectUnknownState) {
t.Errorf("terminated container state not defaulted: %s", diff.ObjectReflectDiff(newStatus.ContainerStatuses[0].State, expectUnknownState))
@ -680,6 +684,308 @@ func TestTerminatePodWaiting(t *testing.T) {
assert.Equal(t, newStatus.Message, firstStatus.Message)
}
func TestTerminatePod_DefaultUnknownStatus(t *testing.T) {
newPod := func(initContainers, containers int, fns ...func(*v1.Pod)) *v1.Pod {
pod := getTestPod()
for i := 0; i < initContainers; i++ {
pod.Spec.InitContainers = append(pod.Spec.InitContainers, v1.Container{
Name: fmt.Sprintf("init-%d", i),
})
}
for i := 0; i < containers; i++ {
pod.Spec.Containers = append(pod.Spec.Containers, v1.Container{
Name: fmt.Sprintf("%d", i),
})
}
pod.Status.StartTime = &metav1.Time{Time: time.Unix(1, 0).UTC()}
for _, fn := range fns {
fn(pod)
}
return pod
}
expectTerminatedUnknown := func(t *testing.T, state v1.ContainerState) {
t.Helper()
if state.Terminated == nil || state.Running != nil || state.Waiting != nil {
t.Fatalf("unexpected state: %#v", state)
}
if state.Terminated.ExitCode != 137 || state.Terminated.Reason != "ContainerStatusUnknown" || len(state.Terminated.Message) == 0 {
t.Fatalf("unexpected terminated state: %#v", state.Terminated)
}
}
expectTerminated := func(t *testing.T, state v1.ContainerState, exitCode int32) {
t.Helper()
if state.Terminated == nil || state.Running != nil || state.Waiting != nil {
t.Fatalf("unexpected state: %#v", state)
}
if state.Terminated.ExitCode != exitCode {
t.Fatalf("unexpected terminated state: %#v", state.Terminated)
}
}
expectWaiting := func(t *testing.T, state v1.ContainerState) {
t.Helper()
if state.Terminated != nil || state.Running != nil || state.Waiting == nil {
t.Fatalf("unexpected state: %#v", state)
}
}
testCases := []struct {
name string
pod *v1.Pod
updateFn func(*v1.Pod)
expectFn func(t *testing.T, status v1.PodStatus)
}{
{pod: newPod(0, 1, func(pod *v1.Pod) { pod.Status.Phase = v1.PodFailed })},
{pod: newPod(0, 1, func(pod *v1.Pod) { pod.Status.Phase = v1.PodRunning })},
{pod: newPod(0, 1, func(pod *v1.Pod) {
pod.Status.Phase = v1.PodRunning
pod.Status.ContainerStatuses = []v1.ContainerStatus{
{Name: "0", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "Test", ExitCode: 2}}},
}
})},
{
name: "last termination state set",
pod: newPod(0, 1, func(pod *v1.Pod) {
pod.Spec.RestartPolicy = v1.RestartPolicyNever
pod.Status.Phase = v1.PodRunning
pod.Status.ContainerStatuses = []v1.ContainerStatus{
{
Name: "0",
LastTerminationState: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "Test", ExitCode: 2}},
State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}},
},
}
}),
expectFn: func(t *testing.T, status v1.PodStatus) {
container := status.ContainerStatuses[0]
if container.LastTerminationState.Terminated.ExitCode != 2 {
t.Fatalf("unexpected last state: %#v", container.LastTerminationState)
}
expectTerminatedUnknown(t, container.State)
},
},
{
name: "no previous state",
pod: newPod(0, 1, func(pod *v1.Pod) {
pod.Spec.RestartPolicy = v1.RestartPolicyNever
pod.Status.Phase = v1.PodRunning
pod.Status.ContainerStatuses = []v1.ContainerStatus{
{Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
}
}),
expectFn: func(t *testing.T, status v1.PodStatus) {
expectTerminatedUnknown(t, status.ContainerStatuses[0].State)
},
},
{
name: "uninitialized pod defaults the first init container",
pod: newPod(1, 1, func(pod *v1.Pod) {
pod.Spec.RestartPolicy = v1.RestartPolicyNever
pod.Status.Phase = v1.PodRunning
pod.Status.InitContainerStatuses = []v1.ContainerStatus{
{Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
}
pod.Status.ContainerStatuses = []v1.ContainerStatus{
{Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
}
}),
expectFn: func(t *testing.T, status v1.PodStatus) {
expectTerminatedUnknown(t, status.InitContainerStatuses[0].State)
expectWaiting(t, status.ContainerStatuses[0].State)
},
},
{
name: "uninitialized pod defaults only the first init container",
pod: newPod(2, 1, func(pod *v1.Pod) {
pod.Spec.RestartPolicy = v1.RestartPolicyNever
pod.Status.Phase = v1.PodRunning
pod.Status.InitContainerStatuses = []v1.ContainerStatus{
{Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
{Name: "init-1", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
}
pod.Status.ContainerStatuses = []v1.ContainerStatus{
{Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
}
}),
expectFn: func(t *testing.T, status v1.PodStatus) {
expectTerminatedUnknown(t, status.InitContainerStatuses[0].State)
expectWaiting(t, status.InitContainerStatuses[1].State)
expectWaiting(t, status.ContainerStatuses[0].State)
},
},
{
name: "uninitialized pod defaults gaps",
pod: newPod(4, 1, func(pod *v1.Pod) {
pod.Spec.RestartPolicy = v1.RestartPolicyNever
pod.Status.Phase = v1.PodRunning
pod.Status.InitContainerStatuses = []v1.ContainerStatus{
{Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
{Name: "init-1", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
{Name: "init-2", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 1}}},
{Name: "init-3", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
}
pod.Status.ContainerStatuses = []v1.ContainerStatus{
{Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
}
}),
expectFn: func(t *testing.T, status v1.PodStatus) {
expectTerminatedUnknown(t, status.InitContainerStatuses[0].State)
expectTerminatedUnknown(t, status.InitContainerStatuses[1].State)
expectTerminated(t, status.InitContainerStatuses[2].State, 1)
expectWaiting(t, status.InitContainerStatuses[3].State)
expectWaiting(t, status.ContainerStatuses[0].State)
},
},
{
name: "failed last container is uninitialized",
pod: newPod(3, 1, func(pod *v1.Pod) {
pod.Spec.RestartPolicy = v1.RestartPolicyNever
pod.Status.Phase = v1.PodRunning
pod.Status.InitContainerStatuses = []v1.ContainerStatus{
{Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
{Name: "init-1", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
{Name: "init-2", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 1}}},
}
pod.Status.ContainerStatuses = []v1.ContainerStatus{
{Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
}
}),
expectFn: func(t *testing.T, status v1.PodStatus) {
expectTerminatedUnknown(t, status.InitContainerStatuses[0].State)
expectTerminatedUnknown(t, status.InitContainerStatuses[1].State)
expectTerminated(t, status.InitContainerStatuses[2].State, 1)
expectWaiting(t, status.ContainerStatuses[0].State)
},
},
{
name: "successful last container is initialized",
pod: newPod(3, 1, func(pod *v1.Pod) {
pod.Spec.RestartPolicy = v1.RestartPolicyNever
pod.Status.Phase = v1.PodRunning
pod.Status.InitContainerStatuses = []v1.ContainerStatus{
{Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
{Name: "init-1", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
{Name: "init-2", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 0}}},
}
pod.Status.ContainerStatuses = []v1.ContainerStatus{
{Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
}
}),
expectFn: func(t *testing.T, status v1.PodStatus) {
expectTerminatedUnknown(t, status.InitContainerStatuses[0].State)
expectTerminatedUnknown(t, status.InitContainerStatuses[1].State)
expectTerminated(t, status.InitContainerStatuses[2].State, 0)
expectTerminatedUnknown(t, status.ContainerStatuses[0].State)
},
},
{
name: "successful last previous container is initialized, and container state is overwritten",
pod: newPod(3, 1, func(pod *v1.Pod) {
pod.Spec.RestartPolicy = v1.RestartPolicyNever
pod.Status.Phase = v1.PodRunning
pod.Status.InitContainerStatuses = []v1.ContainerStatus{
{Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
{Name: "init-1", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
{
Name: "init-2",
LastTerminationState: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 0}},
State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}},
},
}
pod.Status.ContainerStatuses = []v1.ContainerStatus{
{Name: "0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
}
}),
expectFn: func(t *testing.T, status v1.PodStatus) {
expectTerminatedUnknown(t, status.InitContainerStatuses[0].State)
expectTerminatedUnknown(t, status.InitContainerStatuses[1].State)
expectTerminatedUnknown(t, status.InitContainerStatuses[2].State)
expectTerminatedUnknown(t, status.ContainerStatuses[0].State)
},
},
{
name: "running container proves initialization",
pod: newPod(1, 1, func(pod *v1.Pod) {
pod.Spec.RestartPolicy = v1.RestartPolicyNever
pod.Status.Phase = v1.PodRunning
pod.Status.InitContainerStatuses = []v1.ContainerStatus{
{Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
}
pod.Status.ContainerStatuses = []v1.ContainerStatus{
{Name: "0", State: v1.ContainerState{Running: &v1.ContainerStateRunning{}}},
}
}),
expectFn: func(t *testing.T, status v1.PodStatus) {
expectTerminatedUnknown(t, status.InitContainerStatuses[0].State)
expectTerminatedUnknown(t, status.ContainerStatuses[0].State)
},
},
{
name: "evidence of terminated container proves initialization",
pod: newPod(1, 1, func(pod *v1.Pod) {
pod.Spec.RestartPolicy = v1.RestartPolicyNever
pod.Status.Phase = v1.PodRunning
pod.Status.InitContainerStatuses = []v1.ContainerStatus{
{Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
}
pod.Status.ContainerStatuses = []v1.ContainerStatus{
{Name: "0", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 0}}},
}
}),
expectFn: func(t *testing.T, status v1.PodStatus) {
expectTerminatedUnknown(t, status.InitContainerStatuses[0].State)
expectTerminated(t, status.ContainerStatuses[0].State, 0)
},
},
{
name: "evidence of previously terminated container proves initialization",
pod: newPod(1, 1, func(pod *v1.Pod) {
pod.Spec.RestartPolicy = v1.RestartPolicyNever
pod.Status.Phase = v1.PodRunning
pod.Status.InitContainerStatuses = []v1.ContainerStatus{
{Name: "init-0", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}},
}
pod.Status.ContainerStatuses = []v1.ContainerStatus{
{Name: "0", LastTerminationState: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ExitCode: 0}}},
}
}),
expectFn: func(t *testing.T, status v1.PodStatus) {
expectTerminatedUnknown(t, status.InitContainerStatuses[0].State)
expectTerminatedUnknown(t, status.ContainerStatuses[0].State)
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), kubesecret.NewFakeManager(), kubeconfigmap.NewFakeManager())
syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}).(*manager)
original := tc.pod.DeepCopy()
syncer.SetPodStatus(original, original.Status)
copied := tc.pod.DeepCopy()
if tc.updateFn != nil {
tc.updateFn(copied)
}
expected := copied.DeepCopy()
syncer.TerminatePod(copied)
status := expectPodStatus(t, syncer, tc.pod.DeepCopy())
if tc.expectFn != nil {
tc.expectFn(t, status)
return
}
if !reflect.DeepEqual(expected.Status, status) {
diff := cmp.Diff(expected.Status, status)
if len(diff) == 0 {
t.Fatalf("diff returned no results for failed DeepEqual: %#v != %#v", expected.Status, status)
}
t.Fatalf("unexpected status: %s", diff)
}
})
}
}
func TestSetContainerReadiness(t *testing.T) {
cID1 := kubecontainer.ContainerID{Type: "test", ID: "1"}
cID2 := kubecontainer.ContainerID{Type: "test", ID: "2"}
@ -957,6 +1263,7 @@ func TestDeletePods(t *testing.T) {
pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
client := fake.NewSimpleClientset(pod)
m := newTestManager(client)
m.podDeletionSafety.(*statustest.FakePodDeletionSafetyProvider).Reclaimed = true
m.podManager.AddPod(pod)
status := getRandomPodStatus()
now := metav1.Now()
@ -966,6 +1273,22 @@ func TestDeletePods(t *testing.T) {
verifyActions(t, m, []core.Action{getAction(), patchAction(), deleteAction()})
}
func TestDeletePodWhileReclaiming(t *testing.T) {
pod := getTestPod()
t.Logf("Set the deletion timestamp.")
pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
client := fake.NewSimpleClientset(pod)
m := newTestManager(client)
m.podDeletionSafety.(*statustest.FakePodDeletionSafetyProvider).Reclaimed = false
m.podManager.AddPod(pod)
status := getRandomPodStatus()
now := metav1.Now()
status.StartTime = &now
m.SetPodStatus(pod, status)
t.Logf("Expect to see a delete action.")
verifyActions(t, m, []core.Action{getAction(), patchAction()})
}
func TestDoNotDeleteMirrorPods(t *testing.T) {
staticPod := getTestPod()
staticPod.Annotations = map[string]string{kubetypes.ConfigSourceAnnotationKey: "file"}
@ -1070,19 +1393,22 @@ func deleteAction() core.DeleteAction {
func TestMergePodStatus(t *testing.T) {
useCases := []struct {
desc string
oldPodStatus func(input v1.PodStatus) v1.PodStatus
newPodStatus func(input v1.PodStatus) v1.PodStatus
expectPodStatus v1.PodStatus
desc string
hasRunningContainers bool
oldPodStatus func(input v1.PodStatus) v1.PodStatus
newPodStatus func(input v1.PodStatus) v1.PodStatus
expectPodStatus v1.PodStatus
}{
{
"no change",
false,
func(input v1.PodStatus) v1.PodStatus { return input },
func(input v1.PodStatus) v1.PodStatus { return input },
getPodStatus(),
},
{
"readiness changes",
false,
func(input v1.PodStatus) v1.PodStatus { return input },
func(input v1.PodStatus) v1.PodStatus {
input.Conditions[0].Status = v1.ConditionFalse
@ -1105,6 +1431,7 @@ func TestMergePodStatus(t *testing.T) {
},
{
"additional pod condition",
false,
func(input v1.PodStatus) v1.PodStatus {
input.Conditions = append(input.Conditions, v1.PodCondition{
Type: v1.PodConditionType("example.com/feature"),
@ -1134,6 +1461,7 @@ func TestMergePodStatus(t *testing.T) {
},
{
"additional pod condition and readiness changes",
false,
func(input v1.PodStatus) v1.PodStatus {
input.Conditions = append(input.Conditions, v1.PodCondition{
Type: v1.PodConditionType("example.com/feature"),
@ -1166,6 +1494,7 @@ func TestMergePodStatus(t *testing.T) {
},
{
"additional pod condition changes",
false,
func(input v1.PodStatus) v1.PodStatus {
input.Conditions = append(input.Conditions, v1.PodCondition{
Type: v1.PodConditionType("example.com/feature"),
@ -1199,13 +1528,77 @@ func TestMergePodStatus(t *testing.T) {
Message: "Message",
},
},
{
"phase is transitioning to failed and no containers running",
false,
func(input v1.PodStatus) v1.PodStatus {
input.Phase = v1.PodRunning
input.Reason = "Unknown"
input.Message = "Message"
return input
},
func(input v1.PodStatus) v1.PodStatus {
input.Phase = v1.PodFailed
input.Reason = "Evicted"
input.Message = "Was Evicted"
return input
},
v1.PodStatus{
Phase: v1.PodFailed,
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
{
Type: v1.PodScheduled,
Status: v1.ConditionTrue,
},
},
Reason: "Evicted",
Message: "Was Evicted",
},
},
{
"phase is transitioning to failed and containers running",
true,
func(input v1.PodStatus) v1.PodStatus {
input.Phase = v1.PodRunning
input.Reason = "Unknown"
input.Message = "Message"
return input
},
func(input v1.PodStatus) v1.PodStatus {
input.Phase = v1.PodFailed
input.Reason = "Evicted"
input.Message = "Was Evicted"
return input
},
v1.PodStatus{
Phase: v1.PodRunning,
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
{
Type: v1.PodScheduled,
Status: v1.ConditionTrue,
},
},
Reason: "Unknown",
Message: "Message",
},
},
}
for _, tc := range useCases {
output := mergePodStatus(tc.oldPodStatus(getPodStatus()), tc.newPodStatus(getPodStatus()))
if !conditionsEqual(output.Conditions, tc.expectPodStatus.Conditions) || !statusEqual(output, tc.expectPodStatus) {
t.Errorf("test case %q failed, expect: %+v, got %+v", tc.desc, tc.expectPodStatus, output)
}
t.Run(tc.desc, func(t *testing.T) {
output := mergePodStatus(tc.oldPodStatus(getPodStatus()), tc.newPodStatus(getPodStatus()), tc.hasRunningContainers)
if !conditionsEqual(output.Conditions, tc.expectPodStatus.Conditions) || !statusEqual(output, tc.expectPodStatus) {
t.Fatalf("unexpected output: %s", cmp.Diff(tc.expectPodStatus, output))
}
})
}
}


@ -16,13 +16,18 @@ limitations under the License.
package testing
import "k8s.io/api/core/v1"
import v1 "k8s.io/api/core/v1"
// FakePodDeletionSafetyProvider is a fake PodDeletionSafetyProvider for test.
type FakePodDeletionSafetyProvider struct{}
// PodResourcesAreReclaimed implements PodDeletionSafetyProvider.
// Always reports that all pod resources are reclaimed.
func (f *FakePodDeletionSafetyProvider) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool {
return true
type FakePodDeletionSafetyProvider struct {
Reclaimed bool
HasRunning bool
}
func (f *FakePodDeletionSafetyProvider) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool {
return f.Reclaimed
}
func (f *FakePodDeletionSafetyProvider) PodCouldHaveRunningContainers(pod *v1.Pod) bool {
return f.HasRunning
}


@ -103,6 +103,20 @@ func (mr *MockPodDeletionSafetyProviderMockRecorder) PodResourcesAreReclaimed(po
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PodResourcesAreReclaimed", reflect.TypeOf((*MockPodDeletionSafetyProvider)(nil).PodResourcesAreReclaimed), pod, status)
}
// PodCouldHaveRunningContainers mocks base method
func (m *MockPodDeletionSafetyProvider) PodCouldHaveRunningContainers(pod *v1.Pod) bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "PodCouldHaveRunningContainers", pod)
ret0, _ := ret[0].(bool)
return ret0
}
// PodCouldHaveRunningContainers indicates an expected call of PodCouldHaveRunningContainers
func (mr *MockPodDeletionSafetyProviderMockRecorder) PodCouldHaveRunningContainers(pod interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PodCouldHaveRunningContainers", reflect.TypeOf((*MockPodDeletionSafetyProvider)(nil).PodCouldHaveRunningContainers), pod)
}
// MockManager is a mock of Manager interface
type MockManager struct {
ctrl *gomock.Controller


@ -25,6 +25,7 @@ import (
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
@ -35,6 +36,7 @@ import (
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
e2eresource "k8s.io/kubernetes/test/e2e/framework/resource"
"k8s.io/kubernetes/test/e2e/scheduling"
"k8s.io/utils/pointer"
"github.com/onsi/ginkgo"
@ -45,6 +47,10 @@ var _ = SIGDescribe("Job", func() {
f := framework.NewDefaultFramework("job")
parallelism := int32(2)
completions := int32(4)
largeParallelism := int32(90)
largeCompletions := int32(90)
backoffLimit := int32(6) // default value
// Simplest case: N pods succeed
@ -361,6 +367,52 @@ var _ = SIGDescribe("Job", func() {
framework.ExpectEqual(pod.Status.Phase, v1.PodFailed)
}
})
ginkgo.It("should run a job to completion with CPU requests [Serial]", func() {
ginkgo.By("Creating a job that with CPU requests")
testNodeName := scheduling.GetNodeThatCanRunPod(f)
targetNode, err := f.ClientSet.CoreV1().Nodes().Get(context.TODO(), testNodeName, metav1.GetOptions{})
framework.ExpectNoError(err, "unable to get node object for node %v", testNodeName)
cpu, ok := targetNode.Status.Allocatable[v1.ResourceCPU]
if !ok {
framework.Failf("Unable to get node's %q cpu", targetNode.Name)
}
cpuRequest := fmt.Sprint(int64(0.2 * float64(cpu.Value())))
backoff := 0
ginkgo.By("Creating a job")
job := e2ejob.NewTestJob("succeed", "all-succeed", v1.RestartPolicyNever, largeParallelism, largeCompletions, nil, int32(backoff))
for i := range job.Spec.Template.Spec.Containers {
job.Spec.Template.Spec.Containers[i].Resources = v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse(cpuRequest),
},
}
job.Spec.Template.Spec.NodeSelector = map[string]string{"kubernetes.io/hostname": testNodeName}
}
framework.Logf("Creating job %q with a node hostname selector %q wth cpu request %q", job.Name, testNodeName, cpuRequest)
job, err = e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job)
framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
ginkgo.By("Ensuring job reaches completions")
err = e2ejob.WaitForJobComplete(f.ClientSet, f.Namespace.Name, job.Name, largeCompletions)
framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
ginkgo.By("Ensuring pods for job exist")
pods, err := e2ejob.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name)
framework.ExpectNoError(err, "failed to get pod list for job in namespace: %s", f.Namespace.Name)
successes := int32(0)
for _, pod := range pods.Items {
if pod.Status.Phase == v1.PodSucceeded {
successes++
}
}
framework.ExpectEqual(successes, largeCompletions, "expected %d successful job pods, but got %d", largeCompletions, successes)
})
})
// waitForJobFailure uses c to wait for up to timeout for the Job named jobName in namespace ns to fail.


@ -37,6 +37,7 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/test/e2e/framework"
e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
@ -204,274 +205,19 @@ var _ = SIGDescribe("Pods Extended", func() {
ginkgo.It("should never report success for a pending container", func() {
ginkgo.By("creating pods that should always exit 1 and terminating the pod after a random delay")
var reBug88766 = regexp.MustCompile(`rootfs_linux.*kubernetes\.io~(secret|projected).*no such file or directory`)
var (
lock sync.Mutex
errs []error
wg sync.WaitGroup
createAndTestPodRepeatedly(
3, 15,
podFastDeleteScenario{client: podClient.PodInterface, delayMs: 2000},
podClient.PodInterface,
)
})
ginkgo.It("should never report container start when an init container fails", func() {
ginkgo.By("creating pods with an init container that always exit 1 and terminating the pod after a random delay")
createAndTestPodRepeatedly(
3, 15,
podFastDeleteScenario{client: podClient.PodInterface, delayMs: 2000, initContainer: true},
podClient.PodInterface,
)
r := prometheus.NewRegistry()
h := prometheus.NewSummaryVec(prometheus.SummaryOpts{
Name: "start_latency",
Objectives: map[float64]float64{
0.5: 0.05,
0.75: 0.025,
0.9: 0.01,
0.99: 0.001,
},
}, []string{"node"})
r.MustRegister(h)
const delay = 2000
const workers = 3
const pods = 15
var min, max time.Duration
for i := 0; i < workers; i++ {
wg.Add(1)
go func(i int) {
defer ginkgo.GinkgoRecover()
defer wg.Done()
for retries := 0; retries < pods; retries++ {
name := fmt.Sprintf("pod-submit-status-%d-%d", i, retries)
value := strconv.Itoa(time.Now().Nanosecond())
one := int64(1)
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: map[string]string{
"name": "foo",
"time": value,
},
},
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyNever,
TerminationGracePeriodSeconds: &one,
Containers: []v1.Container{
{
Name: "busybox",
Image: imageutils.GetE2EImage(imageutils.BusyBox),
Command: []string{
"/bin/false",
},
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("5m"),
v1.ResourceMemory: resource.MustParse("10Mi"),
},
},
},
},
},
}
// create the pod, capture the change events, then delete the pod
start := time.Now()
created := podClient.Create(pod)
ch := make(chan []watch.Event)
waitForWatch := make(chan struct{})
go func() {
defer ginkgo.GinkgoRecover()
defer close(ch)
w, err := podClient.Watch(context.TODO(), metav1.ListOptions{
ResourceVersion: created.ResourceVersion,
FieldSelector: fmt.Sprintf("metadata.name=%s", pod.Name),
})
if err != nil {
framework.Logf("Unable to watch pod %s: %v", pod.Name, err)
return
}
defer w.Stop()
close(waitForWatch)
events := []watch.Event{
{Type: watch.Added, Object: created},
}
for event := range w.ResultChan() {
events = append(events, event)
if event.Type == watch.Error {
framework.Logf("watch error seen for %s: %#v", pod.Name, event.Object)
}
if event.Type == watch.Deleted {
framework.Logf("watch delete seen for %s", pod.Name)
break
}
}
ch <- events
}()
select {
case <-ch: // in case the goroutine above exits before establishing the watch
case <-waitForWatch: // when the watch is established
}
t := time.Duration(rand.Intn(delay)) * time.Millisecond
time.Sleep(t)
err := podClient.Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
framework.ExpectNoError(err, "failed to delete pod")
var (
events []watch.Event
ok bool
)
select {
case events, ok = <-ch:
if !ok {
continue
}
if len(events) < 2 {
framework.Fail("only got a single event")
}
case <-time.After(5 * time.Minute):
framework.Failf("timed out waiting for watch events for %s", pod.Name)
}
end := time.Now()
// check the returned events for consistency
var duration, completeDuration time.Duration
var hasContainers, hasTerminated, hasTerminalPhase, hasRunningContainers bool
verifyFn := func(event watch.Event) error {
var ok bool
pod, ok = event.Object.(*v1.Pod)
if !ok {
framework.Logf("Unexpected event object: %s %#v", event.Type, event.Object)
return nil
}
if len(pod.Status.InitContainerStatuses) != 0 {
return fmt.Errorf("pod %s on node %s had incorrect init containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.InitContainerStatuses)
}
if len(pod.Status.ContainerStatuses) == 0 {
if hasContainers {
return fmt.Errorf("pod %s on node %s had incorrect containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.ContainerStatuses)
}
return nil
}
hasContainers = true
if len(pod.Status.ContainerStatuses) != 1 {
return fmt.Errorf("pod %s on node %s had incorrect containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.ContainerStatuses)
}
status := pod.Status.ContainerStatuses[0]
t := status.State.Terminated
if hasTerminated {
if status.State.Waiting != nil || status.State.Running != nil {
return fmt.Errorf("pod %s on node %s was terminated and then changed state: %#v", pod.Name, pod.Spec.NodeName, status)
}
if t == nil {
return fmt.Errorf("pod %s on node %s was terminated and then had termination cleared: %#v", pod.Name, pod.Spec.NodeName, status)
}
}
var hasNoStartTime bool
hasRunningContainers = status.State.Waiting == nil && status.State.Terminated == nil
if t != nil {
if !t.FinishedAt.Time.IsZero() {
if t.StartedAt.IsZero() {
hasNoStartTime = true
} else {
duration = t.FinishedAt.Sub(t.StartedAt.Time)
}
completeDuration = t.FinishedAt.Sub(pod.CreationTimestamp.Time)
}
defer func() { hasTerminated = true }()
switch {
case t.ExitCode == 1:
// expected
case t.ExitCode == 137 && (t.Reason == "ContainerStatusUnknown" || t.Reason == "Error"):
// expected, pod was force-killed after grace period
case t.ExitCode == 128 && (t.Reason == "StartError" || t.Reason == "ContainerCannotRun") && reBug88766.MatchString(t.Message):
// pod volume teardown races with container start in CRI, which reports a failure
framework.Logf("pod %s on node %s failed with the symptoms of https://github.com/kubernetes/kubernetes/issues/88766", pod.Name, pod.Spec.NodeName)
default:
data, _ := json.MarshalIndent(pod.Status, "", " ")
framework.Logf("pod %s on node %s had incorrect final status:\n%s", pod.Name, pod.Spec.NodeName, string(data))
return fmt.Errorf("pod %s on node %s container unexpected exit code %d: start=%s end=%s reason=%s message=%s", pod.Name, pod.Spec.NodeName, t.ExitCode, t.StartedAt, t.FinishedAt, t.Reason, t.Message)
}
switch {
case duration > time.Hour:
// problem with status reporting
return fmt.Errorf("pod %s container %s on node %s had very long duration %s: start=%s end=%s", pod.Name, status.Name, pod.Spec.NodeName, duration, t.StartedAt, t.FinishedAt)
case hasNoStartTime:
// should never happen
return fmt.Errorf("pod %s container %s on node %s had finish time but not start time: end=%s", pod.Name, status.Name, pod.Spec.NodeName, t.FinishedAt)
}
}
if pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded {
hasTerminalPhase = true
} else {
if hasTerminalPhase {
return fmt.Errorf("pod %s on node %s was in a terminal phase and then reverted: %#v", pod.Name, pod.Spec.NodeName, pod.Status)
}
}
return nil
}
var eventErr error
for _, event := range events[1:] {
if err := verifyFn(event); err != nil {
eventErr = err
break
}
}
func() {
lock.Lock()
defer lock.Unlock()
if eventErr != nil {
errs = append(errs, eventErr)
return
}
if !hasTerminalPhase {
var names []string
for _, status := range pod.Status.ContainerStatuses {
if status.State.Running != nil {
names = append(names, status.Name)
}
}
switch {
case len(names) > 0:
errs = append(errs, fmt.Errorf("pod %s on node %s did not reach a terminal phase before being deleted but had running containers: phase=%s, running-containers=%s", pod.Name, pod.Spec.NodeName, pod.Status.Phase, strings.Join(names, ",")))
case pod.Status.Phase != v1.PodPending:
errs = append(errs, fmt.Errorf("pod %s on node %s was not Pending but has no running containers: phase=%s", pod.Name, pod.Spec.NodeName, pod.Status.Phase))
}
}
if hasRunningContainers {
data, _ := json.MarshalIndent(pod.Status.ContainerStatuses, "", " ")
errs = append(errs, fmt.Errorf("pod %s on node %s had running or unknown container status before being deleted:\n%s", pod.Name, pod.Spec.NodeName, string(data)))
}
}()
if duration < min {
min = duration
}
if duration > max || max == 0 {
max = duration
}
h.WithLabelValues(pod.Spec.NodeName).Observe(end.Sub(start).Seconds())
framework.Logf("Pod %s on node %s timings total=%s t=%s run=%s execute=%s", pod.Name, pod.Spec.NodeName, end.Sub(start), t, completeDuration, duration)
}
}(i)
}
wg.Wait()
if len(errs) > 0 {
var messages []string
for _, err := range errs {
messages = append(messages, err.Error())
}
framework.Failf("%d errors:\n%v", len(errs), strings.Join(messages, "\n"))
}
values, _ := r.Gather()
var buf bytes.Buffer
for _, m := range values {
expfmt.MetricFamilyToText(&buf, m)
}
framework.Logf("Summary of latencies:\n%s", buf.String())
})
})
@@ -552,3 +298,422 @@ var _ = SIGDescribe("Pods Extended", func() {
})
})
})
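// createAndTestPodRepeatedly runs `workers` goroutines that each create and
// exercise `iterations` pods via the given scenario, watching every pod for
// unexpected status transitions and recording a per-node latency summary.
// Example invocation (as used by the "Pods Extended" tests above):
//
//	createAndTestPodRepeatedly(
//		3, 15,
//		podFastDeleteScenario{client: podClient.PodInterface, delayMs: 2000},
//		podClient.PodInterface,
//	)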
func createAndTestPodRepeatedly(workers, iterations int, scenario podScenario, podClient v1core.PodInterface) {
var (
lock sync.Mutex
errs []error
wg sync.WaitGroup
)
r := prometheus.NewRegistry()
h := prometheus.NewSummaryVec(prometheus.SummaryOpts{
Name: "latency",
Objectives: map[float64]float64{
0.5: 0.05,
0.75: 0.025,
0.9: 0.01,
0.99: 0.001,
},
}, []string{"node"})
r.MustRegister(h)
for i := 0; i < workers; i++ {
wg.Add(1)
go func(i int) {
defer ginkgo.GinkgoRecover()
defer wg.Done()
for retries := 0; retries < iterations; retries++ {
pod := scenario.Pod(i, retries)
// create the pod, capture the change events, then delete the pod
start := time.Now()
created, err := podClient.Create(context.TODO(), pod, metav1.CreateOptions{})
framework.ExpectNoError(err, "failed to create pod")
ch := make(chan []watch.Event)
waitForWatch := make(chan struct{})
go func() {
defer ginkgo.GinkgoRecover()
defer close(ch)
w, err := podClient.Watch(context.TODO(), metav1.ListOptions{
ResourceVersion: created.ResourceVersion,
FieldSelector: fmt.Sprintf("metadata.name=%s", pod.Name),
})
if err != nil {
framework.Logf("Unable to watch pod %s: %v", pod.Name, err)
return
}
defer w.Stop()
close(waitForWatch)
events := []watch.Event{
{Type: watch.Added, Object: created},
}
for event := range w.ResultChan() {
events = append(events, event)
if event.Type == watch.Error {
framework.Logf("watch error seen for %s: %#v", pod.Name, event.Object)
}
if scenario.IsLastEvent(event) {
framework.Logf("watch last event seen for %s", pod.Name)
break
}
}
ch <- events
}()
select {
case <-ch: // in case the goroutine above exits before establishing the watch
case <-waitForWatch: // when the watch is established
}
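// Perform the scenario's action (e.g. delete the pod after a random delay).
// The returned string is a short description of the action taken; it shadows
// the scenario variable for the remainder of this iteration.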
verifier, scenario, err := scenario.Action(pod)
framework.ExpectNoError(err, "failed to take action")
var (
events []watch.Event
ok bool
)
select {
case events, ok = <-ch:
if !ok {
continue
}
if len(events) < 2 {
framework.Fail("only got a single event")
}
case <-time.After(5 * time.Minute):
framework.Failf("timed out waiting for watch events for %s", pod.Name)
}
end := time.Now()
var eventErr error
for _, event := range events[1:] {
if err := verifier.Verify(event); err != nil {
eventErr = err
break
}
}
total := end.Sub(start)
var lastPod *v1.Pod = pod
func() {
lock.Lock()
defer lock.Unlock()
if eventErr != nil {
errs = append(errs, eventErr)
return
}
pod, verifyErrs := verifier.VerifyFinal(scenario, total)
if pod != nil {
lastPod = pod
}
errs = append(errs, verifyErrs...)
}()
h.WithLabelValues(lastPod.Spec.NodeName).Observe(total.Seconds())
}
}(i)
}
wg.Wait()
if len(errs) > 0 {
var messages []string
for _, err := range errs {
messages = append(messages, err.Error())
}
framework.Failf("%d errors:\n%v", len(errs), strings.Join(messages, "\n"))
}
values, _ := r.Gather()
var buf bytes.Buffer
for _, m := range values {
expfmt.MetricFamilyToText(&buf, m)
}
framework.Logf("Summary of latencies:\n%s", buf.String())
}
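// podScenario abstracts the per-pod behavior exercised by
// createAndTestPodRepeatedly: building the pod to create, performing an
// action against it (returning a verifier for the watch events plus a short
// description), and deciding which watch event ends observation.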
type podScenario interface {
Pod(worker, attempt int) *v1.Pod
Action(*v1.Pod) (podScenarioVerifier, string, error)
IsLastEvent(event watch.Event) bool
}
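// podScenarioVerifier checks each watch event observed for a pod; once the
// scenario is complete, VerifyFinal performs final consistency checks and
// returns the last observed pod along with any accumulated errors.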
type podScenarioVerifier interface {
Verify(event watch.Event) error
VerifyFinal(scenario string, duration time.Duration) (*v1.Pod, []error)
}
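// podFastDeleteScenario creates a pod whose "fail" (or failing init)
// container exits non-zero, then deletes the pod after a random delay of up
// to delayMs milliseconds, racing pod startup against deletion.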
type podFastDeleteScenario struct {
client v1core.PodInterface
delayMs int
initContainer bool
}
func (s podFastDeleteScenario) Verifier(pod *v1.Pod) podScenarioVerifier {
return &podStartVerifier{}
}
func (s podFastDeleteScenario) IsLastEvent(event watch.Event) bool {
return event.Type == watch.Deleted
}
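// Action sleeps for a random delay of up to delayMs milliseconds, deletes the
// pod, and returns a verifier bound to the pod together with a description of
// the chosen delay.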
func (s podFastDeleteScenario) Action(pod *v1.Pod) (podScenarioVerifier, string, error) {
t := time.Duration(rand.Intn(s.delayMs)) * time.Millisecond
scenario := fmt.Sprintf("t=%s", t)
time.Sleep(t)
return &podStartVerifier{pod: pod}, scenario, s.client.Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
}
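// Pod builds the pod for a given worker/attempt pair: a single "fail"
// container that exits 1 with a 1s termination grace period, or, when
// initContainer is set, a failing init container followed by a "blocked"
// container that is never expected to start.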
func (s podFastDeleteScenario) Pod(worker, attempt int) *v1.Pod {
name := fmt.Sprintf("pod-terminate-status-%d-%d", worker, attempt)
value := strconv.Itoa(time.Now().Nanosecond())
one := int64(1)
if s.initContainer {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: map[string]string{
"name": "foo",
"time": value,
},
},
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyNever,
TerminationGracePeriodSeconds: &one,
InitContainers: []v1.Container{
{
Name: "fail",
Image: imageutils.GetE2EImage(imageutils.BusyBox),
Command: []string{
"/bin/false",
},
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("5m"),
v1.ResourceMemory: resource.MustParse("10Mi"),
},
},
},
},
Containers: []v1.Container{
{
Name: "blocked",
Image: imageutils.GetE2EImage(imageutils.BusyBox),
Command: []string{
"/bin/true",
},
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("5m"),
v1.ResourceMemory: resource.MustParse("10Mi"),
},
},
},
},
},
}
}
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: map[string]string{
"name": "foo",
"time": value,
},
},
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyNever,
TerminationGracePeriodSeconds: &one,
Containers: []v1.Container{
{
Name: "fail",
Image: imageutils.GetE2EImage(imageutils.BusyBox),
Command: []string{
"/bin/false",
},
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("5m"),
v1.ResourceMemory: resource.MustParse("10Mi"),
},
},
},
},
},
}
}
// podStartVerifier checks events for a given pod and looks for unexpected
// transitions. It assumes one container running to completion.
type podStartVerifier struct {
pod *v1.Pod
hasInitContainers bool
hasContainers bool
hasTerminated bool
hasRunningContainers bool
hasTerminalPhase bool
duration time.Duration
completeDuration time.Duration
}
var reBug88766 = regexp.MustCompile(`rootfs_linux.*kubernetes\.io~(secret|projected).*no such file or directory`)
// Verify takes successive watch events for a given pod and returns an error if the status is unexpected.
// This verifier works for any pod which has at most one init container and exactly one regular container.
func (v *podStartVerifier) Verify(event watch.Event) error {
pod, ok := event.Object.(*v1.Pod)
if !ok {
framework.Logf("Unexpected event object: %s %#v", event.Type, event.Object)
return nil
}
v.pod = pod
if len(pod.Spec.InitContainers) > 0 {
if len(pod.Status.InitContainerStatuses) == 0 {
if v.hasInitContainers {
return fmt.Errorf("pod %s on node %s had incorrect init containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.InitContainerStatuses)
}
return nil
}
v.hasInitContainers = true
if len(pod.Status.InitContainerStatuses) != 1 {
return fmt.Errorf("pod %s on node %s had incorrect init containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.InitContainerStatuses)
}
} else {
if len(pod.Status.InitContainerStatuses) != 0 {
return fmt.Errorf("pod %s on node %s had incorrect init containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.InitContainerStatuses)
}
}
if len(pod.Status.ContainerStatuses) == 0 {
if v.hasContainers {
return fmt.Errorf("pod %s on node %s had incorrect containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.ContainerStatuses)
}
return nil
}
v.hasContainers = true
if len(pod.Status.ContainerStatuses) != 1 {
return fmt.Errorf("pod %s on node %s had incorrect containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.ContainerStatuses)
}
if status := findContainerStatusInPod(pod, "blocked"); status != nil {
if (status.Started != nil && *status.Started) || status.LastTerminationState.Terminated != nil || status.State.Waiting == nil {
return fmt.Errorf("pod %s on node %s should not have started the blocked container: %#v", pod.Name, pod.Spec.NodeName, status)
}
}
status := findContainerStatusInPod(pod, "fail")
if status == nil {
return fmt.Errorf("pod %s on node %s had incorrect containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status)
}
t := status.State.Terminated
if v.hasTerminated {
if status.State.Waiting != nil || status.State.Running != nil {
return fmt.Errorf("pod %s on node %s was terminated and then changed state: %#v", pod.Name, pod.Spec.NodeName, status)
}
if t == nil {
return fmt.Errorf("pod %s on node %s was terminated and then had termination cleared: %#v", pod.Name, pod.Spec.NodeName, status)
}
}
var hasNoStartTime bool
v.hasRunningContainers = status.State.Waiting == nil && status.State.Terminated == nil
if t != nil {
if !t.FinishedAt.Time.IsZero() {
if t.StartedAt.IsZero() {
hasNoStartTime = true
} else {
v.duration = t.FinishedAt.Sub(t.StartedAt.Time)
}
v.completeDuration = t.FinishedAt.Sub(pod.CreationTimestamp.Time)
}
defer func() { v.hasTerminated = true }()
switch {
case t.ExitCode == 1:
// expected
case t.ExitCode == 137 && (t.Reason == "ContainerStatusUnknown" || t.Reason == "Error"):
// expected, pod was force-killed after grace period
case t.ExitCode == 128 && (t.Reason == "StartError" || t.Reason == "ContainerCannotRun") && reBug88766.MatchString(t.Message):
// pod volume teardown races with container start in CRI, which reports a failure
framework.Logf("pod %s on node %s failed with the symptoms of https://github.com/kubernetes/kubernetes/issues/88766", pod.Name, pod.Spec.NodeName)
default:
data, _ := json.MarshalIndent(pod.Status, "", " ")
framework.Logf("pod %s on node %s had incorrect final status:\n%s", pod.Name, pod.Spec.NodeName, string(data))
return fmt.Errorf("pod %s on node %s container unexpected exit code %d: start=%s end=%s reason=%s message=%s", pod.Name, pod.Spec.NodeName, t.ExitCode, t.StartedAt, t.FinishedAt, t.Reason, t.Message)
}
switch {
case v.duration > time.Hour:
// problem with status reporting
return fmt.Errorf("pod %s container %s on node %s had very long duration %s: start=%s end=%s", pod.Name, status.Name, pod.Spec.NodeName, v.duration, t.StartedAt, t.FinishedAt)
case hasNoStartTime:
// should never happen
return fmt.Errorf("pod %s container %s on node %s had finish time but not start time: end=%s", pod.Name, status.Name, pod.Spec.NodeName, t.FinishedAt)
}
}
if pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded {
v.hasTerminalPhase = true
} else {
if v.hasTerminalPhase {
return fmt.Errorf("pod %s on node %s was in a terminal phase and then reverted: %#v", pod.Name, pod.Spec.NodeName, pod.Status)
}
}
return nil
}
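// VerifyFinal runs once all watch events have been processed: it checks that
// a pod which never reached a terminal phase was still Pending with no
// running containers, and that no container was left in a running or unknown
// state, then logs the timings and returns the last observed pod plus errors.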
func (v *podStartVerifier) VerifyFinal(scenario string, total time.Duration) (*v1.Pod, []error) {
var errs []error
pod := v.pod
if !v.hasTerminalPhase {
var names []string
for _, status := range pod.Status.ContainerStatuses {
if status.State.Running != nil {
names = append(names, status.Name)
}
}
switch {
case len(names) > 0:
errs = append(errs, fmt.Errorf("pod %s on node %s did not reach a terminal phase before being deleted but had running containers: phase=%s, running-containers=%s", pod.Name, pod.Spec.NodeName, pod.Status.Phase, strings.Join(names, ",")))
case pod.Status.Phase != v1.PodPending:
errs = append(errs, fmt.Errorf("pod %s on node %s was not Pending but has no running containers: phase=%s", pod.Name, pod.Spec.NodeName, pod.Status.Phase))
}
}
if v.hasRunningContainers {
data, _ := json.MarshalIndent(pod.Status.ContainerStatuses, "", " ")
errs = append(errs, fmt.Errorf("pod %s on node %s had running or unknown container status before being deleted:\n%s", pod.Name, pod.Spec.NodeName, string(data)))
}
framework.Logf("Pod %s on node %s %s total=%s run=%s execute=%s", pod.Name, pod.Spec.NodeName, scenario, total, v.completeDuration, v.duration)
return pod, errs
}
// findContainerStatusInPod finds a container status by its name in the provided pod
func findContainerStatusInPod(pod *v1.Pod, containerName string) *v1.ContainerStatus {
for _, container := range pod.Status.InitContainerStatuses {
if container.Name == containerName {
return &container
}
}
for _, container := range pod.Status.ContainerStatuses {
if container.Name == containerName {
return &container
}
}
for _, container := range pod.Status.EphemeralContainerStatuses {
if container.Name == containerName {
return &container
}
}
return nil
}
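// The scenario/verifier split makes it straightforward to add further stress
// patterns. A minimal, hypothetical sketch (not part of this change) that
// reuses podStartVerifier and deletes the pod immediately might look like:
//
//	type podImmediateDeleteScenario struct {
//		client v1core.PodInterface
//	}
//
//	func (s podImmediateDeleteScenario) IsLastEvent(event watch.Event) bool {
//		return event.Type == watch.Deleted
//	}
//
//	func (s podImmediateDeleteScenario) Action(pod *v1.Pod) (podScenarioVerifier, string, error) {
//		return &podStartVerifier{pod: pod}, "t=0", s.client.Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
//	}
//
//	// Pod would construct a pod spec analogous to podFastDeleteScenario.Pod.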

View File

@@ -30,6 +30,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/kubectl/pkg/util/podutils"
"github.com/onsi/ginkgo"
"github.com/onsi/gomega"
@@ -128,6 +129,11 @@ var _ = SIGDescribe("GracefulNodeShutdown [Serial] [NodeFeature:GracefulNodeShut
framework.ExpectEqual(len(list.Items), len(pods), "the number of pods is not as expected")
for _, pod := range list.Items {
if isPodStatusAffectedByIssue108594(&pod) {
framework.Logf("Detected invalid pod state for pod %q: pod status: %+v", pod.Name, pod.Status)
framework.Failf("failing test due to detecting invalid pod status")
}
if kubelettypes.IsCriticalPod(&pod) {
if isPodShutdown(&pod) {
framework.Logf("Expecting critical pod to be running, but it's not currently. Pod: %q, Pod Status %+v", pod.Name, pod.Status)
@@ -155,6 +161,10 @@ var _ = SIGDescribe("GracefulNodeShutdown [Serial] [NodeFeature:GracefulNodeShut
framework.ExpectEqual(len(list.Items), len(pods), "the number of pods is not as expected")
for _, pod := range list.Items {
if isPodStatusAffectedByIssue108594(&pod) {
framework.Logf("Detected invalid pod state for pod %q: pod status: %+v", pod.Name, pod.Status)
framework.Failf("failing test due to detecting invalid pod status")
}
if !isPodShutdown(&pod) {
framework.Logf("Expecting pod to be shutdown, but it's not currently: Pod: %q, Pod Status %+v", pod.Name, pod.Status)
return fmt.Errorf("pod should be shutdown, phase: %s", pod.Status.Phase)
@@ -541,3 +551,8 @@ func isPodShutdown(pod *v1.Pod) bool {
return pod.Status.Message == podShutdownMessage && pod.Status.Reason == podShutdownReason && hasContainersNotReadyCondition && pod.Status.Phase == v1.PodFailed
}
// Pods should never report failed phase and have ready condition = true (https://github.com/kubernetes/kubernetes/issues/108594)
func isPodStatusAffectedByIssue108594(pod *v1.Pod) bool {
return pod.Status.Phase == v1.PodFailed && podutils.IsPodReady(pod)
}