Mirror of https://github.com/k3s-io/kubernetes.git
Move readinessManager updates handling to kubelet

commit 431e6a7044
parent eed218a3a2
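What the change does, at a glance: the prober manager no longer runs its own readiness-consuming goroutine (Start()/updateReadiness() are removed below); instead the kubelet's syncLoopIteration drains the liveness, readiness and startup result channels itself and triggers a pod sync through the new handleProbeSync helper, so readiness results are handled in the same place as the other probe results. The sketch below is a self-contained paraphrase of that fan-in pattern using made-up types (probeResult, podSyncer, printSyncer); it is not the kubelet's actual code.

package main

import "fmt"

// probeResult stands in for proberesults.Update: one probe outcome for one container.
type probeResult struct {
	podUID    string
	container string
	success   bool
}

// podSyncer stands in for the kubelet's SyncHandler.
type podSyncer interface {
	HandlePodSync(podUID string)
}

type printSyncer struct{}

func (printSyncer) HandlePodSync(podUID string) { fmt.Println("sync pod", podUID) }

// syncLoopIteration drains whichever probe channel has an update and triggers a
// pod sync for it, instead of letting the prober manager consume readiness
// results in a separate goroutine.
func syncLoopIteration(liveness, readiness, startup <-chan probeResult, s podSyncer) bool {
	select {
	case u, ok := <-liveness:
		if !ok {
			return false
		}
		if !u.success { // only liveness failures require a sync
			s.HandlePodSync(u.podUID)
		}
	case u, ok := <-readiness:
		if !ok {
			return false
		}
		fmt.Printf("container %s ready=%v\n", u.container, u.success)
		s.HandlePodSync(u.podUID)
	case u, ok := <-startup:
		if !ok {
			return false
		}
		fmt.Printf("container %s started=%v\n", u.container, u.success)
		s.HandlePodSync(u.podUID)
	}
	return true
}

func main() {
	liveness := make(chan probeResult)
	readiness := make(chan probeResult, 1)
	startup := make(chan probeResult)

	readiness <- probeResult{podUID: "pod-1", container: "c1", success: true}
	syncLoopIteration(liveness, readiness, startup, printSyncer{})
}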
@@ -581,6 +581,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 	imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
 
 	klet.livenessManager = proberesults.NewManager()
+	klet.readinessManager = proberesults.NewManager()
 	klet.startupManager = proberesults.NewManager()
 	klet.podCache = kubecontainer.NewCache()
 
@@ -618,6 +619,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 	runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
 		kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
 		klet.livenessManager,
+		klet.readinessManager,
 		klet.startupManager,
 		seccompProfileRoot,
 		machineInfo,
@@ -716,6 +718,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 	klet.probeManager = prober.NewManager(
 		klet.statusManager,
 		klet.livenessManager,
+		klet.readinessManager,
 		klet.startupManager,
 		klet.runner,
 		kubeDeps.Recorder)
@@ -924,8 +927,9 @@ type Kubelet struct {
 	// Handles container probing.
 	probeManager prober.Manager
 	// Manages container health check results.
-	livenessManager proberesults.Manager
-	startupManager  proberesults.Manager
+	livenessManager  proberesults.Manager
+	readinessManager proberesults.Manager
+	startupManager   proberesults.Manager
 
 	// How long to keep idle streaming command execution/port forwarding
 	// connections open before terminating them
@@ -1438,7 +1442,6 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
 
 	// Start component sync loops.
 	kl.statusManager.Start()
-	kl.probeManager.Start()
 
 	// Start syncing RuntimeClasses if enabled.
 	if kl.runtimeClassManager != nil {
@@ -1902,8 +1905,8 @@ func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHand
 // * plegCh: update the runtime cache; sync pod
 // * syncCh: sync all pods waiting for sync
 // * housekeepingCh: trigger cleanup of pods
-// * liveness/startup manager: sync pods that have failed or in which one or more
-//   containers have failed liveness/startup checks
+// * health manager: sync pods that have failed or in which one or more
+//   containers have failed health checks
 func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
 	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
 	select {
@@ -1978,13 +1981,16 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
 		handler.HandlePodSyncs(podsToSync)
 	case update := <-kl.livenessManager.Updates():
 		if update.Result == proberesults.Failure {
-			// The liveness manager detected a failure; sync the pod.
-			syncPod(kl, update, handler)
+			handleProbeSync(kl, update, handler, "liveness", "unhealthy")
 		}
+	case update := <-kl.readinessManager.Updates():
+		ready := update.Result == proberesults.Success
+		kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
+		handleProbeSync(kl, update, handler, "readiness", map[bool]string{true: "ready", false: ""}[ready])
 	case update := <-kl.startupManager.Updates():
 		started := update.Result == proberesults.Success
 		kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
-		syncPod(kl, update, handler)
+		handleProbeSync(kl, update, handler, "startup", map[bool]string{true: "started", false: "unhealthy"}[started])
 	case <-housekeepingCh:
 		if !kl.sourcesReady.AllReady() {
 			// If the sources aren't ready or volume manager has not yet synced the states,
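The new readiness and startup cases pick their log label with an inline map[bool]string{...}[value] lookup, a compact Go way to choose between two strings; for readiness the false branch is the empty string, so handleProbeSync then logs only the probe name. A minimal standalone illustration (the variable names here are mine, not the kubelet's):

package main

import "fmt"

func main() {
	for _, ready := range []bool{true, false} {
		// Pick one of two labels from a bool by indexing a map literal;
		// the readiness case maps false to "" so only the probe name gets logged.
		status := map[bool]string{true: "ready", false: ""}[ready]
		fmt.Printf("readiness update: ready=%v, status=%q\n", ready, status)
	}
}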
@@ -2000,15 +2006,19 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
 	return true
 }
 
-func syncPod(kl *Kubelet, update proberesults.Update, handler SyncHandler) {
+func handleProbeSync(kl *Kubelet, update proberesults.Update, handler SyncHandler, probe, status string) {
+	probeAndStatus := probe
+	if len(status) > 0 {
+		probeAndStatus = fmt.Sprintf("%s (container %s)", probe, status)
+	}
 	// We should not use the pod from manager, because it is never updated after initialization.
 	pod, ok := kl.podManager.GetPodByUID(update.PodUID)
 	if !ok {
 		// If the pod no longer exists, ignore the update.
-		klog.V(4).Infof("SyncLoop: ignore irrelevant update: %#v", update)
+		klog.V(4).Infof("SyncLoop %s: ignore irrelevant update: %#v", probeAndStatus, update)
 		return
 	}
-	klog.V(1).Infof("SyncLoop: %q", format.Pod(pod))
+	klog.V(1).Infof("SyncLoop %s: %q", probeAndStatus, format.Pod(pod))
 	handler.HandlePodSyncs([]*v1.Pod{pod})
 }
 
@@ -241,6 +241,7 @@ func newTestKubeletWithImageList(
 
 	kubelet.probeManager = probetest.FakeManager{}
 	kubelet.livenessManager = proberesults.NewManager()
+	kubelet.readinessManager = proberesults.NewManager()
 	kubelet.startupManager = proberesults.NewManager()
 
 	fakeContainerManager := cm.NewFakeContainerManager()
@@ -101,8 +101,9 @@ type kubeGenericRuntimeManager struct {
 	runtimeHelper kubecontainer.RuntimeHelper
 
 	// Health check results.
-	livenessManager proberesults.Manager
-	startupManager  proberesults.Manager
+	livenessManager  proberesults.Manager
+	readinessManager proberesults.Manager
+	startupManager   proberesults.Manager
 
 	// If true, enforce container cpu limits with CFS quota support
 	cpuCFSQuota bool
@@ -159,6 +160,7 @@ type LegacyLogProvider interface {
 func NewKubeGenericRuntimeManager(
 	recorder record.EventRecorder,
 	livenessManager proberesults.Manager,
+	readinessManager proberesults.Manager,
 	startupManager proberesults.Manager,
 	seccompProfileRoot string,
 	machineInfo *cadvisorapi.MachineInfo,
@@ -187,6 +189,7 @@ func NewKubeGenericRuntimeManager(
 		cpuCFSQuotaPeriod: cpuCFSQuotaPeriod,
 		seccompProfileRoot: seccompProfileRoot,
 		livenessManager: livenessManager,
+		readinessManager: readinessManager,
 		startupManager: startupManager,
 		machineInfo: machineInfo,
 		osInterface: osInterface,
@@ -111,6 +111,7 @@ func newTestManager() *manager {
 		status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}),
 		results.NewManager(),
+		results.NewManager(),
 		results.NewManager(),
 		nil, // runner
 		&record.FakeRecorder{},
 	).(*manager)
@@ -17,14 +17,13 @@ limitations under the License.
 package prober
 
 import (
-	"k8s.io/apimachinery/pkg/util/clock"
 	"sync"
 	"time"
 
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/clock"
 	"k8s.io/apimachinery/pkg/util/sets"
-	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/component-base/metrics"
 	"k8s.io/klog/v2"
@@ -71,9 +70,6 @@ type Manager interface {
 	// UpdatePodStatus modifies the given PodStatus with the appropriate Ready state for each
 	// container based on container running status, cached probe results and worker states.
 	UpdatePodStatus(types.UID, *v1.PodStatus)
-
-	// Start starts the Manager sync loops.
-	Start()
 }
 
 type manager struct {
@@ -104,12 +100,12 @@ type manager struct {
 func NewManager(
 	statusManager status.Manager,
 	livenessManager results.Manager,
+	readinessManager results.Manager,
 	startupManager results.Manager,
 	runner kubecontainer.CommandRunner,
 	recorder record.EventRecorder) Manager {
 
 	prober := newProber(runner, recorder)
-	readinessManager := results.NewManager()
 	return &manager{
 		statusManager: statusManager,
 		prober: prober,
@@ -121,12 +117,6 @@
 	}
 }
 
-// Start syncing probe status. This should only be called once.
-func (m *manager) Start() {
-	// Start syncing readiness.
-	go wait.Forever(m.updateReadiness, 0)
-}
-
 // Key uniquely identifying container probes
 type probeKey struct {
 	podUID types.UID
@@ -263,6 +253,7 @@ func (m *manager) UpdatePodStatus(podUID types.UID, podStatus *v1.PodStatus) {
 		select {
 		case w.manualTriggerCh <- struct{}{}:
 		default: // Non-blocking.
+			klog.Warningf("Failed to trigger a manual run of %s probe", w.probeType.String())
 		}
 	}
 }
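The added warning fires when the non-blocking send on a worker's manualTriggerCh finds the channel already full. A generic, self-contained illustration of that select/default pattern (tryTrigger and its arguments are illustrative helpers, not kubelet APIs):

package main

import "log"

// tryTrigger attempts a non-blocking send on a trigger channel; if the buffer
// is already full it logs instead of blocking the caller, mirroring the
// select/default handling around manualTriggerCh above.
func tryTrigger(trigger chan struct{}, probeType string) {
	select {
	case trigger <- struct{}{}:
	default: // Non-blocking.
		log.Printf("Failed to trigger a manual run of %s probe", probeType)
	}
}

func main() {
	trigger := make(chan struct{}, 1)
	tryTrigger(trigger, "readiness") // succeeds: buffer has room
	tryTrigger(trigger, "readiness") // buffer full: logs instead of blocking
}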
@@ -300,10 +291,3 @@ func (m *manager) workerCount() int {
 	defer m.workerLock.RUnlock()
 	return len(m.workers)
 }
-
-func (m *manager) updateReadiness() {
-	update := <-m.readinessManager.Updates()
-
-	ready := update.Result == results.Success
-	m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
-}
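With Start() and updateReadiness() removed, NewManager now receives the readiness results manager instead of creating one internally, so the prober workers (writers) and the kubelet sync loop (reader) share a single instance. A minimal sketch of that sharing pattern, using a simplified stand-in for proberesults.Manager rather than the real type:

package main

import "fmt"

// resultsManager is a simplified stand-in for proberesults.Manager: probe
// workers write results with Set, and a single consumer drains Updates.
type resultsManager struct {
	updates chan string
}

func newResultsManager() *resultsManager {
	return &resultsManager{updates: make(chan string, 16)}
}

func (m *resultsManager) Set(result string) { m.updates <- result }

func (m *resultsManager) Updates() <-chan string { return m.updates }

// proberManager only writes results; it no longer runs its own goroutine
// to consume readiness updates.
type proberManager struct {
	readiness *resultsManager
}

func main() {
	// The same readiness results manager is handed to both the prober manager
	// (writer) and the "kubelet" side (reader), mirroring how NewManager now
	// receives readinessManager instead of constructing it.
	readiness := newResultsManager()
	prober := proberManager{readiness: readiness}

	prober.readiness.Set("container c1 ready")

	// Kubelet sync loop side: drain the shared channel.
	fmt.Println("kubelet observed:", <-readiness.Updates())
}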
@@ -319,6 +319,13 @@ func TestUpdatePodStatus(t *testing.T) {
 	}
 }
 
+func (m *manager) extractedReadinessHandling() {
+	update := <-m.readinessManager.Updates()
+	// This code corresponds to an extract from kubelet.syncLoopIteration()
+	ready := update.Result == results.Success
+	m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
+}
+
 func TestUpdateReadiness(t *testing.T) {
 	testPod := getTestPod()
 	setTestProbe(testPod, readiness, v1.Probe{})
@@ -327,10 +334,10 @@ func TestUpdateReadiness(t *testing.T) {
 
 	// Start syncing readiness without leaking goroutine.
 	stopCh := make(chan struct{})
-	go wait.Until(m.updateReadiness, 0, stopCh)
+	go wait.Until(m.extractedReadinessHandling, 0, stopCh)
 	defer func() {
 		close(stopCh)
-		// Send an update to exit updateReadiness()
+		// Send an update to exit extractedReadinessHandling()
 		m.readinessManager.Set(kubecontainer.ContainerID{}, results.Success, &v1.Pod{})
 	}()
 
@@ -373,27 +373,18 @@ var _ = SIGDescribe("Probing container", func() {
 		Testname: Pod readiness probe, delayed by startup probe
 		Description: A Pod is created with startup and readiness probes. The Container is started by creating /tmp/startup after 45 seconds, delaying the ready state by this amount of time. This is similar to the "Pod readiness probe, with initial delay" test.
 	*/
-	ginkgo.It("should not be ready until startupProbe succeeds", func() {
+	ginkgo.It("should be ready immediately after startupProbe succeeds", func() {
 		sleepBeforeStarted := time.Duration(45)
-		cmd := []string{"/bin/sh", "-c", fmt.Sprintf("echo ok >/tmp/health; sleep %d; echo ok >/tmp/startup; sleep 600", sleepBeforeStarted)}
+		cmd := []string{"/bin/sh", "-c", fmt.Sprintf("sleep %d; echo ok >/tmp/startup; sleep 600", sleepBeforeStarted)}
 		readinessProbe := &v1.Probe{
-			Handler: v1.Handler{
-				Exec: &v1.ExecAction{
-					Command: []string{"cat", "/tmp/health"},
-				},
-			},
+			Handler: execHandler([]string{"/bin/true"}),
 			InitialDelaySeconds: 0,
 			PeriodSeconds: 60,
 		}
 		startupProbe := &v1.Probe{
-			Handler: v1.Handler{
-				Exec: &v1.ExecAction{
-					Command: []string{"cat", "/tmp/startup"},
-				},
-			},
+			Handler: execHandler([]string{"/bin/cat", "/tmp/startup"}),
 			InitialDelaySeconds: 0,
 			PeriodSeconds: 1,
-			FailureThreshold: 600,
+			FailureThreshold: 60,
 		}
 		p := podClient.Create(startupPodSpec(startupProbe, readinessProbe, nil, cmd))
 
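The rewritten test builds both probes through an execHandler helper whose definition is outside this diff. Judging only from the call sites above, it presumably wraps a command in an exec-based v1.Handler; the following is an assumed sketch (including the package name), not the actual helper from the e2e suite:

package probesketch

import v1 "k8s.io/api/core/v1"

// execHandler wraps a command in an exec-based probe handler, matching how it
// is called in the test above. Treat this as an assumption about its shape.
func execHandler(cmd []string) v1.Handler {
	return v1.Handler{
		Exec: &v1.ExecAction{
			Command: cmd,
		},
	}
}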
@@ -416,12 +407,13 @@ var _ = SIGDescribe("Probing container", func() {
 		startedTime, err := GetContainerStartedTime(p, "busybox")
 		framework.ExpectNoError(err)
 
-		framework.Logf("Container started at %v, pod became ready at %v", startedTime, readyTime)
-		if readyTime.Sub(startedTime) < sleepBeforeStarted*time.Second {
+		readyIn := readyTime.Sub(startedTime) - sleepBeforeStarted*time.Second
+		framework.Logf("Container started at %v, pod became ready at %v, %v after startupProbe succeeded", startedTime, readyTime, readyIn)
+		if readyIn < 0 {
 			framework.Failf("Pod became ready before startupProbe succeeded")
 		}
-		if readyTime.Sub(startedTime) > (sleepBeforeStarted+20)*time.Second {
-			framework.Failf("Pod became ready more than 20s after startupProbe succeeded")
+		if readyIn > 5*time.Second {
+			framework.Failf("Pod became ready in %v, more than 5s after startupProbe succeeded. It means that the delay readiness probes were not initiated immediately after startup finished.", readyIn)
 		}
 	})
 })
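The new assertion computes readyIn, the gap between the container becoming ready and the earliest moment the startup probe could succeed, and requires it to fall within [0, 5s] instead of the previous 20s budget. A standalone sketch of that arithmetic with made-up sample values:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Illustrative numbers only: the container became ready 47s after it
	// started, and the startup probe could not succeed before 45s had passed.
	sleepBeforeStarted := time.Duration(45)
	readySinceStart := 47 * time.Second

	readyIn := readySinceStart - sleepBeforeStarted*time.Second
	fmt.Println("readyIn =", readyIn) // 2s

	switch {
	case readyIn < 0:
		fmt.Println("fail: ready before startupProbe succeeded")
	case readyIn > 5*time.Second:
		fmt.Println("fail: ready more than 5s after startupProbe succeeded")
	default:
		fmt.Println("pass: within the 5s budget")
	}
}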