mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-14 21:53:52 +00:00
Merge pull request #19006 from wojtek-t/wait_on_stop_watch
Fix race in watch tests - attempt 3
This commit is contained in:
@@ -94,6 +94,8 @@ type etcdWatcher struct {
|
|||||||
userStop chan struct{}
|
userStop chan struct{}
|
||||||
stopped bool
|
stopped bool
|
||||||
stopLock sync.Mutex
|
stopLock sync.Mutex
|
||||||
|
// wg is used to avoid calls to etcd after Stop()
|
||||||
|
wg sync.WaitGroup
|
||||||
|
|
||||||
// Injectable for testing. Send the event down the outgoing channel.
|
// Injectable for testing. Send the event down the outgoing channel.
|
||||||
emit func(watch.Event)
|
emit func(watch.Event)
|
||||||
@@ -129,6 +131,7 @@ func newEtcdWatcher(list bool, include includeFunc, filter storage.FilterFunc, e
|
|||||||
outgoing: make(chan watch.Event),
|
outgoing: make(chan watch.Event),
|
||||||
userStop: make(chan struct{}),
|
userStop: make(chan struct{}),
|
||||||
stopped: false,
|
stopped: false,
|
||||||
|
wg: sync.WaitGroup{},
|
||||||
cache: cache,
|
cache: cache,
|
||||||
ctx: nil,
|
ctx: nil,
|
||||||
cancel: nil,
|
cancel: nil,
|
||||||
@@ -145,6 +148,11 @@ func (w *etcdWatcher) etcdWatch(ctx context.Context, client etcd.KeysAPI, key st
|
|||||||
defer close(w.etcdError)
|
defer close(w.etcdError)
|
||||||
defer close(w.etcdIncoming)
|
defer close(w.etcdIncoming)
|
||||||
|
|
||||||
|
// All calls to etcd are coming from this function - once it is finished
|
||||||
|
// no other call to etcd should be generated by this watcher.
|
||||||
|
w.wg.Add(1)
|
||||||
|
defer w.wg.Done()
|
||||||
|
|
||||||
// We need to be prepared, that Stop() can be called at any time.
|
// We need to be prepared, that Stop() can be called at any time.
|
||||||
// It can potentially also be called, even before this function is called.
|
// It can potentially also be called, even before this function is called.
|
||||||
// If that is the case, we simply skip all the code here.
|
// If that is the case, we simply skip all the code here.
|
||||||
@@ -456,4 +464,7 @@ func (w *etcdWatcher) Stop() {
|
|||||||
w.stopped = true
|
w.stopped = true
|
||||||
close(w.userStop)
|
close(w.userStop)
|
||||||
}
|
}
|
||||||
|
// Wait until all calls to etcd are finished and no other
|
||||||
|
// will be issued.
|
||||||
|
w.wg.Wait()
|
||||||
}
|
}
|
||||||
|
@@ -246,7 +246,7 @@ func TestWatch(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
defer watching.Stop()
|
// watching is explicitly closed below.
|
||||||
|
|
||||||
// Test normal case
|
// Test normal case
|
||||||
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
||||||
@@ -327,8 +327,6 @@ func TestWatchEtcdState(t *testing.T) {
|
|||||||
if e, a := endpoint, event.Object; !api.Semantic.DeepDerivative(e, a) {
|
if e, a := endpoint, event.Object; !api.Semantic.DeepDerivative(e, a) {
|
||||||
t.Errorf("%s: expected %v, got %v", e, a)
|
t.Errorf("%s: expected %v, got %v", e, a)
|
||||||
}
|
}
|
||||||
|
|
||||||
watching.Stop()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWatchFromZeroIndex(t *testing.T) {
|
func TestWatchFromZeroIndex(t *testing.T) {
|
||||||
@@ -379,8 +377,6 @@ func TestWatchFromZeroIndex(t *testing.T) {
|
|||||||
if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) {
|
if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) {
|
||||||
t.Errorf("%s: expected %v, got %v", e, a)
|
t.Errorf("%s: expected %v, got %v", e, a)
|
||||||
}
|
}
|
||||||
|
|
||||||
watching.Stop()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWatchListFromZeroIndex(t *testing.T) {
|
func TestWatchListFromZeroIndex(t *testing.T) {
|
||||||
@@ -411,8 +407,6 @@ func TestWatchListFromZeroIndex(t *testing.T) {
|
|||||||
if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) {
|
if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) {
|
||||||
t.Errorf("%s: expected %v, got %v", e, a)
|
t.Errorf("%s: expected %v, got %v", e, a)
|
||||||
}
|
}
|
||||||
|
|
||||||
watching.Stop()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWatchListIgnoresRootKey(t *testing.T) {
|
func TestWatchListIgnoresRootKey(t *testing.T) {
|
||||||
@@ -444,8 +438,6 @@ func TestWatchListIgnoresRootKey(t *testing.T) {
|
|||||||
default:
|
default:
|
||||||
// fall through, expected behavior
|
// fall through, expected behavior
|
||||||
}
|
}
|
||||||
|
|
||||||
watching.Stop()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWatchPurposefulShutdown(t *testing.T) {
|
func TestWatchPurposefulShutdown(t *testing.T) {
|
||||||
|
Reference in New Issue
Block a user