diff --git a/pkg/kubelet/prober/manager.go b/pkg/kubelet/prober/manager.go
index 4541c21d5c8..2282d8e6285 100644
--- a/pkg/kubelet/prober/manager.go
+++ b/pkg/kubelet/prober/manager.go
@@ -171,7 +171,7 @@ func (m *manager) RemovePod(pod *api.Pod) {
 		for _, probeType := range [...]probeType{readiness, liveness} {
 			key.probeType = probeType
 			if worker, ok := m.workers[key]; ok {
-				close(worker.stop)
+				worker.stop()
 			}
 		}
 	}
@@ -188,7 +188,7 @@ func (m *manager) CleanupPods(activePods []*api.Pod) {
 
 	for key, worker := range m.workers {
 		if _, ok := desiredPods[key.podUID]; !ok {
-			close(worker.stop)
+			worker.stop()
 		}
 	}
 }
diff --git a/pkg/kubelet/prober/manager_test.go b/pkg/kubelet/prober/manager_test.go
index f01ba50cf04..ac64843f0ff 100644
--- a/pkg/kubelet/prober/manager_test.go
+++ b/pkg/kubelet/prober/manager_test.go
@@ -18,6 +18,7 @@ package prober
 
 import (
 	"fmt"
+	"strconv"
 	"testing"
 	"time"
 
@@ -26,6 +27,7 @@ import (
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/prober/results"
 	"k8s.io/kubernetes/pkg/probe"
+	"k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/wait"
 )
@@ -173,6 +175,31 @@ func TestCleanupPods(t *testing.T) {
 	}
 }
 
+func TestCleanupRepeated(t *testing.T) {
+	m := newTestManager()
+	defer cleanup(t, m)
+	podTemplate := api.Pod{
+		Spec: api.PodSpec{
+			Containers: []api.Container{{
+				Name:           "prober1",
+				ReadinessProbe: defaultProbe,
+				LivenessProbe:  defaultProbe,
+			}},
+		},
+	}
+
+	const numTestPods = 100
+	for i := 0; i < numTestPods; i++ {
+		pod := podTemplate
+		pod.UID = types.UID(strconv.Itoa(i))
+		m.AddPod(&pod)
+	}
+
+	for i := 0; i < 10; i++ {
+		m.CleanupPods([]*api.Pod{})
+	}
+}
+
 func TestUpdatePodStatus(t *testing.T) {
 	unprobed := api.ContainerStatus{
 		Name: "unprobed_container",
diff --git a/pkg/kubelet/prober/worker.go b/pkg/kubelet/prober/worker.go
index 60f4c6d6747..c3d0f2241c4 100644
--- a/pkg/kubelet/prober/worker.go
+++ b/pkg/kubelet/prober/worker.go
@@ -32,8 +32,8 @@ import (
 // stop channel is closed. The worker uses the probe Manager's statusManager to get up-to-date
 // container IDs.
 type worker struct {
-	// Channel for stopping the probe, it should be closed to trigger a stop.
-	stop chan struct{}
+	// Channel for stopping the probe.
+	stopCh chan struct{}
 
 	// The pod containing this probe (read-only)
 	pod *api.Pod
@@ -70,7 +70,7 @@ func newWorker(
 	container api.Container) *worker {
 
 	w := &worker{
-		stop:      make(chan struct{}),
+		stopCh:    make(chan struct{}, 1), // Buffer so stop() can be non-blocking.
 		pod:       pod,
 		container: container,
 		probeType: probeType,
@@ -109,14 +109,23 @@ probeLoop:
 	for w.doProbe() {
 		// Wait for next probe tick.
 		select {
-		case <-w.stop:
+		case <-w.stopCh:
 			break probeLoop
 		case <-probeTicker.C:
 			// continue
 		}
 	}
 }
 
+// stop stops the probe worker. The worker handles cleanup and removes itself from its manager.
+// It is safe to call stop multiple times.
+func (w *worker) stop() {
+	select {
+	case w.stopCh <- struct{}{}:
+	default: // Non-blocking.
+	}
+}
+
 // doProbe probes the container once and records the result.
 // Returns whether the worker should continue.
 func (w *worker) doProbe() (keepGoing bool) {
diff --git a/pkg/kubelet/prober/worker_test.go b/pkg/kubelet/prober/worker_test.go
index 192bc30c0c5..26a87dd2dcf 100644
--- a/pkg/kubelet/prober/worker_test.go
+++ b/pkg/kubelet/prober/worker_test.go
@@ -236,7 +236,9 @@ func TestCleanUp(t *testing.T) {
 		}
 	}
 
-	close(w.stop)
+	for i := 0; i < 10; i++ {
+		w.stop() // Stop should be callable multiple times without consequence.
+	}
 	if err := waitForWorkerExit(m, []probeKey{key}); err != nil {
 		t.Fatalf("[%s] error waiting for worker exit: %v", probeType, err)
 	}
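
For context on the change above: closing an already-closed channel panics, so two racing stop paths (e.g. RemovePod and CleanupPods acting on the same worker) could crash with "close of closed channel". The patch replaces close() with a non-blocking send into a channel buffered to depth 1, which is safe to repeat. Below is a minimal, self-contained sketch of that pattern; it mirrors the patch's names (worker, stopCh, stop) but is a hypothetical standalone program, not code from the repository.

package main

import (
	"fmt"
	"time"
)

// worker mirrors the shape of the prober worker in the patch, reduced to
// the stop machinery only.
type worker struct {
	stopCh chan struct{}
}

func newWorker() *worker {
	// Buffer of 1: a single stop signal is retained even if the worker is
	// busy and not currently blocked in its select.
	return &worker{stopCh: make(chan struct{}, 1)}
}

// stop is safe to call any number of times; once the buffer holds a signal,
// further sends hit the default case and become no-ops. Compare close(),
// which panics on the second call.
func (w *worker) stop() {
	select {
	case w.stopCh <- struct{}{}:
	default: // Non-blocking.
	}
}

func (w *worker) run() {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-w.stopCh:
			fmt.Println("worker stopped")
			return
		case <-ticker.C:
			// A real worker would probe the container here.
		}
	}
}

func main() {
	w := newWorker()
	go w.run()
	for i := 0; i < 10; i++ {
		w.stop() // Repeated calls: no panic, unlike calling close(w.stopCh) twice.
	}
	time.Sleep(50 * time.Millisecond)
}

The buffer of one is what makes the drop-on-default safe: if the worker is mid-probe rather than parked in its select, the signal waits in the buffer instead of being lost, while any further stop() calls fall through the default case as no-ops.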