Move startupManager updates handling to kubelet

This commit is contained in:
Matthias Bertschy 2021-01-16 13:34:38 +01:00
parent 4870e64ac1
commit eed218a3a2
2 changed files with 19 additions and 22 deletions

View File

@ -1902,8 +1902,8 @@ func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHand
// * plegCh: update the runtime cache; sync pod
// * syncCh: sync all pods waiting for sync
// * housekeepingCh: trigger cleanup of pods
// * liveness manager: sync pods that have failed or in which one or more
// containers have failed liveness checks
// * liveness/startup manager: sync pods that have failed or in which one or more
// containers have failed liveness/startup checks
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
@ -1979,18 +1979,12 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
case update := <-kl.livenessManager.Updates():
if update.Result == proberesults.Failure {
// The liveness manager detected a failure; sync the pod.
// We should not use the pod from livenessManager, because it is never updated after
// initialization.
pod, ok := kl.podManager.GetPodByUID(update.PodUID)
if !ok {
// If the pod no longer exists, ignore the update.
klog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
break
}
klog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
handler.HandlePodSyncs([]*v1.Pod{pod})
syncPod(kl, update, handler)
}
case update := <-kl.startupManager.Updates():
started := update.Result == proberesults.Success
kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
syncPod(kl, update, handler)
case <-housekeepingCh:
if !kl.sourcesReady.AllReady() {
// If the sources aren't ready or volume manager has not yet synced the states,
@ -2006,6 +2000,18 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
return true
}
func syncPod(kl *Kubelet, update proberesults.Update, handler SyncHandler) {
// We should not use the pod from manager, because it is never updated after initialization.
pod, ok := kl.podManager.GetPodByUID(update.PodUID)
if !ok {
// If the pod no longer exists, ignore the update.
klog.V(4).Infof("SyncLoop: ignore irrelevant update: %#v", update)
return
}
klog.V(1).Infof("SyncLoop: %q", format.Pod(pod))
handler.HandlePodSyncs([]*v1.Pod{pod})
}
// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod has completed termination, dispatchWork will perform no action.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {

View File

@ -125,8 +125,6 @@ func NewManager(
func (m *manager) Start() {
// Start syncing readiness.
go wait.Forever(m.updateReadiness, 0)
// Start syncing startup.
go wait.Forever(m.updateStartup, 0)
}
// Key uniquely identifying container probes
@ -309,10 +307,3 @@ func (m *manager) updateReadiness() {
ready := update.Result == results.Success
m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
}
func (m *manager) updateStartup() {
update := <-m.startupManager.Updates()
started := update.Result == results.Success
m.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
}