mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-19 08:40:42 +00:00
Fix race in EtcdWatcher
This commit is contained in:
parent
837a070d2f
commit
6297232112
@ -144,23 +144,42 @@ func (w *etcdWatcher) etcdWatch(ctx context.Context, client etcd.KeysAPI, key st
|
||||
defer util.HandleCrash()
|
||||
defer close(w.etcdError)
|
||||
defer close(w.etcdIncoming)
|
||||
if resourceVersion == 0 {
|
||||
latest, err := etcdGetInitialWatchState(ctx, client, key, w.list, w.etcdIncoming)
|
||||
if err != nil {
|
||||
w.etcdError <- err
|
||||
return
|
||||
}
|
||||
resourceVersion = latest
|
||||
}
|
||||
|
||||
opts := etcd.WatcherOptions{
|
||||
Recursive: w.list,
|
||||
AfterIndex: resourceVersion,
|
||||
// We need to be prepared, that Stop() can be called at any time.
|
||||
// It can potentially also be called, even before this function is called.
|
||||
// If that is the case, we simply skip all the code here.
|
||||
// See #18928 for more details.
|
||||
var watcher etcd.Watcher
|
||||
returned := func() bool {
|
||||
w.stopLock.Lock()
|
||||
defer w.stopLock.Unlock()
|
||||
if w.stopped {
|
||||
// Watcher has already been stopped - don't event initiate it here.
|
||||
return true
|
||||
}
|
||||
// Perform initialization of watcher under lock - we want to avoid situation when
|
||||
// Stop() is called in the meantime (which in tests can cause etcd termination and
|
||||
// strange behavior here).
|
||||
if resourceVersion == 0 {
|
||||
latest, err := etcdGetInitialWatchState(ctx, client, key, w.list, w.etcdIncoming)
|
||||
if err != nil {
|
||||
w.etcdError <- err
|
||||
return true
|
||||
}
|
||||
resourceVersion = latest
|
||||
}
|
||||
|
||||
opts := etcd.WatcherOptions{
|
||||
Recursive: w.list,
|
||||
AfterIndex: resourceVersion,
|
||||
}
|
||||
watcher = client.Watcher(key, &opts)
|
||||
w.ctx, w.cancel = context.WithCancel(ctx)
|
||||
return false
|
||||
}()
|
||||
if returned {
|
||||
return
|
||||
}
|
||||
watcher := client.Watcher(key, &opts)
|
||||
w.stopLock.Lock()
|
||||
w.ctx, w.cancel = context.WithCancel(ctx)
|
||||
w.stopLock.Unlock()
|
||||
|
||||
for {
|
||||
resp, err := watcher.Next(w.ctx)
|
||||
|
Loading…
Reference in New Issue
Block a user