Merge pull request #98376 from matthyx/mega

Make all health checks probing consistent
This commit is contained in:
Kubernetes Prow Robot 2021-03-06 11:45:41 -08:00 committed by GitHub
commit c193c1b234
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 118 additions and 78 deletions

View File

@ -591,6 +591,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff) imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
klet.livenessManager = proberesults.NewManager() klet.livenessManager = proberesults.NewManager()
klet.readinessManager = proberesults.NewManager()
klet.startupManager = proberesults.NewManager() klet.startupManager = proberesults.NewManager()
klet.podCache = kubecontainer.NewCache() klet.podCache = kubecontainer.NewCache()
@ -628,6 +629,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
runtime, err := kuberuntime.NewKubeGenericRuntimeManager( runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
kubecontainer.FilterEventRecorder(kubeDeps.Recorder), kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
klet.livenessManager, klet.livenessManager,
klet.readinessManager,
klet.startupManager, klet.startupManager,
seccompProfileRoot, seccompProfileRoot,
machineInfo, machineInfo,
@ -726,6 +728,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.probeManager = prober.NewManager( klet.probeManager = prober.NewManager(
klet.statusManager, klet.statusManager,
klet.livenessManager, klet.livenessManager,
klet.readinessManager,
klet.startupManager, klet.startupManager,
klet.runner, klet.runner,
kubeDeps.Recorder) kubeDeps.Recorder)
@ -934,8 +937,9 @@ type Kubelet struct {
// Handles container probing. // Handles container probing.
probeManager prober.Manager probeManager prober.Manager
// Manages container health check results. // Manages container health check results.
livenessManager proberesults.Manager livenessManager proberesults.Manager
startupManager proberesults.Manager readinessManager proberesults.Manager
startupManager proberesults.Manager
// How long to keep idle streaming command execution/port forwarding // How long to keep idle streaming command execution/port forwarding
// connections open before terminating them // connections open before terminating them
@ -1448,7 +1452,6 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
// Start component sync loops. // Start component sync loops.
kl.statusManager.Start() kl.statusManager.Start()
kl.probeManager.Start()
// Start syncing RuntimeClasses if enabled. // Start syncing RuntimeClasses if enabled.
if kl.runtimeClassManager != nil { if kl.runtimeClassManager != nil {
@ -1912,8 +1915,8 @@ func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHand
// * plegCh: update the runtime cache; sync pod // * plegCh: update the runtime cache; sync pod
// * syncCh: sync all pods waiting for sync // * syncCh: sync all pods waiting for sync
// * housekeepingCh: trigger cleanup of pods // * housekeepingCh: trigger cleanup of pods
// * liveness manager: sync pods that have failed or in which one or more // * health manager: sync pods that have failed or in which one or more
// containers have failed liveness checks // containers have failed health checks
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler, func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool { syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select { select {
@ -1988,19 +1991,16 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
handler.HandlePodSyncs(podsToSync) handler.HandlePodSyncs(podsToSync)
case update := <-kl.livenessManager.Updates(): case update := <-kl.livenessManager.Updates():
if update.Result == proberesults.Failure { if update.Result == proberesults.Failure {
// The liveness manager detected a failure; sync the pod. handleProbeSync(kl, update, handler, "liveness", "unhealthy")
// We should not use the pod from livenessManager, because it is never updated after
// initialization.
pod, ok := kl.podManager.GetPodByUID(update.PodUID)
if !ok {
// If the pod no longer exists, ignore the update.
klog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
break
}
klog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
handler.HandlePodSyncs([]*v1.Pod{pod})
} }
case update := <-kl.readinessManager.Updates():
ready := update.Result == proberesults.Success
kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
handleProbeSync(kl, update, handler, "readiness", map[bool]string{true: "ready", false: ""}[ready])
case update := <-kl.startupManager.Updates():
started := update.Result == proberesults.Success
kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
handleProbeSync(kl, update, handler, "startup", map[bool]string{true: "started", false: "unhealthy"}[started])
case <-housekeepingCh: case <-housekeepingCh:
if !kl.sourcesReady.AllReady() { if !kl.sourcesReady.AllReady() {
// If the sources aren't ready or volume manager has not yet synced the states, // If the sources aren't ready or volume manager has not yet synced the states,
@ -2016,6 +2016,22 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
return true return true
} }
func handleProbeSync(kl *Kubelet, update proberesults.Update, handler SyncHandler, probe, status string) {
probeAndStatus := probe
if len(status) > 0 {
probeAndStatus = fmt.Sprintf("%s (container %s)", probe, status)
}
// We should not use the pod from manager, because it is never updated after initialization.
pod, ok := kl.podManager.GetPodByUID(update.PodUID)
if !ok {
// If the pod no longer exists, ignore the update.
klog.V(4).Infof("SyncLoop %s: ignore irrelevant update: %#v", probeAndStatus, update)
return
}
klog.V(1).Infof("SyncLoop %s: %q", probeAndStatus, format.Pod(pod))
handler.HandlePodSyncs([]*v1.Pod{pod})
}
// dispatchWork starts the asynchronous sync of the pod in a pod worker. // dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod has completed termination, dispatchWork will perform no action. // If the pod has completed termination, dispatchWork will perform no action.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) { func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {

View File

@ -242,6 +242,7 @@ func newTestKubeletWithImageList(
kubelet.probeManager = probetest.FakeManager{} kubelet.probeManager = probetest.FakeManager{}
kubelet.livenessManager = proberesults.NewManager() kubelet.livenessManager = proberesults.NewManager()
kubelet.readinessManager = proberesults.NewManager()
kubelet.startupManager = proberesults.NewManager() kubelet.startupManager = proberesults.NewManager()
fakeContainerManager := cm.NewFakeContainerManager() fakeContainerManager := cm.NewFakeContainerManager()

View File

@ -101,8 +101,9 @@ type kubeGenericRuntimeManager struct {
runtimeHelper kubecontainer.RuntimeHelper runtimeHelper kubecontainer.RuntimeHelper
// Health check results. // Health check results.
livenessManager proberesults.Manager livenessManager proberesults.Manager
startupManager proberesults.Manager readinessManager proberesults.Manager
startupManager proberesults.Manager
// If true, enforce container cpu limits with CFS quota support // If true, enforce container cpu limits with CFS quota support
cpuCFSQuota bool cpuCFSQuota bool
@ -159,6 +160,7 @@ type LegacyLogProvider interface {
func NewKubeGenericRuntimeManager( func NewKubeGenericRuntimeManager(
recorder record.EventRecorder, recorder record.EventRecorder,
livenessManager proberesults.Manager, livenessManager proberesults.Manager,
readinessManager proberesults.Manager,
startupManager proberesults.Manager, startupManager proberesults.Manager,
seccompProfileRoot string, seccompProfileRoot string,
machineInfo *cadvisorapi.MachineInfo, machineInfo *cadvisorapi.MachineInfo,
@ -187,6 +189,7 @@ func NewKubeGenericRuntimeManager(
cpuCFSQuotaPeriod: cpuCFSQuotaPeriod, cpuCFSQuotaPeriod: cpuCFSQuotaPeriod,
seccompProfileRoot: seccompProfileRoot, seccompProfileRoot: seccompProfileRoot,
livenessManager: livenessManager, livenessManager: livenessManager,
readinessManager: readinessManager,
startupManager: startupManager, startupManager: startupManager,
machineInfo: machineInfo, machineInfo: machineInfo,
osInterface: osInterface, osInterface: osInterface,

View File

@ -111,6 +111,7 @@ func newTestManager() *manager {
status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}), status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}),
results.NewManager(), results.NewManager(),
results.NewManager(), results.NewManager(),
results.NewManager(),
nil, // runner nil, // runner
&record.FakeRecorder{}, &record.FakeRecorder{},
).(*manager) ).(*manager)

View File

@ -18,11 +18,12 @@ package prober
import ( import (
"sync" "sync"
"time"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/component-base/metrics" "k8s.io/component-base/metrics"
"k8s.io/klog/v2" "k8s.io/klog/v2"
@ -69,9 +70,6 @@ type Manager interface {
// UpdatePodStatus modifies the given PodStatus with the appropriate Ready state for each // UpdatePodStatus modifies the given PodStatus with the appropriate Ready state for each
// container based on container running status, cached probe results and worker states. // container based on container running status, cached probe results and worker states.
UpdatePodStatus(types.UID, *v1.PodStatus) UpdatePodStatus(types.UID, *v1.PodStatus)
// Start starts the Manager sync loops.
Start()
} }
type manager struct { type manager struct {
@ -94,18 +92,20 @@ type manager struct {
// prober executes the probe actions. // prober executes the probe actions.
prober *prober prober *prober
start time.Time
} }
// NewManager creates a Manager for pod probing. // NewManager creates a Manager for pod probing.
func NewManager( func NewManager(
statusManager status.Manager, statusManager status.Manager,
livenessManager results.Manager, livenessManager results.Manager,
readinessManager results.Manager,
startupManager results.Manager, startupManager results.Manager,
runner kubecontainer.CommandRunner, runner kubecontainer.CommandRunner,
recorder record.EventRecorder) Manager { recorder record.EventRecorder) Manager {
prober := newProber(runner, recorder) prober := newProber(runner, recorder)
readinessManager := results.NewManager()
return &manager{ return &manager{
statusManager: statusManager, statusManager: statusManager,
prober: prober, prober: prober,
@ -113,17 +113,10 @@ func NewManager(
livenessManager: livenessManager, livenessManager: livenessManager,
startupManager: startupManager, startupManager: startupManager,
workers: make(map[probeKey]*worker), workers: make(map[probeKey]*worker),
start: clock.RealClock{}.Now(),
} }
} }
// Start syncing probe status. This should only be called once.
func (m *manager) Start() {
// Start syncing readiness.
go wait.Forever(m.updateReadiness, 0)
// Start syncing startup.
go wait.Forever(m.updateStartup, 0)
}
// Key uniquely identifying container probes // Key uniquely identifying container probes
type probeKey struct { type probeKey struct {
podUID types.UID podUID types.UID
@ -253,8 +246,16 @@ func (m *manager) UpdatePodStatus(podUID types.UID, podStatus *v1.PodStatus) {
ready = result == results.Success ready = result == results.Success
} else { } else {
// The check whether there is a probe which hasn't run yet. // The check whether there is a probe which hasn't run yet.
_, exists := m.getWorker(podUID, c.Name, readiness) w, exists := m.getWorker(podUID, c.Name, readiness)
ready = !exists ready = !exists // no readinessProbe -> always ready
if exists {
// Trigger an immediate run of the readinessProbe to update ready state
select {
case w.manualTriggerCh <- struct{}{}:
default: // Non-blocking.
klog.Warningf("Failed to trigger a manual run of %s probe", w.probeType.String())
}
}
} }
podStatus.ContainerStatuses[i].Ready = ready podStatus.ContainerStatuses[i].Ready = ready
} }
@ -290,17 +291,3 @@ func (m *manager) workerCount() int {
defer m.workerLock.RUnlock() defer m.workerLock.RUnlock()
return len(m.workers) return len(m.workers)
} }
func (m *manager) updateReadiness() {
update := <-m.readinessManager.Updates()
ready := update.Result == results.Success
m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
}
func (m *manager) updateStartup() {
update := <-m.startupManager.Updates()
started := update.Result == results.Success
m.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
}

View File

@ -319,6 +319,13 @@ func TestUpdatePodStatus(t *testing.T) {
} }
} }
func (m *manager) extractedReadinessHandling() {
update := <-m.readinessManager.Updates()
// This code corresponds to an extract from kubelet.syncLoopIteration()
ready := update.Result == results.Success
m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
}
func TestUpdateReadiness(t *testing.T) { func TestUpdateReadiness(t *testing.T) {
testPod := getTestPod() testPod := getTestPod()
setTestProbe(testPod, readiness, v1.Probe{}) setTestProbe(testPod, readiness, v1.Probe{})
@ -327,10 +334,10 @@ func TestUpdateReadiness(t *testing.T) {
// Start syncing readiness without leaking goroutine. // Start syncing readiness without leaking goroutine.
stopCh := make(chan struct{}) stopCh := make(chan struct{})
go wait.Until(m.updateReadiness, 0, stopCh) go wait.Until(m.extractedReadinessHandling, 0, stopCh)
defer func() { defer func() {
close(stopCh) close(stopCh)
// Send an update to exit updateReadiness() // Send an update to exit extractedReadinessHandling()
m.readinessManager.Set(kubecontainer.ContainerID{}, results.Success, &v1.Pod{}) m.readinessManager.Set(kubecontainer.ContainerID{}, results.Success, &v1.Pod{})
}() }()

View File

@ -38,6 +38,9 @@ type worker struct {
// Channel for stopping the probe. // Channel for stopping the probe.
stopCh chan struct{} stopCh chan struct{}
// Channel for triggering the probe manually.
manualTriggerCh chan struct{}
// The pod containing this probe (read-only) // The pod containing this probe (read-only)
pod *v1.Pod pod *v1.Pod
@ -82,11 +85,12 @@ func newWorker(
container v1.Container) *worker { container v1.Container) *worker {
w := &worker{ w := &worker{
stopCh: make(chan struct{}, 1), // Buffer so stop() can be non-blocking. stopCh: make(chan struct{}, 1), // Buffer so stop() can be non-blocking.
pod: pod, manualTriggerCh: make(chan struct{}, 1), // Buffer so prober_manager can do non-blocking calls to doProbe.
container: container, pod: pod,
probeType: probeType, container: container,
probeManager: m, probeType: probeType,
probeManager: m,
} }
switch probeType { switch probeType {
@ -130,7 +134,10 @@ func (w *worker) run() {
// If kubelet restarted the probes could be started in rapid succession. // If kubelet restarted the probes could be started in rapid succession.
// Let the worker wait for a random portion of tickerPeriod before probing. // Let the worker wait for a random portion of tickerPeriod before probing.
time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod))) // Do it only if the kubelet has started recently.
if probeTickerPeriod > time.Since(w.probeManager.start) {
time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))
}
probeTicker := time.NewTicker(probeTickerPeriod) probeTicker := time.NewTicker(probeTickerPeriod)
@ -154,6 +161,7 @@ probeLoop:
case <-w.stopCh: case <-w.stopCh:
break probeLoop break probeLoop
case <-probeTicker.C: case <-probeTicker.C:
case <-w.manualTriggerCh:
// continue // continue
} }
} }

View File

@ -373,22 +373,15 @@ var _ = SIGDescribe("Probing container", func() {
Testname: Pod readiness probe, delayed by startup probe Testname: Pod readiness probe, delayed by startup probe
Description: A Pod is created with startup and readiness probes. The Container is started by creating /tmp/startup after 45 seconds, delaying the ready state by this amount of time. This is similar to the "Pod readiness probe, with initial delay" test. Description: A Pod is created with startup and readiness probes. The Container is started by creating /tmp/startup after 45 seconds, delaying the ready state by this amount of time. This is similar to the "Pod readiness probe, with initial delay" test.
*/ */
ginkgo.It("should not be ready until startupProbe succeeds", func() { ginkgo.It("should be ready immediately after startupProbe succeeds", func() {
cmd := []string{"/bin/sh", "-c", "echo ok >/tmp/health; sleep 45; echo ok >/tmp/startup; sleep 600"} cmd := []string{"/bin/sh", "-c", "echo ok >/tmp/health; sleep 10; echo ok >/tmp/startup; sleep 600"}
readinessProbe := &v1.Probe{ readinessProbe := &v1.Probe{
Handler: v1.Handler{ Handler: execHandler([]string{"/bin/cat", "/tmp/health"}),
Exec: &v1.ExecAction{
Command: []string{"cat", "/tmp/health"},
},
},
InitialDelaySeconds: 0, InitialDelaySeconds: 0,
PeriodSeconds: 60,
} }
startupProbe := &v1.Probe{ startupProbe := &v1.Probe{
Handler: v1.Handler{ Handler: execHandler([]string{"/bin/cat", "/tmp/startup"}),
Exec: &v1.ExecAction{
Command: []string{"cat", "/tmp/startup"},
},
},
InitialDelaySeconds: 0, InitialDelaySeconds: 0,
FailureThreshold: 60, FailureThreshold: 60,
} }
@ -397,7 +390,15 @@ var _ = SIGDescribe("Probing container", func() {
p, err := podClient.Get(context.TODO(), p.Name, metav1.GetOptions{}) p, err := podClient.Get(context.TODO(), p.Name, metav1.GetOptions{})
framework.ExpectNoError(err) framework.ExpectNoError(err)
e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, p.Name, f.Namespace.Name, framework.PodStartTimeout) err = e2epod.WaitForPodContainerStarted(f.ClientSet, f.Namespace.Name, p.Name, 0, framework.PodStartTimeout)
framework.ExpectNoError(err)
startedTime := time.Now()
// We assume the pod became ready when the container became ready. This
// is true for a single container pod.
err = e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, p.Name, f.Namespace.Name, framework.PodStartTimeout)
framework.ExpectNoError(err)
readyTime := time.Now()
p, err = podClient.Get(context.TODO(), p.Name, metav1.GetOptions{}) p, err = podClient.Get(context.TODO(), p.Name, metav1.GetOptions{})
framework.ExpectNoError(err) framework.ExpectNoError(err)
@ -406,17 +407,14 @@ var _ = SIGDescribe("Probing container", func() {
framework.ExpectNoError(err) framework.ExpectNoError(err)
framework.ExpectEqual(isReady, true, "pod should be ready") framework.ExpectEqual(isReady, true, "pod should be ready")
// We assume the pod became ready when the container became ready. This readyIn := readyTime.Sub(startedTime)
// is true for a single container pod. framework.Logf("Container started at %v, pod became ready at %v, %v after startupProbe succeeded", startedTime, readyTime, readyIn)
readyTime, err := GetTransitionTimeForReadyCondition(p) if readyIn < 0 {
framework.ExpectNoError(err)
startedTime, err := GetContainerStartedTime(p, "busybox")
framework.ExpectNoError(err)
framework.Logf("Container started at %v, pod became ready at %v", startedTime, readyTime)
if readyTime.Sub(startedTime) < 40*time.Second {
framework.Failf("Pod became ready before startupProbe succeeded") framework.Failf("Pod became ready before startupProbe succeeded")
} }
if readyIn > 5*time.Second {
framework.Failf("Pod became ready in %v, more than 5s after startupProbe succeeded. It means that the delay readiness probes were not initiated immediately after startup finished.", readyIn)
}
}) })
}) })

View File

@ -321,6 +321,20 @@ func podContainerFailed(c clientset.Interface, namespace, podName string, contai
} }
} }
func podContainerStarted(c clientset.Interface, namespace, podName string, containerIndex int) wait.ConditionFunc {
return func() (bool, error) {
pod, err := c.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
if err != nil {
return false, err
}
if containerIndex > len(pod.Status.ContainerStatuses)-1 {
return false, nil
}
containerStatus := pod.Status.ContainerStatuses[containerIndex]
return *containerStatus.Started, nil
}
}
// LogPodStates logs basic info of provided pods for debugging. // LogPodStates logs basic info of provided pods for debugging.
func LogPodStates(pods []v1.Pod) { func LogPodStates(pods []v1.Pod) {
// Find maximum widths for pod, node, and phase strings for column printing. // Find maximum widths for pod, node, and phase strings for column printing.

View File

@ -542,3 +542,8 @@ func WaitForNRestartablePods(ps *testutils.PodStore, expect int, timeout time.Du
func WaitForPodContainerToFail(c clientset.Interface, namespace, podName string, containerIndex int, reason string, timeout time.Duration) error { func WaitForPodContainerToFail(c clientset.Interface, namespace, podName string, containerIndex int, reason string, timeout time.Duration) error {
return wait.PollImmediate(poll, timeout, podContainerFailed(c, namespace, podName, containerIndex, reason)) return wait.PollImmediate(poll, timeout, podContainerFailed(c, namespace, podName, containerIndex, reason))
} }
// WaitForPodContainerStarted waits for the given Pod container to start, after a successful run of the startupProbe.
func WaitForPodContainerStarted(c clientset.Interface, namespace, podName string, containerIndex int, timeout time.Duration) error {
return wait.PollImmediate(poll, timeout, podContainerStarted(c, namespace, podName, containerIndex))
}