Clean up housekeeping routine in kubelet

Now that the kubelet correctly checks which sources it has seen, there is no
need to enforce the initial ordering of pod updates and housekeeping. Use a
ticker for housekeeping to simplify the code.
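The shape of the change, in brief: instead of tracking a housekeeping timestamp by hand inside the sync loop, the loop now owns two tickers and passes their channels into the per-iteration function, which selects over them. Below is a minimal, self-contained sketch of that pattern; the helper name `iteration` and the printed messages are illustrative placeholders, not kubelet APIs.

package main

import (
	"fmt"
	"time"
)

// iteration mirrors the shape of syncLoopIteration after this change: it
// blocks until one of the injected channels fires and handles that event.
// Returning false would signal the caller to exit the loop.
func iteration(syncCh, housekeepingCh <-chan time.Time) bool {
	select {
	case <-syncCh:
		fmt.Println("sync: resync any pod workers that are due")
	case <-housekeepingCh:
		fmt.Println("housekeeping: clean up orphaned pod resources")
	}
	return true
}

func main() {
	// The loop owns the tickers; the iteration function only sees channels.
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	housekeepingTicker := time.NewTicker(2 * time.Second) // housekeepingPeriod in the diff
	defer housekeepingTicker.Stop()

	for i := 0; i < 5; i++ {
		if !iteration(syncTicker.C, housekeepingTicker.C) {
			break
		}
	}
}

Because the iteration function only receives channels, a test can substitute a buffered chan time.Time and fire events deterministically, which is exactly what the updated tests below do.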
Yu-Ju Hong 2015-11-03 10:03:39 -08:00
parent f64780d6eb
commit b734155954
2 changed files with 29 additions and 41 deletions


@@ -106,9 +106,8 @@ const (
 	// system default DNS resolver configuration
 	ResolvConfDefault = "/etc/resolv.conf"
-	// Minimum period for performing global cleanup tasks, i.e., housekeeping
-	// will not be performed more than once per housekeepingMinimumPeriod.
-	housekeepingMinimumPeriod = time.Second * 2
+	// Period for performing global cleanup tasks.
+	housekeepingPeriod = time.Second * 2
 	etcHostsPath = "/etc/hosts"
 )
@@ -481,7 +480,6 @@ type Kubelet struct {
 	podWorkers     PodWorkers
 	resyncInterval time.Duration
-	resyncTicker   *time.Ticker
 	sourcesReady   SourcesReadyFn
 	// sourcesSeen records the sources seen by kubelet. This set is not thread
 	// safe and should only be access by the main kubelet syncloop goroutine.
@@ -2086,8 +2084,8 @@ func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHand
 	// The resyncTicker wakes up kubelet to checks if there are any pod workers
 	// that need to be sync'd. A one-second period is sufficient because the
 	// sync interval is defaulted to 10s.
-	kl.resyncTicker = time.NewTicker(time.Second)
-	var housekeepingTimestamp time.Time
+	syncTicker := time.NewTicker(time.Second)
+	housekeepingTicker := time.NewTicker(housekeepingPeriod)
 	for {
 		if !kl.containerRuntimeUp() {
 			time.Sleep(5 * time.Second)
@@ -2099,35 +2097,14 @@ func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHand
 			glog.Infof("Skipping pod synchronization, network is not configured")
 			continue
 		}
-
-		// Make sure we sync first to receive the pods from the sources before
-		// performing housekeeping.
-		if !kl.syncLoopIteration(updates, handler) {
+		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C) {
 			break
 		}
-		// We don't want to perform housekeeping too often, so we set a minimum
-		// period for it. Housekeeping would be performed at least once every
-		// kl.resyncInterval, and *no* more than once every
-		// housekeepingMinimumPeriod.
-		// TODO (#13418): Investigate whether we can/should spawn a dedicated
-		// goroutine for housekeeping
-		if !kl.allSourcesReady() {
-			// If the sources aren't ready, skip housekeeping, as we may
-			// accidentally delete pods from unready sources.
-			glog.V(4).Infof("Skipping cleanup, sources aren't ready yet.")
-		} else if housekeepingTimestamp.IsZero() {
-			housekeepingTimestamp = time.Now()
-		} else if time.Since(housekeepingTimestamp) > housekeepingMinimumPeriod {
-			glog.V(4).Infof("SyncLoop (housekeeping)")
-			if err := handler.HandlePodCleanups(); err != nil {
-				glog.Errorf("Failed cleaning pods: %v", err)
-			}
-			housekeepingTimestamp = time.Now()
-		}
 	}
 }
 
-func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler SyncHandler) bool {
+func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler SyncHandler,
+	syncCh <-chan time.Time, housekeepingCh <-chan time.Time) bool {
 	kl.syncLoopMonitor.Store(time.Now())
 	select {
 	case u, open := <-updates:
@@ -2150,7 +2127,7 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
 			// TODO: Do we want to support this?
 			glog.Errorf("Kubelet does not support snapshot update")
 		}
-	case <-kl.resyncTicker.C:
+	case <-syncCh:
 		podUIDs := kl.workQueue.GetWork()
 		var podsToSync []*api.Pod
 		for _, uid := range podUIDs {
@@ -2158,14 +2135,25 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
 				podsToSync = append(podsToSync, pod)
 			}
 		}
-		glog.V(2).Infof("SyncLoop (SYNC): %d pods", len(podsToSync))
+		glog.V(2).Infof("SyncLoop (SYNC): %d pods", kubeletutil.FormatPodNames(podsToSync))
 		kl.HandlePodSyncs(podsToSync)
 	case update := <-kl.livenessManager.Updates():
 		// We only care about failures (signalling container death) here.
 		if update.Result == proberesults.Failure {
-			glog.V(1).Infof("SyncLoop (container unhealthy).")
+			glog.V(1).Infof("SyncLoop (container unhealthy): %q", kubeletutil.FormatPodName(update.Pod))
 			handler.HandlePodSyncs([]*api.Pod{update.Pod})
 		}
+	case <-housekeepingCh:
+		if !kl.allSourcesReady() {
+			// If the sources aren't ready, skip housekeeping, as we may
+			// accidentally delete pods from unready sources.
+			glog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
+		} else {
+			glog.V(4).Infof("SyncLoop (housekeeping)")
+			if err := handler.HandlePodCleanups(); err != nil {
+				glog.Errorf("Failed cleaning pods: %v", err)
+			}
+		}
 	}
 	kl.syncLoopMonitor.Store(time.Now())
 	return true


@@ -335,14 +335,17 @@ func TestSyncLoopTimeUpdate(t *testing.T) {
 	}
 
 	// Start sync ticker.
-	kubelet.resyncTicker = time.NewTicker(time.Millisecond)
-
-	kubelet.syncLoopIteration(make(chan kubetypes.PodUpdate), kubelet)
+	syncCh := make(chan time.Time, 1)
+	housekeepingCh := make(chan time.Time, 1)
+
+	syncCh <- time.Now()
+	kubelet.syncLoopIteration(make(chan kubetypes.PodUpdate), kubelet, syncCh, housekeepingCh)
 	loopTime2 := kubelet.LatestLoopEntryTime()
 	if loopTime2.IsZero() {
 		t.Errorf("Unexpected sync loop time: 0, expected non-zero value.")
 	}
-	kubelet.syncLoopIteration(make(chan kubetypes.PodUpdate), kubelet)
+	syncCh <- time.Now()
+	kubelet.syncLoopIteration(make(chan kubetypes.PodUpdate), kubelet, syncCh, housekeepingCh)
 	loopTime3 := kubelet.LatestLoopEntryTime()
 	if !loopTime3.After(loopTime1) {
 		t.Errorf("Sync Loop Time was not updated correctly. Second update timestamp should be greater than first update timestamp")
@@ -355,15 +358,12 @@ func TestSyncLoopAbort(t *testing.T) {
 	kubelet := testKubelet.kubelet
 	kubelet.lastTimestampRuntimeUp = time.Now()
 	kubelet.networkConfigured = true
-	// The syncLoop waits on the resyncTicker, so we stop it immediately to avoid a race.
-	kubelet.resyncTicker = time.NewTicker(time.Second)
-	kubelet.resyncTicker.Stop()
 
 	ch := make(chan kubetypes.PodUpdate)
 	close(ch)
 
 	// sanity check (also prevent this test from hanging in the next step)
-	ok := kubelet.syncLoopIteration(ch, kubelet)
+	ok := kubelet.syncLoopIteration(ch, kubelet, make(chan time.Time), make(chan time.Time))
 	if ok {
 		t.Fatalf("expected syncLoopIteration to return !ok since update chan was closed")
 	}