From 431e6a704406b27b910de5a2ed6f1437fcff4303 Mon Sep 17 00:00:00 2001
From: Matthias Bertschy
Date: Mon, 25 Jan 2021 17:51:12 +0100
Subject: [PATCH] Move readinessManager updates handling to kubelet

---
 pkg/kubelet/kubelet.go                     | 32 ++++++++++++-------
 pkg/kubelet/kubelet_test.go                |  1 +
 .../kuberuntime/kuberuntime_manager.go     |  7 ++--
 pkg/kubelet/prober/common_test.go          |  1 +
 pkg/kubelet/prober/prober_manager.go       | 22 ++-----------
 pkg/kubelet/prober/prober_manager_test.go  | 11 +++++--
 test/e2e/common/node/container_probe.go    | 28 ++++++----------
 7 files changed, 50 insertions(+), 52 deletions(-)

diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 58d72fc754e..d633d6e1d03 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -581,6 +581,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
     imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)

     klet.livenessManager = proberesults.NewManager()
+    klet.readinessManager = proberesults.NewManager()
     klet.startupManager = proberesults.NewManager()
     klet.podCache = kubecontainer.NewCache()

@@ -618,6 +619,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
     runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
         kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
         klet.livenessManager,
+        klet.readinessManager,
         klet.startupManager,
         seccompProfileRoot,
         machineInfo,
@@ -716,6 +718,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
     klet.probeManager = prober.NewManager(
         klet.statusManager,
         klet.livenessManager,
+        klet.readinessManager,
         klet.startupManager,
         klet.runner,
         kubeDeps.Recorder)
