From 1d518adb764a5f540ba12846888b603a8f88016d Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Mon, 31 Jan 2022 22:02:48 -0500 Subject: [PATCH] kubelet: Pod probes should be handled by pod worker The pod worker is the owner of when a container is running or not, and the start and stop of the probes for a given pod should be handled during the pod sync loop. This ensures that probes do not continue running even after eviction. Because the pod semantics allow lifecycle probes to shorten grace period, the probe is removed after the containers in a pod are terminated successfully. As an optimization, if the pod will have a very short grace period (0 or 1 seconds) we stop the probes immediately to reduce resource usage during eviction slightly. After this change, the probe manager is only called by the pod worker or by the reconcile loop. --- pkg/kubelet/kubelet.go | 22 +++++++++++++------ pkg/kubelet/kubelet_pods.go | 3 --- .../nodeshutdown_manager_linux.go | 3 --- pkg/kubelet/prober/prober_manager.go | 19 ++++++++++++++++ pkg/kubelet/prober/testing/fake_manager.go | 3 +++ 5 files changed, 37 insertions(+), 13 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 89eff1e5cc7..e5fe6c5e078 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1713,6 +1713,9 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType // Fetch the pull secrets for the pod pullSecrets := kl.getPullSecretsForPod(pod) + // Ensure the pod is being probed + kl.probeManager.AddPod(pod) + // Call the container runtime's SyncPod callback result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff) kl.reasonCache.Update(pod.UID, result) @@ -1766,10 +1769,16 @@ func (kl *Kubelet) syncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatu kl.statusManager.SetPodStatus(pod, apiPodStatus) if gracePeriod != nil { + if *gracePeriod <= 1 { + // If we plan to terminate quickly, stop probes immediately, otherwise we will wait until the pod is completely done + kl.probeManager.RemovePod(pod) + } klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", *gracePeriod) } else { klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", nil) } + kl.probeManager.StopLivenessAndStartup(pod) + p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus) if err := kl.killPod(pod, p, gracePeriod); err != nil { kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err) @@ -1778,6 +1787,12 @@ func (kl *Kubelet) syncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatu return err } + // Once the containers are stopped, we can stop probing for liveness and readiness. + // TODO: once a pod is terminal, certain probes (liveness exec) could be stopped immediately after + // the detection of a container shutdown or (for readiness) after the first failure. Tracked as + // https://github.com/kubernetes/kubernetes/issues/107894 although may not be worth optimizing. + kl.probeManager.RemovePod(pod) + // Guard against consistency issues in KillPod implementations by checking that there are no // running containers. This method is invoked infrequently so this is effectively free and can // catch race conditions introduced by callers updating pod status out of order. @@ -2233,9 +2248,6 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { } mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start) - // TODO: move inside syncPod and make reentrant - // https://github.com/kubernetes/kubernetes/issues/105014 - kl.probeManager.AddPod(pod) } } @@ -2269,10 +2281,6 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) { if err := kl.deletePod(pod); err != nil { klog.V(2).InfoS("Failed to delete pod", "pod", klog.KObj(pod), "err", err) } - // TODO: move inside syncTerminatingPod|syncTerminatedPod (we should stop probing - // once the pod kill is acknowledged and during eviction) - // https://github.com/kubernetes/kubernetes/issues/105014 - kl.probeManager.RemovePod(pod) } } diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index a6d11d2ae48..0cdf20d2ba5 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -1205,9 +1205,6 @@ func (kl *Kubelet) HandlePodCleanups() error { mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) klog.V(3).InfoS("Pod is restartable after termination due to UID reuse", "pod", klog.KObj(pod), "podUID", pod.UID) kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start) - // TODO: move inside syncPod and make reentrant - // https://github.com/kubernetes/kubernetes/issues/105014 - kl.probeManager.AddPod(pod) } return nil diff --git a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go index 6974d82acf4..a9733b70210 100644 --- a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go +++ b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go @@ -364,9 +364,6 @@ func (m *managerImpl) processShutdownEvent() error { gracePeriodOverride := group.ShutdownGracePeriodSeconds - // Stop probes for the pod - m.probeManager.RemovePod(pod) - // If the pod's spec specifies a termination gracePeriod which is less than the gracePeriodOverride calculated, use the pod spec termination gracePeriod. if pod.Spec.TerminationGracePeriodSeconds != nil && *pod.Spec.TerminationGracePeriodSeconds <= gracePeriodOverride { gracePeriodOverride = *pod.Spec.TerminationGracePeriodSeconds diff --git a/pkg/kubelet/prober/prober_manager.go b/pkg/kubelet/prober/prober_manager.go index 37282748f65..d8295930383 100644 --- a/pkg/kubelet/prober/prober_manager.go +++ b/pkg/kubelet/prober/prober_manager.go @@ -57,6 +57,9 @@ type Manager interface { // pod created. AddPod(pod *v1.Pod) + // StopLivenessAndStartup handles stopping liveness and startup probes during termination. + StopLivenessAndStartup(pod *v1.Pod) + // RemovePod handles cleaning up the removed pod state, including terminating probe workers and // deleting cached results. RemovePod(pod *v1.Pod) @@ -195,6 +198,22 @@ func (m *manager) AddPod(pod *v1.Pod) { } } +func (m *manager) StopLivenessAndStartup(pod *v1.Pod) { + m.workerLock.RLock() + defer m.workerLock.RUnlock() + + key := probeKey{podUID: pod.UID} + for _, c := range pod.Spec.Containers { + key.containerName = c.Name + for _, probeType := range [...]probeType{liveness, startup} { + key.probeType = probeType + if worker, ok := m.workers[key]; ok { + worker.stop() + } + } + } +} + func (m *manager) RemovePod(pod *v1.Pod) { m.workerLock.RLock() defer m.workerLock.RUnlock() diff --git a/pkg/kubelet/prober/testing/fake_manager.go b/pkg/kubelet/prober/testing/fake_manager.go index 41910382d38..500cfcd9962 100644 --- a/pkg/kubelet/prober/testing/fake_manager.go +++ b/pkg/kubelet/prober/testing/fake_manager.go @@ -33,6 +33,9 @@ func (FakeManager) AddPod(_ *v1.Pod) {} // RemovePod simulates removing a Pod. func (FakeManager) RemovePod(_ *v1.Pod) {} +// Simulated stopping liveness and startup probes. +func (FakeManager) StopLivenessAndStartup(_ *v1.Pod) {} + // CleanupPods simulates cleaning up Pods. func (FakeManager) CleanupPods(_ map[types.UID]sets.Empty) {}