From 6d98b0a0f4fbb9c1032f5c47fa20f59489301138 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sat, 19 Oct 2019 18:10:11 -0400 Subject: [PATCH 1/5] Test that an always-fail container can't report the pod Succeeded The kubelet can race when a pod is deleted and report that a container succeeded when it instead failed, and thus the pod is reported as succeeded. Create an e2e test that demonstrates this failure. --- test/e2e/node/BUILD | 1 + test/e2e/node/pods.go | 229 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 230 insertions(+) diff --git a/test/e2e/node/BUILD b/test/e2e/node/BUILD index 6e833e1fd3e..23ac0858b6f 100644 --- a/test/e2e/node/BUILD +++ b/test/e2e/node/BUILD @@ -37,6 +37,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//test/e2e/framework:go_default_library", "//test/e2e/framework/job:go_default_library", diff --git a/test/e2e/node/pods.go b/test/e2e/node/pods.go index a16468312cd..4f2a023ee23 100644 --- a/test/e2e/node/pods.go +++ b/test/e2e/node/pods.go @@ -19,8 +19,13 @@ package node import ( "context" "encoding/json" + "fmt" + "math/rand" "net/http" + "regexp" "strconv" + "strings" + "sync" "time" v1 "k8s.io/api/core/v1" @@ -29,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" "k8s.io/kubernetes/test/e2e/framework" e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet" imageutils "k8s.io/kubernetes/test/utils/image" @@ -197,4 +203,227 @@ var _ = SIGDescribe("Pods Extended", func() { framework.ExpectEqual(pod.Status.QOSClass, v1.PodQOSGuaranteed) }) }) + + framework.KubeDescribe("Pod Container Status", func() { + var podClient *framework.PodClient + ginkgo.BeforeEach(func() { + podClient = f.PodClient() + }) + + ginkgo.It("should never report success for a pending container", func() { + ginkgo.By("creating pods that should always exit 1 and terminating the pod after a random delay") + + var reBug88766 = regexp.MustCompile(`ContainerCannotRun.*rootfs_linux\.go.*kubernetes\.io~secret.*no such file or directory`) + + var ( + lock sync.Mutex + errs []error + + wg sync.WaitGroup + ) + + const delay = 2000 + const workers = 3 + const pods = 15 + var min, max time.Duration + for i := 0; i < workers; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + for retries := 0; retries < pods; retries++ { + name := fmt.Sprintf("pod-submit-status-%d-%d", i, retries) + value := strconv.Itoa(time.Now().Nanosecond()) + one := int64(1) + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + "name": "foo", + "time": value, + }, + }, + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyNever, + TerminationGracePeriodSeconds: &one, + Containers: []v1.Container{ + { + Name: "busybox", + Image: imageutils.GetE2EImage(imageutils.BusyBox), + Command: []string{ + "/bin/false", + }, + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("5m"), + v1.ResourceMemory: resource.MustParse("10Mi"), + }, + }, + }, + }, + }, + } + + // create the pod, capture the change events, then delete the pod + start := time.Now() + created := podClient.Create(pod) + ch := make(chan []watch.Event) + go func() { + defer close(ch) + w, err := podClient.Watch(context.TODO(), metav1.ListOptions{ + ResourceVersion: created.ResourceVersion, + FieldSelector: fmt.Sprintf("metadata.name=%s", pod.Name), + }) + if err != nil { + framework.Logf("Unable to watch pod %s: %v", pod.Name, err) + return + } + defer w.Stop() + events := []watch.Event{ + {Type: watch.Added, Object: created}, + } + for event := range w.ResultChan() { + events = append(events, event) + if event.Type == watch.Deleted { + break + } + } + ch <- events + }() + + t := time.Duration(rand.Intn(delay)) * time.Millisecond + time.Sleep(t) + err := podClient.Delete(context.TODO(), pod.Name, nil) + framework.ExpectNoError(err, "failed to delete pod") + + events, ok := <-ch + if !ok { + continue + } + if len(events) < 2 { + framework.Fail("only got a single event") + } + + end := time.Now() + + // check the returned events for consistency + var duration, completeDuration time.Duration + var hasContainers, hasTerminated, hasTerminalPhase, hasRunningContainers bool + verifyFn := func(event watch.Event) error { + var ok bool + pod, ok = event.Object.(*v1.Pod) + if !ok { + framework.Logf("Unexpected event object: %s %#v", event.Type, event.Object) + return nil + } + + if len(pod.Status.InitContainerStatuses) != 0 { + return fmt.Errorf("pod %s on node %s had incorrect init containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.InitContainerStatuses) + } + if len(pod.Status.ContainerStatuses) == 0 { + if hasContainers { + return fmt.Errorf("pod %s on node %s had incorrect containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.ContainerStatuses) + } + return nil + } + hasContainers = true + if len(pod.Status.ContainerStatuses) != 1 { + return fmt.Errorf("pod %s on node %s had incorrect containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.ContainerStatuses) + } + status := pod.Status.ContainerStatuses[0] + t := status.State.Terminated + if hasTerminated { + if status.State.Waiting != nil || status.State.Running != nil { + return fmt.Errorf("pod %s on node %s was terminated and then changed state: %#v", pod.Name, pod.Spec.NodeName, status) + } + if t == nil { + return fmt.Errorf("pod %s on node %s was terminated and then had termination cleared: %#v", pod.Name, pod.Spec.NodeName, status) + } + } + hasRunningContainers = status.State.Waiting == nil && status.State.Terminated == nil + if t != nil { + if !t.FinishedAt.Time.IsZero() { + duration = t.FinishedAt.Sub(t.StartedAt.Time) + completeDuration = t.FinishedAt.Sub(pod.CreationTimestamp.Time) + } + + defer func() { hasTerminated = true }() + switch { + case t.ExitCode == 1: + // expected + case t.ExitCode == 128 && reBug88766.MatchString(t.Message): + // pod volume teardown races with container start in CRI, which reports a failure + framework.Logf("pod %s on node %s failed with the symptoms of https://github.com/kubernetes/kubernetes/issues/88766") + default: + return fmt.Errorf("pod %s on node %s container unexpected exit code %d: start=%s end=%s reason=%s message=%s", pod.Name, pod.Spec.NodeName, t.ExitCode, t.StartedAt, t.FinishedAt, t.Reason, t.Message) + } + } + if pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded { + hasTerminalPhase = true + } else { + if hasTerminalPhase { + return fmt.Errorf("pod %s on node %s was in a terminal phase and then reverted: %#v", pod.Name, pod.Spec.NodeName, pod.Status) + } + } + return nil + } + + var eventErr error + for _, event := range events[1:] { + if err := verifyFn(event); err != nil { + eventErr = err + break + } + } + func() { + defer lock.Unlock() + lock.Lock() + + if eventErr != nil { + errs = append(errs, eventErr) + return + } + + if !hasTerminalPhase { + var names []string + for _, status := range pod.Status.ContainerStatuses { + if status.State.Terminated != nil || status.State.Running != nil { + names = append(names, status.Name) + } + } + switch { + case len(names) > 0: + errs = append(errs, fmt.Errorf("pod %s on node %s did not reach a terminal phase before being deleted but had running containers: phase=%s, running-containers=%s", pod.Name, pod.Spec.NodeName, pod.Status.Phase, strings.Join(names, ","))) + case pod.Status.Phase != v1.PodPending: + errs = append(errs, fmt.Errorf("pod %s on node %s was not Pending but has no running containers: phase=%s", pod.Name, pod.Spec.NodeName, pod.Status.Phase)) + } + } + if hasRunningContainers { + data, _ := json.MarshalIndent(pod.Status.ContainerStatuses, "", " ") + errs = append(errs, fmt.Errorf("pod %s on node %s had running or unknown container status before being deleted:\n%s", pod.Name, pod.Spec.NodeName, string(data))) + } + }() + + if duration < min { + min = duration + } + if duration > max || max == 0 { + max = duration + } + framework.Logf("Pod %s on node %s timings total=%s t=%s run=%s execute=%s", pod.Name, pod.Spec.NodeName, end.Sub(start), t, completeDuration, duration) + } + + }(i) + } + + wg.Wait() + + if len(errs) > 0 { + var messages []string + for _, err := range errs { + messages = append(messages, err.Error()) + } + framework.Failf("%d errors:\n%v", len(errs), strings.Join(messages, "\n")) + } + }) + }) }) From ad3d8949f08766106c203e273b30c810994ebd99 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Tue, 14 May 2019 18:04:56 -0400 Subject: [PATCH 2/5] kubelet: Preserve existing container status when pod terminated The kubelet must not allow a container that was reported failed in a restartPolicy=Never pod to be reported to the apiserver as success. If a client deletes a restartPolicy=Never pod, the dispatchWork and status manager race to update the container status. When dispatchWork (specifically podIsTerminated) returns true, it means all containers are stopped, which means status in the container is accurate. However, the TerminatePod method then clears this status. This results in a pod that has been reported with status.phase=Failed getting reset to status.phase.Succeeded, which is a violation of the guarantees around terminal phase. Ensure the Kubelet never reports that a container succeeded when it hasn't run or been executed by guarding the terminate pod loop from ever reporting 0 in the absence of container status. --- pkg/kubelet/kubelet.go | 3 ++- pkg/kubelet/status/BUILD | 1 + pkg/kubelet/status/status_manager.go | 22 +++++++++++++-- pkg/kubelet/status/status_manager_test.go | 33 ++++++++++++++++++++++- 4 files changed, 55 insertions(+), 4 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 2830f7c0709..c41098b94a8 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -2001,9 +2001,10 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle } // dispatchWork starts the asynchronous sync of the pod in a pod worker. -// If the pod is terminated, dispatchWork +// If the pod is terminated, dispatchWork will perform no action. func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) { if kl.podIsTerminated(pod) { + klog.V(4).Infof("Pod %q is terminated, ignoring remaining sync work: %s", format.Pod(pod), syncType) if pod.DeletionTimestamp != nil { // If the pod is in a terminated state, there is no pod worker to // handle the work item. Check if the DeletionTimestamp has been diff --git a/pkg/kubelet/status/BUILD b/pkg/kubelet/status/BUILD index 0b590440e58..314a4065555 100644 --- a/pkg/kubelet/status/BUILD +++ b/pkg/kubelet/status/BUILD @@ -54,6 +54,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/client-go/testing:go_default_library", diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index d216a31e576..f42aed3f070 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -314,21 +314,39 @@ func findContainerStatus(status *v1.PodStatus, containerID string) (containerSta func (m *manager) TerminatePod(pod *v1.Pod) { m.podStatusesLock.Lock() defer m.podStatusesLock.Unlock() + + // ensure that all containers have a terminated state - because we do not know whether the container + // was successful, always report an error oldStatus := &pod.Status if cachedStatus, ok := m.podStatuses[pod.UID]; ok { oldStatus = &cachedStatus.status } status := *oldStatus.DeepCopy() for i := range status.ContainerStatuses { + if status.ContainerStatuses[i].State.Terminated != nil || status.ContainerStatuses[i].State.Waiting != nil { + continue + } status.ContainerStatuses[i].State = v1.ContainerState{ - Terminated: &v1.ContainerStateTerminated{}, + Terminated: &v1.ContainerStateTerminated{ + Reason: "ContainerStatusUnknown", + Message: "The container could not be located when the pod was terminated", + ExitCode: 137, + }, } } for i := range status.InitContainerStatuses { + if status.InitContainerStatuses[i].State.Terminated != nil || status.InitContainerStatuses[i].State.Waiting != nil { + continue + } status.InitContainerStatuses[i].State = v1.ContainerState{ - Terminated: &v1.ContainerStateTerminated{}, + Terminated: &v1.ContainerStateTerminated{ + Reason: "ContainerStatusUnknown", + Message: "The container could not be located when the pod was terminated", + ExitCode: 137, + }, } } + m.updateStatusInternal(pod, status, true) } diff --git a/pkg/kubelet/status/status_manager_test.go b/pkg/kubelet/status/status_manager_test.go index f880a1e6947..eb3684ca87b 100644 --- a/pkg/kubelet/status/status_manager_test.go +++ b/pkg/kubelet/status/status_manager_test.go @@ -27,11 +27,12 @@ import ( "github.com/stretchr/testify/assert" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/diff" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" @@ -569,6 +570,16 @@ func TestTerminatePod(t *testing.T) { t.Logf("update the pod's status to Failed. TerminatePod should preserve this status update.") firstStatus := getRandomPodStatus() firstStatus.Phase = v1.PodFailed + firstStatus.InitContainerStatuses = []v1.ContainerStatus{ + {Name: "init-test-1"}, + {Name: "init-test-2", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "InitTest", ExitCode: 0}}}, + {Name: "init-test-3", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "InitTest", ExitCode: 3}}}, + } + firstStatus.ContainerStatuses = []v1.ContainerStatus{ + {Name: "test-1"}, + {Name: "test-2", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "Test", ExitCode: 2}}}, + {Name: "test-3", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "Test", ExitCode: 0}}}, + } syncer.SetPodStatus(testPod, firstStatus) t.Logf("set the testPod to a pod with Phase running, to simulate a stale pod") @@ -586,6 +597,26 @@ func TestTerminatePod(t *testing.T) { assert.False(t, newStatus.InitContainerStatuses[i].State.Terminated == nil, "expected init containers to be terminated") } + expectUnknownState := v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "ContainerStatusUnknown", Message: "The container could not be located when the pod was terminated", ExitCode: 137}} + if !reflect.DeepEqual(newStatus.InitContainerStatuses[0].State, expectUnknownState) { + t.Errorf("terminated container state not defaulted: %s", diff.ObjectReflectDiff(newStatus.InitContainerStatuses[0].State, expectUnknownState)) + } + if !reflect.DeepEqual(newStatus.InitContainerStatuses[1].State, firstStatus.InitContainerStatuses[1].State) { + t.Errorf("existing terminated container state not preserved: %#v", newStatus.ContainerStatuses) + } + if !reflect.DeepEqual(newStatus.InitContainerStatuses[2].State, firstStatus.InitContainerStatuses[2].State) { + t.Errorf("existing terminated container state not preserved: %#v", newStatus.ContainerStatuses) + } + if !reflect.DeepEqual(newStatus.ContainerStatuses[0].State, expectUnknownState) { + t.Errorf("terminated container state not defaulted: %s", diff.ObjectReflectDiff(newStatus.ContainerStatuses[0].State, expectUnknownState)) + } + if !reflect.DeepEqual(newStatus.ContainerStatuses[1].State, firstStatus.ContainerStatuses[1].State) { + t.Errorf("existing terminated container state not preserved: %#v", newStatus.ContainerStatuses) + } + if !reflect.DeepEqual(newStatus.ContainerStatuses[2].State, firstStatus.ContainerStatuses[2].State) { + t.Errorf("existing terminated container state not preserved: %#v", newStatus.ContainerStatuses) + } + t.Logf("we expect the previous status update to be preserved.") assert.Equal(t, newStatus.Phase, firstStatus.Phase) assert.Equal(t, newStatus.Message, firstStatus.Message) From 2364c10e2e4d9fc04e565f2e07f400e19b1bcc41 Mon Sep 17 00:00:00 2001 From: Yu-Ju Hong Date: Wed, 15 May 2019 11:30:23 -0700 Subject: [PATCH 3/5] kubelet: Don't delete pod until all container status is available After a pod reaches a terminal state and all containers are complete we can delete the pod from the API server. The dispatchWork method needs to wait for all container status to be available before invoking delete. Even after the worker stops, status updates will continue to be delivered and the sync handler will continue to sync the pods, so dispatchWork gets multiple opportunities to see status. The previous code assumed that a pod in Failed or Succeeded had no running containers, but eviction or deletion of running pods could still have running containers whose status needed to be reported. This modifies earlier test to guarantee that the "fallback" exit code 137 is never reported to match the expectation that all pods exit with valid status for all containers (unless some exceptional failure like eviction were to occur while the test is running). --- pkg/kubelet/kubelet.go | 23 +++++++++++++---------- pkg/kubelet/kubelet_pods.go | 28 +++++++++++++++++++++++----- 2 files changed, 36 insertions(+), 15 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index c41098b94a8..70be042df22 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -2001,19 +2001,22 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle } // dispatchWork starts the asynchronous sync of the pod in a pod worker. -// If the pod is terminated, dispatchWork will perform no action. +// If the pod has completed termination, dispatchWork will perform no action. func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) { - if kl.podIsTerminated(pod) { - klog.V(4).Infof("Pod %q is terminated, ignoring remaining sync work: %s", format.Pod(pod), syncType) - if pod.DeletionTimestamp != nil { - // If the pod is in a terminated state, there is no pod worker to - // handle the work item. Check if the DeletionTimestamp has been - // set, and force a status update to trigger a pod deletion request - // to the apiserver. - kl.statusManager.TerminatePod(pod) - } + // check whether we are ready to delete the pod from the API server (all status up to date) + containersTerminal, podWorkerTerminal := kl.podAndContainersAreTerminal(pod) + if pod.DeletionTimestamp != nil && containersTerminal { + klog.V(4).Infof("Pod %q has completed execution and should be deleted from the API server: %s", format.Pod(pod), syncType) + kl.statusManager.TerminatePod(pod) return } + + // optimization: avoid invoking the pod worker if no further changes are possible to the pod definition + if podWorkerTerminal { + klog.V(4).Infof("Pod %q has completed, ignoring remaining sync work: %s", format.Pod(pod), syncType) + return + } + // Run the sync in an async worker. kl.podWorkers.UpdatePod(&UpdatePodOptions{ Pod: pod, diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index 778050ee25b..99e35f32db0 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -865,8 +865,9 @@ func (kl *Kubelet) getPullSecretsForPod(pod *v1.Pod) []v1.Secret { return pullSecrets } -// podIsTerminated returns true if pod is in the terminated state ("Failed" or "Succeeded"). -func (kl *Kubelet) podIsTerminated(pod *v1.Pod) bool { +// podStatusIsTerminal reports when the specified pod has no running containers or is no longer accepting +// spec changes. +func (kl *Kubelet) podAndContainersAreTerminal(pod *v1.Pod) (containersTerminal, podWorkerTerminal bool) { // Check the cached pod status which was set after the last sync. status, ok := kl.statusManager.GetPodStatus(pod.UID) if !ok { @@ -875,11 +876,28 @@ func (kl *Kubelet) podIsTerminated(pod *v1.Pod) bool { // restarted. status = pod.Status } - return status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(status.ContainerStatuses)) + // A pod transitions into failed or succeeded from either container lifecycle (RestartNever container + // fails) or due to external events like deletion or eviction. A terminal pod *should* have no running + // containers, but to know that the pod has completed its lifecycle you must wait for containers to also + // be terminal. + containersTerminal = notRunning(status.ContainerStatuses) + // The kubelet must accept config changes from the pod spec until it has reached a point where changes would + // have no effect on any running container. + podWorkerTerminal = status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && containersTerminal) + return } -// IsPodTerminated returns true if the pod with the provided UID is in a terminated state ("Failed" or "Succeeded") -// or if the pod has been deleted or removed +// podIsTerminated returns true if the provided pod is in a terminal phase ("Failed", "Succeeded") or +// has been deleted and has no running containers. This corresponds to when a pod must accept changes to +// its pod spec (e.g. terminating containers allow grace period to be shortened). +func (kl *Kubelet) podIsTerminated(pod *v1.Pod) bool { + _, podWorkerTerminal := kl.podAndContainersAreTerminal(pod) + return podWorkerTerminal +} + +// IsPodTerminated returns true if the pod with the provided UID is in a terminal phase ("Failed", +// "Succeeded") or has been deleted and has no running containers. This corresponds to when a pod must +// accept changes to its pod spec (e.g. terminating containers allow grace period to be shortened) func (kl *Kubelet) IsPodTerminated(uid types.UID) bool { pod, podFound := kl.podManager.GetPodByUID(uid) if !podFound { From 8722c834e5e52e241c0227e8673837c9081c1423 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Mon, 24 Feb 2020 19:34:49 -0500 Subject: [PATCH 4/5] kubelet: Never restart containers in deleting pods When constructing the API status of a pod, if the pod is marked for deletion no containers should be started. Previously, if a container inside of a terminating pod failed to start due to a container runtime error (that populates reasonCache) the reasonCache would remain populated (it is only updated by syncPod for non-terminating pods) and the delete action on the pod would be delayed until the reasonCache entry expired due to other pods. This dramatically reduces the amount of time the Kubelet waits to delete pods that are terminating and encountered a container runtime error. --- pkg/kubelet/container/helpers.go | 4 ++++ pkg/kubelet/container/helpers_test.go | 24 ++++++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/pkg/kubelet/container/helpers.go b/pkg/kubelet/container/helpers.go index ae74305f093..b15be2d7500 100644 --- a/pkg/kubelet/container/helpers.go +++ b/pkg/kubelet/container/helpers.go @@ -61,6 +61,10 @@ type RuntimeHelper interface { // ShouldContainerBeRestarted checks whether a container needs to be restarted. // TODO(yifan): Think about how to refactor this. func ShouldContainerBeRestarted(container *v1.Container, pod *v1.Pod, podStatus *PodStatus) bool { + // Once a pod has been marked deleted, it should not be restarted + if pod.DeletionTimestamp != nil { + return false + } // Get latest container status. status := podStatus.FindContainerStatusByName(container.Name) // If the container was never started before, we should start it. diff --git a/pkg/kubelet/container/helpers_test.go b/pkg/kubelet/container/helpers_test.go index 180e8632936..b3cd733536a 100644 --- a/pkg/kubelet/container/helpers_test.go +++ b/pkg/kubelet/container/helpers_test.go @@ -19,6 +19,7 @@ package container import ( "reflect" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" @@ -449,6 +450,8 @@ func TestShouldContainerBeRestarted(t *testing.T) { v1.RestartPolicyOnFailure, v1.RestartPolicyAlways, } + + // test policies expected := map[string][]bool{ "no-history": {true, true, true}, "alive": {false, false, false}, @@ -467,6 +470,27 @@ func TestShouldContainerBeRestarted(t *testing.T) { } } } + + // test deleted pod + pod.DeletionTimestamp = &metav1.Time{Time: time.Now()} + expected = map[string][]bool{ + "no-history": {false, false, false}, + "alive": {false, false, false}, + "succeed": {false, false, false}, + "failed": {false, false, false}, + "unknown": {false, false, false}, + } + for _, c := range pod.Spec.Containers { + for i, policy := range policies { + pod.Spec.RestartPolicy = policy + e := expected[c.Name][i] + r := ShouldContainerBeRestarted(&c, pod, podStatus) + if r != e { + t.Errorf("Restart for container %q with restart policy %q expected %t, got %t", + c.Name, policy, e, r) + } + } + } } func TestHasPrivilegedContainer(t *testing.T) { From 8bc5cb01a9c1e7698ddd641963da08d09410dcfc Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Tue, 25 Feb 2020 22:24:28 -0500 Subject: [PATCH 5/5] kubelet: Clear the podStatusChannel before invoking syncBatch The status manager syncBatch() method processes the current state of the cache, which should include all entries in the channel. Flush the channel before we call a batch to avoid unnecessary work and to unblock pod workers when the node is congested. Discovered while investigating long shutdown intervals on the node where the status channel stayed full for tens of seconds. Add a for loop around the select statement to avoid unnecessary invocations of the wait.Forever closure each time. --- pkg/kubelet/status/status_manager.go | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index f42aed3f070..92d00d495df 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -160,13 +160,20 @@ func (m *manager) Start() { syncTicker := time.Tick(syncPeriod) // syncPod and syncBatch share the same go routine to avoid sync races. go wait.Forever(func() { - select { - case syncRequest := <-m.podStatusChannel: - klog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel", - syncRequest.podUID, syncRequest.status.version, syncRequest.status.status) - m.syncPod(syncRequest.podUID, syncRequest.status) - case <-syncTicker: - m.syncBatch() + for { + select { + case syncRequest := <-m.podStatusChannel: + klog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel", + syncRequest.podUID, syncRequest.status.version, syncRequest.status.status) + m.syncPod(syncRequest.podUID, syncRequest.status) + case <-syncTicker: + klog.V(5).Infof("Status Manager: syncing batch") + // remove any entries in the status channel since the batch will handle them + for i := len(m.podStatusChannel); i > 0; i-- { + <-m.podStatusChannel + } + m.syncBatch() + } } }, 0) }