diff --git a/pkg/kubelet/prober/manager.go b/pkg/kubelet/prober/manager.go index 1c3c2498916..969a42c443b 100644 --- a/pkg/kubelet/prober/manager.go +++ b/pkg/kubelet/prober/manager.go @@ -223,6 +223,13 @@ func (m *manager) removeWorker(podUID types.UID, containerName string, probeType delete(m.workers, probeKey{podUID, containerName, probeType}) } +// workerCount returns the total number of probe workers. For testing. +func (m *manager) workerCount() int { + m.workerLock.Lock() + defer m.workerLock.Unlock() + return len(m.workers) +} + func (m *manager) updateReadiness() { update := <-m.readinessManager.Updates() diff --git a/pkg/kubelet/prober/manager_test.go b/pkg/kubelet/prober/manager_test.go index 06a78d3e176..b66a5891e96 100644 --- a/pkg/kubelet/prober/manager_test.go +++ b/pkg/kubelet/prober/manager_test.go @@ -78,6 +78,7 @@ func TestAddRemovePods(t *testing.T) { } m := newTestManager() + defer cleanup(t, m) if err := expectProbes(m, nil); err != nil { t.Error(err) } @@ -122,6 +123,7 @@ func TestAddRemovePods(t *testing.T) { func TestCleanupPods(t *testing.T) { m := newTestManager() + defer cleanup(t, m) podToCleanup := api.Pod{ ObjectMeta: api.ObjectMeta{ UID: "pod_cleanup", @@ -215,6 +217,8 @@ func TestUpdatePodStatus(t *testing.T) { } m := newTestManager() + // no cleanup: using fake workers. + // Setup probe "workers" and cached results. m.workers = map[probeKey]*worker{ probeKey{testPodUID, unprobed.Name, liveness}: {}, @@ -251,7 +255,21 @@ func TestUpdatePodStatus(t *testing.T) { func TestUpdateReadiness(t *testing.T) { testPod := getTestPod(readiness, api.Probe{}) m := newTestManager() - m.Start() + defer cleanup(t, m) + + // Start syncing readiness without leaking goroutine. + stopCh := make(chan struct{}) + go util.Until(m.updateReadiness, 0, stopCh) + defer func() { + close(stopCh) + // Send an update to exit updateReadiness() + m.readinessManager.Set(kubecontainer.ContainerID{}, results.Success, &api.Pod{}) + }() + + exec := syncExecProber{} + exec.set(probe.Success, nil) + m.prober.exec = &exec + m.statusManager.SetPodStatus(&testPod, getTestRunningStatus()) m.AddPod(&testPod) @@ -266,7 +284,7 @@ func TestUpdateReadiness(t *testing.T) { } // Prober fails. - m.prober.exec = fakeExecProber{probe.Failure, nil} + exec.set(probe.Failure, nil) // Wait for failed status. if err := waitForReadyStatus(m, false); err != nil { @@ -300,7 +318,7 @@ outer: return fmt.Errorf("Unexpected probes: %v; Missing probes: %v;", unexpected, missing) } -const interval = 100 * time.Millisecond +const interval = 1 * time.Second // Wait for the given workers to exit & clean up. func waitForWorkerExit(m *manager, workerPaths []probeKey) error { @@ -344,3 +362,22 @@ func waitForReadyStatus(m *manager, ready bool) error { return nil } + +// cleanup running probes to avoid leaking goroutines. +func cleanup(t *testing.T, m *manager) { + m.CleanupPods(nil) + + condition := func() (bool, error) { + workerCount := m.workerCount() + if workerCount > 0 { + glog.Infof("Waiting for %d workers to exit...", workerCount) + } + return workerCount == 0, nil + } + if exited, _ := condition(); exited { + return // Already exited, no need to poll. + } + if err := wait.Poll(interval, util.ForeverTestTimeout, condition); err != nil { + t.Fatalf("Error during cleanup: %v", err) + } +} diff --git a/pkg/kubelet/prober/testing.go b/pkg/kubelet/prober/testing.go index fbdc77ffbda..9ea4541cbc3 100644 --- a/pkg/kubelet/prober/testing.go +++ b/pkg/kubelet/prober/testing.go @@ -18,6 +18,7 @@ package prober import ( "reflect" + "sync" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" @@ -120,3 +121,21 @@ type fakeExecProber struct { func (p fakeExecProber) Probe(_ exec.Cmd) (probe.Result, string, error) { return p.result, "", p.err } + +type syncExecProber struct { + sync.RWMutex + fakeExecProber +} + +func (p *syncExecProber) set(result probe.Result, err error) { + p.Lock() + defer p.Unlock() + p.result = result + p.err = err +} + +func (p *syncExecProber) Probe(cmd exec.Cmd) (probe.Result, string, error) { + p.RLock() + defer p.RUnlock() + return p.fakeExecProber.Probe(cmd) +}