Merge pull request #18729 from timstclair/worker-race

Concurrency fixes for prober tests
Wojciech Tyczynski 2015-12-18 16:15:13 +01:00
commit bf73d1741e
3 changed files with 66 additions and 3 deletions


@@ -223,6 +223,13 @@ func (m *manager) removeWorker(podUID types.UID, containerName string, probeType
delete(m.workers, probeKey{podUID, containerName, probeType})
}
// workerCount returns the total number of probe workers. For testing.
func (m *manager) workerCount() int {
m.workerLock.Lock()
defer m.workerLock.Unlock()
return len(m.workers)
}
func (m *manager) updateReadiness() {
update := <-m.readinessManager.Updates()
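
The new accessor takes workerLock because probe workers add and remove themselves from m.workers on their own goroutines, so an unsynchronized len(m.workers) from a test goroutine is a data race under go test -race. A minimal standalone sketch of the same pattern (illustrative names, not part of this package), with a locked count() being read while other goroutines mutate the map:

package main

import (
    "fmt"
    "sync"
)

// registry stands in for the manager's workers map plus workerLock.
type registry struct {
    mu      sync.Mutex
    workers map[string]struct{}
}

func (r *registry) add(key string) {
    r.mu.Lock()
    defer r.mu.Unlock()
    r.workers[key] = struct{}{}
}

func (r *registry) remove(key string) {
    r.mu.Lock()
    defer r.mu.Unlock()
    delete(r.workers, key)
}

// count mirrors workerCount: the read happens under the same lock as the writes.
func (r *registry) count() int {
    r.mu.Lock()
    defer r.mu.Unlock()
    return len(r.workers)
}

func main() {
    r := &registry{workers: map[string]struct{}{}}
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            key := fmt.Sprintf("worker-%d", i)
            r.add(key)
            r.remove(key)
        }(i)
    }
    wg.Wait()
    fmt.Println("remaining workers:", r.count()) // 0 once every goroutine has finished
}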


@@ -78,6 +78,7 @@ func TestAddRemovePods(t *testing.T) {
}
m := newTestManager()
defer cleanup(t, m)
if err := expectProbes(m, nil); err != nil {
t.Error(err)
}
@@ -122,6 +123,7 @@ func TestAddRemovePods(t *testing.T) {
func TestCleanupPods(t *testing.T) {
m := newTestManager()
defer cleanup(t, m)
podToCleanup := api.Pod{
ObjectMeta: api.ObjectMeta{
UID: "pod_cleanup",
@@ -215,6 +217,8 @@ func TestUpdatePodStatus(t *testing.T) {
}
m := newTestManager()
// no cleanup: using fake workers.
// Setup probe "workers" and cached results.
m.workers = map[probeKey]*worker{
probeKey{testPodUID, unprobed.Name, liveness}: {},
@@ -251,7 +255,21 @@ func TestUpdatePodStatus(t *testing.T) {
func TestUpdateReadiness(t *testing.T) {
testPod := getTestPod(readiness, api.Probe{})
m := newTestManager()
m.Start()
defer cleanup(t, m)
// Start syncing readiness without leaking goroutine.
stopCh := make(chan struct{})
go util.Until(m.updateReadiness, 0, stopCh)
defer func() {
close(stopCh)
// Send an update to exit updateReadiness()
m.readinessManager.Set(kubecontainer.ContainerID{}, results.Success, &api.Pod{})
}()
exec := syncExecProber{}
exec.set(probe.Success, nil)
m.prober.exec = &exec
m.statusManager.SetPodStatus(&testPod, getTestRunningStatus())
m.AddPod(&testPod)
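
The stopCh block replaces the plain m.Start() call shown above: Start runs updateReadiness on a goroutine the test has no handle to stop, which is the leak the comment refers to. Because updateReadiness blocks on <-m.readinessManager.Updates(), closing stopCh alone is not enough; util.Until only re-checks the stop channel between iterations, so the deferred Set pushes one last update to unblock the pending receive. A rough standalone sketch of that shutdown pattern (illustrative names, not the kubelet API):

package main

import (
    "fmt"
    "time"
)

func main() {
    // Buffered so the final unblocking send can never block the test itself.
    updates := make(chan string, 2)
    stopCh := make(chan struct{})
    done := make(chan struct{})

    // Analogue of go util.Until(m.updateReadiness, 0, stopCh): loop until
    // stopCh is closed, blocking on the updates channel each iteration.
    go func() {
        defer close(done)
        for {
            select {
            case <-stopCh:
                return
            default:
            }
            fmt.Println("update:", <-updates) // blocks, like updateReadiness
        }
    }()

    updates <- "ready"
    time.Sleep(10 * time.Millisecond)

    close(stopCh)
    // Without this final send the goroutine could stay blocked on the
    // receive above and leak, even though stopCh is already closed.
    updates <- "shutdown"
    <-done
    fmt.Println("worker exited cleanly")
}
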
@@ -266,7 +284,7 @@ func TestUpdateReadiness(t *testing.T) {
}
// Prober fails.
m.prober.exec = fakeExecProber{probe.Failure, nil}
exec.set(probe.Failure, nil)
// Wait for failed status.
if err := waitForReadyStatus(m, false); err != nil {
@@ -300,7 +318,7 @@ outer:
return fmt.Errorf("Unexpected probes: %v; Missing probes: %v;", unexpected, missing)
}
const interval = 100 * time.Millisecond
const interval = 1 * time.Second
// Wait for the given workers to exit & clean up.
func waitForWorkerExit(m *manager, workerPaths []probeKey) error {
@@ -344,3 +362,22 @@ func waitForReadyStatus(m *manager, ready bool) error {
return nil
}
// cleanup running probes to avoid leaking goroutines.
func cleanup(t *testing.T, m *manager) {
m.CleanupPods(nil)
condition := func() (bool, error) {
workerCount := m.workerCount()
if workerCount > 0 {
glog.Infof("Waiting for %d workers to exit...", workerCount)
}
return workerCount == 0, nil
}
if exited, _ := condition(); exited {
return // Already exited, no need to poll.
}
if err := wait.Poll(interval, util.ForeverTestTimeout, condition); err != nil {
t.Fatalf("Error during cleanup: %v", err)
}
}
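
The explicit short-circuit before wait.Poll matters now that interval is one second: Poll does not evaluate the condition until the first interval has elapsed, so a test whose workers have already exited would otherwise stall for a second on every cleanup. A rough standalone approximation of that polling behavior (illustrative, not the real util/wait package):

package main

import (
    "errors"
    "fmt"
    "time"
)

// pollUntil approximates the behavior relied on above: the condition is
// evaluated for the first time only after one full interval has elapsed.
func pollUntil(interval, timeout time.Duration, condition func() (bool, error)) error {
    deadline := time.After(timeout)
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-deadline:
            return errors.New("timed out waiting for the condition")
        case <-ticker.C:
            done, err := condition()
            if err != nil || done {
                return err
            }
        }
    }
}

func main() {
    start := time.Now()
    err := pollUntil(100*time.Millisecond, 2*time.Second, func() (bool, error) {
        return time.Since(start) > 300*time.Millisecond, nil
    })
    fmt.Printf("condition met after %v (err=%v)\n", time.Since(start), err)
}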


@@ -18,6 +18,7 @@ package prober
import (
"reflect"
"sync"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
@@ -120,3 +121,21 @@ type fakeExecProber struct {
func (p fakeExecProber) Probe(_ exec.Cmd) (probe.Result, string, error) {
return p.result, "", p.err
}
type syncExecProber struct {
sync.RWMutex
fakeExecProber
}
func (p *syncExecProber) set(result probe.Result, err error) {
p.Lock()
defer p.Unlock()
p.result = result
p.err = err
}
func (p *syncExecProber) Probe(cmd exec.Cmd) (probe.Result, string, error) {
p.RLock()
defer p.RUnlock()
return p.fakeExecProber.Probe(cmd)
}