Fix panic from multiple probe cleanup calls.

This commit is contained in:
Tim St. Clair 2016-02-05 13:36:08 -08:00
parent b32078d89b
commit da0d37f1e0
4 changed files with 45 additions and 7 deletions

View File

@ -171,7 +171,7 @@ func (m *manager) RemovePod(pod *api.Pod) {
for _, probeType := range [...]probeType{readiness, liveness} { for _, probeType := range [...]probeType{readiness, liveness} {
key.probeType = probeType key.probeType = probeType
if worker, ok := m.workers[key]; ok { if worker, ok := m.workers[key]; ok {
close(worker.stop) worker.stop()
} }
} }
} }
@ -188,7 +188,7 @@ func (m *manager) CleanupPods(activePods []*api.Pod) {
for key, worker := range m.workers { for key, worker := range m.workers {
if _, ok := desiredPods[key.podUID]; !ok { if _, ok := desiredPods[key.podUID]; !ok {
close(worker.stop) worker.stop()
} }
} }
} }

View File

@ -18,6 +18,7 @@ package prober
import ( import (
"fmt" "fmt"
"strconv"
"testing" "testing"
"time" "time"
@ -26,6 +27,7 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/probe" "k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
) )
@ -173,6 +175,31 @@ func TestCleanupPods(t *testing.T) {
} }
} }
// TestCleanupRepeated registers a batch of pods that each carry a readiness and a
// liveness probe, then invokes CleanupPods with an empty active set several times
// in a row. The test makes no explicit assertions: it passes as long as the
// repeated cleanup calls complete without panicking.
func TestCleanupRepeated(t *testing.T) {
	m := newTestManager()
	defer cleanup(t, m)

	// Every pod is stamped out from this one spec; only the UID differs.
	basePod := api.Pod{
		Spec: api.PodSpec{
			Containers: []api.Container{{
				Name:           "prober1",
				ReadinessProbe: defaultProbe,
				LivenessProbe:  defaultProbe,
			}},
		},
	}

	const podCount = 100
	for n := 0; n < podCount; n++ {
		// Copy the template so each AddPod call receives its own pod value.
		current := basePod
		current.UID = types.UID(strconv.Itoa(n))
		m.AddPod(&current)
	}

	// No pod is considered active, so every worker is asked to stop on each pass.
	for pass := 0; pass < 10; pass++ {
		m.CleanupPods([]*api.Pod{})
	}
}
func TestUpdatePodStatus(t *testing.T) { func TestUpdatePodStatus(t *testing.T) {
unprobed := api.ContainerStatus{ unprobed := api.ContainerStatus{
Name: "unprobed_container", Name: "unprobed_container",

View File

@ -32,8 +32,8 @@ import (
// stop channel is closed. The worker uses the probe Manager's statusManager to get up-to-date // stop channel is closed. The worker uses the probe Manager's statusManager to get up-to-date
// container IDs. // container IDs.
type worker struct { type worker struct {
// Channel for stopping the probe, it should be closed to trigger a stop. // Channel for stopping the probe.
stop chan struct{} stopCh chan struct{}
// The pod containing this probe (read-only) // The pod containing this probe (read-only)
pod *api.Pod pod *api.Pod
@ -70,7 +70,7 @@ func newWorker(
container api.Container) *worker { container api.Container) *worker {
w := &worker{ w := &worker{
stop: make(chan struct{}), stopCh: make(chan struct{}, 1), // Buffer so stop() can be non-blocking.
pod: pod, pod: pod,
container: container, container: container,
probeType: probeType, probeType: probeType,
@ -109,7 +109,7 @@ probeLoop:
for w.doProbe() { for w.doProbe() {
// Wait for next probe tick. // Wait for next probe tick.
select { select {
case <-w.stop: case <-w.stopCh:
break probeLoop break probeLoop
case <-probeTicker.C: case <-probeTicker.C:
// continue // continue
@ -117,6 +117,15 @@ probeLoop:
} }
} }
// stop asks the probe worker to shut down by posting a signal on stopCh. The worker
// handles cleanup and removes itself from its manager.
// Calling stop more than once is safe: the send is attempted without blocking, so a
// call that cannot deliver its signal immediately is a no-op.
func (w *worker) stop() {
	select {
	default:
		// The signal could not be sent right away (e.g. one is already pending); do nothing.
	case w.stopCh <- struct{}{}:
	}
}
// doProbe probes the container once and records the result. // doProbe probes the container once and records the result.
// Returns whether the worker should continue. // Returns whether the worker should continue.
func (w *worker) doProbe() (keepGoing bool) { func (w *worker) doProbe() (keepGoing bool) {

View File

@ -236,7 +236,9 @@ func TestCleanUp(t *testing.T) {
} }
} }
close(w.stop) for i := 0; i < 10; i++ {
w.stop() // Stop should be callable multiple times without consequence.
}
if err := waitForWorkerExit(m, []probeKey{key}); err != nil { if err := waitForWorkerExit(m, []probeKey{key}); err != nil {
t.Fatalf("[%s] error waiting for worker exit: %v", probeType, err) t.Fatalf("[%s] error waiting for worker exit: %v", probeType, err)
} }