Merge pull request #13363 from mesosphere/abort-kubelet-syncloop

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2015-09-04 17:45:32 -07:00
commit e6e69e31ec
2 changed files with 28 additions and 5 deletions

View File

@ -1896,17 +1896,19 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
} }
housekeepingTimestamp = time.Now() housekeepingTimestamp = time.Now()
} }
kl.syncLoopIteration(updates, handler) if !kl.syncLoopIteration(updates, handler) {
break
}
} }
} }
func (kl *Kubelet) syncLoopIteration(updates <-chan PodUpdate, handler SyncHandler) { func (kl *Kubelet) syncLoopIteration(updates <-chan PodUpdate, handler SyncHandler) bool {
kl.syncLoopMonitor.Store(time.Now()) kl.syncLoopMonitor.Store(time.Now())
select { select {
case u, ok := <-updates: case u, open := <-updates:
if !ok { if !open {
glog.Errorf("Update channel is closed. Exiting the sync loop.") glog.Errorf("Update channel is closed. Exiting the sync loop.")
return return false
} }
switch u.Op { switch u.Op {
case ADD: case ADD:
@ -1928,6 +1930,7 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan PodUpdate, handler SyncHandl
handler.HandlePodSyncs(kl.podManager.GetPods()) handler.HandlePodSyncs(kl.podManager.GetPods())
} }
kl.syncLoopMonitor.Store(time.Now()) kl.syncLoopMonitor.Store(time.Now())
return true
} }
func (kl *Kubelet) dispatchWork(pod *api.Pod, syncType SyncPodType, mirrorPod *api.Pod, start time.Time) { func (kl *Kubelet) dispatchWork(pod *api.Pod, syncType SyncPodType, mirrorPod *api.Pod, start time.Time) {

View File

@ -329,6 +329,26 @@ func TestSyncLoopTimeUpdate(t *testing.T) {
} }
} }
func TestSyncLoopAbort(t *testing.T) {
testKubelet := newTestKubelet(t)
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
kubelet := testKubelet.kubelet
kubelet.lastTimestampRuntimeUp = time.Now()
kubelet.networkConfigured = true
ch := make(chan PodUpdate)
close(ch)
// sanity check (also prevent this test from hanging in the next step)
ok := kubelet.syncLoopIteration(ch, kubelet)
if ok {
t.Fatalf("expected syncLoopIteration to return !ok since update chan was closed")
}
// this should terminate immediately; if it hangs then the syncLoopIteration isn't aborting properly
kubelet.syncLoop(ch, kubelet)
}
func TestSyncPodsStartPod(t *testing.T) { func TestSyncPodsStartPod(t *testing.T) {
testKubelet := newTestKubelet(t) testKubelet := newTestKubelet(t)
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)