From b734155954254d47a9d039daeb4d73f82dd72499 Mon Sep 17 00:00:00 2001
From: Yu-Ju Hong
Date: Tue, 3 Nov 2015 10:03:39 -0800
Subject: [PATCH] Clean up housekeeping routine in kubelet

Now that the kubelet correctly tracks which sources it has seen, there
is no need to enforce the initial order of pod updates and
housekeeping. Use a ticker for housekeeping to simplify the code.
---
 pkg/kubelet/kubelet.go      | 54 +++++++++++++++----------------------
 pkg/kubelet/kubelet_test.go | 16 +++++------
 2 files changed, 29 insertions(+), 41 deletions(-)

diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index cf156a56da8..05da52a075d 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -106,9 +106,8 @@ const (
 	// system default DNS resolver configuration
 	ResolvConfDefault = "/etc/resolv.conf"
 
-	// Minimum period for performing global cleanup tasks, i.e., housekeeping
-	// will not be performed more than once per housekeepingMinimumPeriod.
-	housekeepingMinimumPeriod = time.Second * 2
+	// Period for performing global cleanup tasks.
+	housekeepingPeriod = time.Second * 2
 
 	etcHostsPath = "/etc/hosts"
 )
@@ -481,7 +480,6 @@ type Kubelet struct {
 	podWorkers PodWorkers
 
 	resyncInterval time.Duration
-	resyncTicker   *time.Ticker
 	sourcesReady   SourcesReadyFn
 	// sourcesSeen records the sources seen by kubelet. This set is not thread
 	// safe and should only be accessed by the main kubelet syncloop goroutine.
@@ -2086,8 +2084,8 @@ func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHand
 	// The resyncTicker wakes up kubelet to check if there are any pod workers
 	// that need to be sync'd. A one-second period is sufficient because the
 	// sync interval is defaulted to 10s.
-	kl.resyncTicker = time.NewTicker(time.Second)
-	var housekeepingTimestamp time.Time
+	syncTicker := time.NewTicker(time.Second)
+	housekeepingTicker := time.NewTicker(housekeepingPeriod)
 	for {
 		if !kl.containerRuntimeUp() {
 			time.Sleep(5 * time.Second)
@@ -2099,35 +2097,14 @@ func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHand
 			glog.Infof("Skipping pod synchronization, network is not configured")
 			continue
 		}
-
-		// Make sure we sync first to receive the pods from the sources before
-		// performing housekeeping.
-		if !kl.syncLoopIteration(updates, handler) {
+		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C) {
 			break
 		}
-		// We don't want to perform housekeeping too often, so we set a minimum
-		// period for it. Housekeeping would be performed at least once every
-		// kl.resyncInterval, and *no* more than once every
-		// housekeepingMinimumPeriod.
-		// TODO (#13418): Investigate whether we can/should spawn a dedicated
-		// goroutine for housekeeping
-		if !kl.allSourcesReady() {
-			// If the sources aren't ready, skip housekeeping, as we may
-			// accidentally delete pods from unready sources.
-			glog.V(4).Infof("Skipping cleanup, sources aren't ready yet.")
-		} else if housekeepingTimestamp.IsZero() {
-			housekeepingTimestamp = time.Now()
-		} else if time.Since(housekeepingTimestamp) > housekeepingMinimumPeriod {
-			glog.V(4).Infof("SyncLoop (housekeeping)")
-			if err := handler.HandlePodCleanups(); err != nil {
-				glog.Errorf("Failed cleaning pods: %v", err)
-			}
-			housekeepingTimestamp = time.Now()
-		}
 	}
 }
 
-func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler SyncHandler) bool {
+func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler SyncHandler,
+	syncCh <-chan time.Time, housekeepingCh <-chan time.Time) bool {
 	kl.syncLoopMonitor.Store(time.Now())
 	select {
 	case u, open := <-updates:
@@ -2150,7 +2127,7 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
 			// TODO: Do we want to support this?
 			glog.Errorf("Kubelet does not support snapshot update")
 		}
-	case <-kl.resyncTicker.C:
+	case <-syncCh:
 		podUIDs := kl.workQueue.GetWork()
 		var podsToSync []*api.Pod
 		for _, uid := range podUIDs {
@@ -2158,14 +2135,25 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
 				podsToSync = append(podsToSync, pod)
 			}
 		}
-		glog.V(2).Infof("SyncLoop (SYNC): %d pods", len(podsToSync))
+		glog.V(2).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), kubeletutil.FormatPodNames(podsToSync))
 		kl.HandlePodSyncs(podsToSync)
 	case update := <-kl.livenessManager.Updates():
 		// We only care about failures (signalling container death) here.
 		if update.Result == proberesults.Failure {
-			glog.V(1).Infof("SyncLoop (container unhealthy).")
+			glog.V(1).Infof("SyncLoop (container unhealthy): %q", kubeletutil.FormatPodName(update.Pod))
 			handler.HandlePodSyncs([]*api.Pod{update.Pod})
 		}
+	case <-housekeepingCh:
+		if !kl.allSourcesReady() {
+			// If the sources aren't ready, skip housekeeping, as we may
+			// accidentally delete pods from unready sources.
+			glog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
+		} else {
+			glog.V(4).Infof("SyncLoop (housekeeping)")
+			if err := handler.HandlePodCleanups(); err != nil {
+				glog.Errorf("Failed cleaning pods: %v", err)
+			}
+		}
 	}
 	kl.syncLoopMonitor.Store(time.Now())
 	return true
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index d4929756a1f..e05b4e53b92 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -335,14 +335,17 @@ func TestSyncLoopTimeUpdate(t *testing.T) {
 	}
 
 	// Start sync ticker.
-	kubelet.resyncTicker = time.NewTicker(time.Millisecond)
-
-	kubelet.syncLoopIteration(make(chan kubetypes.PodUpdate), kubelet)
+	syncCh := make(chan time.Time, 1)
+	housekeepingCh := make(chan time.Time, 1)
+	syncCh <- time.Now()
+	kubelet.syncLoopIteration(make(chan kubetypes.PodUpdate), kubelet, syncCh, housekeepingCh)
 	loopTime2 := kubelet.LatestLoopEntryTime()
 	if loopTime2.IsZero() {
 		t.Errorf("Unexpected sync loop time: 0, expected non-zero value.")
 	}
-	kubelet.syncLoopIteration(make(chan kubetypes.PodUpdate), kubelet)
+
+	syncCh <- time.Now()
+	kubelet.syncLoopIteration(make(chan kubetypes.PodUpdate), kubelet, syncCh, housekeepingCh)
 	loopTime3 := kubelet.LatestLoopEntryTime()
 	if !loopTime3.After(loopTime1) {
 		t.Errorf("Sync Loop Time was not updated correctly. Second update timestamp should be greater than first update timestamp")
@@ -355,15 +358,12 @@ func TestSyncLoopAbort(t *testing.T) {
 	kubelet := testKubelet.kubelet
 	kubelet.lastTimestampRuntimeUp = time.Now()
 	kubelet.networkConfigured = true
-	// The syncLoop waits on the resyncTicker, so we stop it immediately to avoid a race.
-	kubelet.resyncTicker = time.NewTicker(time.Second)
-	kubelet.resyncTicker.Stop()
 
 	ch := make(chan kubetypes.PodUpdate)
 	close(ch)
 
 	// sanity check (also prevent this test from hanging in the next step)
-	ok := kubelet.syncLoopIteration(ch, kubelet)
+	ok := kubelet.syncLoopIteration(ch, kubelet, make(chan time.Time), make(chan time.Time))
 	if ok {
 		t.Fatalf("expected syncLoopIteration to return !ok since update chan was closed")
 	}
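
Review note: the heart of this change is that housekeeping becomes just another
case in the select that drives each loop iteration, and the tickers are owned by
the caller, with only their receive channels passed in. Below is a minimal,
self-contained sketch of that pattern with invented names (it is not kubelet
code): because the iteration function accepts plain <-chan time.Time parameters,
a test can substitute buffered channels and fire "ticks" deterministically,
exactly as TestSyncLoopTimeUpdate does above.

package main

import (
	"fmt"
	"time"
)

// syncLoopIteration handles exactly one event: an update, a periodic sync
// tick, or a housekeeping tick. It returns false when the update source is
// closed, signalling the caller to stop looping.
func syncLoopIteration(updates <-chan string, syncCh, housekeepingCh <-chan time.Time) bool {
	select {
	case u, open := <-updates:
		if !open {
			return false // update channel closed; abort the loop
		}
		fmt.Println("handling update:", u)
	case <-syncCh:
		fmt.Println("periodic sync of pod workers")
	case <-housekeepingCh:
		fmt.Println("housekeeping (pod cleanup)")
	}
	return true
}

func main() {
	updates := make(chan string)
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	housekeepingTicker := time.NewTicker(2 * time.Second) // mirrors housekeepingPeriod
	defer housekeepingTicker.Stop()

	go func() {
		updates <- "pod added"
		close(updates)
	}()

	for syncLoopIteration(updates, syncTicker.C, housekeepingTicker.C) {
	}
	fmt.Println("sync loop exited")
}

Because all three event kinds funnel through a single select, housekeeping can
never run concurrently with a pod sync, which is what lets the patch drop the
old housekeepingTimestamp/housekeepingMinimumPeriod bookkeeping in favor of a
plain ticker.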