@@ -924,8 +927,9 @@ type Kubelet struct {
     // Handles container probing.
     probeManager prober.Manager
     // Manages container health check results.
-    livenessManager proberesults.Manager
-    startupManager  proberesults.Manager
+    livenessManager  proberesults.Manager
+    readinessManager proberesults.Manager
+    startupManager   proberesults.Manager

     // How long to keep idle streaming command execution/port forwarding
     // connections open before terminating them
@@ -1438,7 +1442,6 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {

     // Start component sync loops.
     kl.statusManager.Start()
-    kl.probeManager.Start()

     // Start syncing RuntimeClasses if enabled.
     if kl.runtimeClassManager != nil {
@@ -1902,8 +1905,8 @@ func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHand
 // * plegCh: update the runtime cache; sync pod
 // * syncCh: sync all pods waiting for sync
 // * housekeepingCh: trigger cleanup of pods
-// * liveness/startup manager: sync pods that have failed or in which one or more
-//   containers have failed liveness/startup checks
+// * health manager: sync pods that have failed or in which one or more
+//   containers have failed health checks
 func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
     syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
     select {
@@ -1978,13 +1981,16 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
         handler.HandlePodSyncs(podsToSync)
     case update := <-kl.livenessManager.Updates():
         if update.Result == proberesults.Failure {
-            // The liveness manager detected a failure; sync the pod.
-            syncPod(kl, update, handler)
+            handleProbeSync(kl, update, handler, "liveness", "unhealthy")
         }
+    case update := <-kl.readinessManager.Updates():
+        ready := update.Result == proberesults.Success
+        kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
+        handleProbeSync(kl, update, handler, "readiness", map[bool]string{true: "ready", false: ""}[ready])
     case update := <-kl.startupManager.Updates():
         started := update.Result == proberesults.Success
         kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
-        syncPod(kl, update, handler)
+        handleProbeSync(kl, update, handler, "startup", map[bool]string{true: "started", false: "unhealthy"}[started])
     case <-housekeepingCh:
         if !kl.sourcesReady.AllReady() {
             // If the sources aren't ready or volume manager has not yet synced the states,
@@ -2000,15 +2006,19 @@
     return true
 }

-func syncPod(kl *Kubelet, update proberesults.Update, handler SyncHandler) {
+func handleProbeSync(kl *Kubelet, update proberesults.Update, handler SyncHandler, probe, status string) {
+    probeAndStatus := probe
+    if len(status) > 0 {
+        probeAndStatus = fmt.Sprintf("%s (container %s)", probe, status)
+    }
     // We should not use the pod from manager, because it is never updated after initialization.
     pod, ok := kl.podManager.GetPodByUID(update.PodUID)
     if !ok {
         // If the pod no longer exists, ignore the update.
-        klog.V(4).Infof("SyncLoop: ignore irrelevant update: %#v", update)
+        klog.V(4).Infof("SyncLoop %s: ignore irrelevant update: %#v", probeAndStatus, update)
         return
     }
-    klog.V(1).Infof("SyncLoop: %q", format.Pod(pod))
+    klog.V(1).Infof("SyncLoop %s: %q", probeAndStatus, format.Pod(pod))
     handler.HandlePodSyncs([]*v1.Pod{pod})
 }

diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index b6272dc8046..a32a50b1f29 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -241,6 +241,7 @@ func newTestKubeletWithImageList(

     kubelet.probeManager = probetest.FakeManager{}
     kubelet.livenessManager = proberesults.NewManager()
+    kubelet.readinessManager = proberesults.NewManager()
     kubelet.startupManager = proberesults.NewManager()

     fakeContainerManager := cm.NewFakeContainerManager()
diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go
index c1fb52bba4b..abec903e2dc 100644
--- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go
+++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go
@@ -101,8 +101,9 @@ type kubeGenericRuntimeManager struct {
     runtimeHelper kubecontainer.RuntimeHelper

     // Health check results.
-    livenessManager proberesults.Manager
-    startupManager  proberesults.Manager
+    livenessManager  proberesults.Manager
+    readinessManager proberesults.Manager
+    startupManager   proberesults.Manager

     // If true, enforce container cpu limits with CFS quota support
     cpuCFSQuota bool
@@ -159,6 +160,7 @@ type LegacyLogProvider interface {
 func NewKubeGenericRuntimeManager(
     recorder record.EventRecorder,
     livenessManager proberesults.Manager,
+    readinessManager proberesults.Manager,
     startupManager proberesults.Manager,
     seccompProfileRoot string,
     machineInfo *cadvisorapi.MachineInfo,
@@ -187,6 +189,7 @@ func NewKubeGenericRuntimeManager(
         cpuCFSQuotaPeriod:   cpuCFSQuotaPeriod,
         seccompProfileRoot:  seccompProfileRoot,
         livenessManager:     livenessManager,
+        readinessManager:    readinessManager,
         startupManager:      startupManager,
         machineInfo:         machineInfo,
         osInterface:         osInterface,
diff --git a/pkg/kubelet/prober/common_test.go b/pkg/kubelet/prober/common_test.go
index 5152bb1fcff..fb39e69037c 100644
--- a/pkg/kubelet/prober/common_test.go
+++ b/pkg/kubelet/prober/common_test.go
@@ -111,6 +111,7 @@ func newTestManager() *manager {
         status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}),
         results.NewManager(),
         results.NewManager(),
+        results.NewManager(),
         nil, // runner
         &record.FakeRecorder{},
     ).(*manager)
diff --git a/pkg/kubelet/prober/prober_manager.go b/pkg/kubelet/prober/prober_manager.go
index 1d829e62bd1..a661d7a1795 100644
--- a/pkg/kubelet/prober/prober_manager.go
+++ b/pkg/kubelet/prober/prober_manager.go
@@ -17,14 +17,13 @@ limitations under the License.
 package prober

 import (
-    "k8s.io/apimachinery/pkg/util/clock"
     "sync"
     "time"

     v1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/types"
+    "k8s.io/apimachinery/pkg/util/clock"
     "k8s.io/apimachinery/pkg/util/sets"
-    "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/tools/record"
     "k8s.io/component-base/metrics"
     "k8s.io/klog/v2"
@@ -71,9 +70,6 @@ type Manager interface {
     // UpdatePodStatus modifies the given PodStatus with the appropriate Ready state for each
     // container based on container running status, cached probe results and worker states.
     UpdatePodStatus(types.UID, *v1.PodStatus)
-
-    // Start starts the Manager sync loops.
-    Start()
 }

 type manager struct {
@@ -104,12 +100,12 @@ func NewManager(
     statusManager status.Manager,
     livenessManager results.Manager,
+    readinessManager results.Manager,
     startupManager results.Manager,
     runner kubecontainer.CommandRunner,
     recorder record.EventRecorder) Manager {

     prober := newProber(runner, recorder)
-    readinessManager := results.NewManager()
     return &manager{
         statusManager:    statusManager,
         prober:           prober,
@@ -121,12 +117,6 @@ func NewManager(
     }
 }

-// Start syncing probe status. This should only be called once.
-func (m *manager) Start() {
-    // Start syncing readiness.
-    go wait.Forever(m.updateReadiness, 0)
-}
-
 // Key uniquely identifying container probes
 type probeKey struct {
     podUID types.UID
@@ -263,6 +253,7 @@ func (m *manager) UpdatePodStatus(podUID types.UID, podStatus *v1.PodStatus) {
                 select {
                 case w.manualTriggerCh <- struct{}{}:
                 default: // Non-blocking.
+                    klog.Warningf("Failed to trigger a manual run of %s probe", w.probeType.String())
                 }
             }
         }
@@ -300,10 +291,3 @@ func (m *manager) workerCount() int {
     defer m.workerLock.RUnlock()
     return len(m.workers)
 }
-
-func (m *manager) updateReadiness() {
-    update := <-m.readinessManager.Updates()
-
-    ready := update.Result == results.Success
-    m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
-}
diff --git a/pkg/kubelet/prober/prober_manager_test.go b/pkg/kubelet/prober/prober_manager_test.go
index cc9c16e769d..70b8c77a71a 100644
--- a/pkg/kubelet/prober/prober_manager_test.go
+++ b/pkg/kubelet/prober/prober_manager_test.go
@@ -319,6 +319,13 @@ func TestUpdatePodStatus(t *testing.T) {
     }
 }

+func (m *manager) extractedReadinessHandling() {
+    update := <-m.readinessManager.Updates()
+    // This code corresponds to an extract from kubelet.syncLoopIteration()
+    ready := update.Result == results.Success
+    m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
+}
+
 func TestUpdateReadiness(t *testing.T) {
     testPod := getTestPod()
     setTestProbe(testPod, readiness, v1.Probe{})
@@ -327,10 +334,10 @@

     // Start syncing readiness without leaking goroutine.
     stopCh := make(chan struct{})
-    go wait.Until(m.updateReadiness, 0, stopCh)
+    go wait.Until(m.extractedReadinessHandling, 0, stopCh)
     defer func() {
         close(stopCh)
-        // Send an update to exit updateReadiness()
+        // Send an update to exit extractedReadinessHandling()
         m.readinessManager.Set(kubecontainer.ContainerID{}, results.Success, &v1.Pod{})
     }()
diff --git a/test/e2e/common/node/container_probe.go b/test/e2e/common/node/container_probe.go
index 1c0c4cabf22..7f5d81ca2ff 100644
--- a/test/e2e/common/node/container_probe.go
+++ b/test/e2e/common/node/container_probe.go
@@ -373,27 +373,18 @@ var _ = SIGDescribe("Probing container", func() {
         Testname: Pod readiness probe, delayed by startup probe
         Description: A Pod is created with startup and readiness probes. The Container is started by creating /tmp/startup after 45 seconds, delaying the ready state by this amount of time. This is similar to the "Pod readiness probe, with initial delay" test.
     */
-    ginkgo.It("should not be ready until startupProbe succeeds", func() {
+    ginkgo.It("should be ready immediately after startupProbe succeeds", func() {
         sleepBeforeStarted := time.Duration(45)
-        cmd := []string{"/bin/sh", "-c", fmt.Sprintf("echo ok >/tmp/health; sleep %d; echo ok >/tmp/startup; sleep 600", sleepBeforeStarted)}
+        cmd := []string{"/bin/sh", "-c", fmt.Sprintf("sleep %d; echo ok >/tmp/startup; sleep 600", sleepBeforeStarted)}
         readinessProbe := &v1.Probe{
-            Handler: v1.Handler{
-                Exec: &v1.ExecAction{
-                    Command: []string{"cat", "/tmp/health"},
-                },
-            },
+            Handler:             execHandler([]string{"/bin/true"}),
             InitialDelaySeconds: 0,
             PeriodSeconds:       60,
         }
         startupProbe := &v1.Probe{
-            Handler: v1.Handler{
-                Exec: &v1.ExecAction{
-                    Command: []string{"cat", "/tmp/startup"},
-                },
-            },
+            Handler:             execHandler([]string{"/bin/cat", "/tmp/startup"}),
             InitialDelaySeconds: 0,
-            PeriodSeconds:       1,
-            FailureThreshold:    600,
+            FailureThreshold:    60,
         }

         p := podClient.Create(startupPodSpec(startupProbe, readinessProbe, nil, cmd))
@@ -416,12 +407,13 @@ var _ = SIGDescribe("Probing container", func() {
         startedTime, err := GetContainerStartedTime(p, "busybox")
         framework.ExpectNoError(err)

-        framework.Logf("Container started at %v, pod became ready at %v", startedTime, readyTime)
-        if readyTime.Sub(startedTime) < sleepBeforeStarted*time.Second {
+        readyIn := readyTime.Sub(startedTime) - sleepBeforeStarted*time.Second
+        framework.Logf("Container started at %v, pod became ready at %v, %v after startupProbe succeeded", startedTime, readyTime, readyIn)
+        if readyIn < 0 {
             framework.Failf("Pod became ready before startupProbe succeeded")
         }
-        if readyTime.Sub(startedTime) > (sleepBeforeStarted+20)*time.Second {
-            framework.Failf("Pod became ready more than 20s after startupProbe succeeded")
+        if readyIn > 5*time.Second {
+            framework.Failf("Pod became ready in %v, more than 5s after startupProbe succeeded. It means that the delayed readiness probes were not initiated immediately after startup finished.", readyIn)
         }
     })
 })