mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 05:27:21 +00:00
fix data races in controller
This commit is contained in:
parent
bf44347340
commit
e13e31866d
@ -18,7 +18,6 @@ package tools
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/coreos/go-etcd/etcd"
|
"github.com/coreos/go-etcd/etcd"
|
||||||
@ -30,8 +29,7 @@ type EtcdResponseWithError struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type FakeEtcdClient struct {
|
type FakeEtcdClient struct {
|
||||||
condWatchCompleted *sync.Cond
|
watchCompletedChan chan bool
|
||||||
condLock sync.Mutex
|
|
||||||
|
|
||||||
Data map[string]EtcdResponseWithError
|
Data map[string]EtcdResponseWithError
|
||||||
DeletedKeys []string
|
DeletedKeys []string
|
||||||
@ -59,12 +57,11 @@ func MakeFakeEtcdClient(t *testing.T) *FakeEtcdClient {
|
|||||||
// They are only available when Watch() is called. If users of
|
// They are only available when Watch() is called. If users of
|
||||||
// FakeEtcdClient want to use any of these channels, they have to call
|
// FakeEtcdClient want to use any of these channels, they have to call
|
||||||
// WaitForWatchCompletion before any operation on these channels.
|
// WaitForWatchCompletion before any operation on these channels.
|
||||||
// Internally, FakeEtcdClient use condWatchCompleted to indicate if the
|
// Internally, FakeEtcdClient use watchCompletedChan to indicate if the
|
||||||
// Watch() method has been called. WaitForWatchCompletion() will wait
|
// Watch() method has been called. WaitForWatchCompletion() will wait
|
||||||
// on condWatchCompleted. By the end of the Watch() method, it will
|
// on this channel. WaitForWatchCompletion() will return only when
|
||||||
// call Broadcast() on condWatchCompleted, which will awake any
|
// WatchResponse, WatchInjectError and WatchStop are ready to read/write.
|
||||||
// goroutine waiting on this condition.
|
ret.watchCompletedChan = make(chan bool)
|
||||||
ret.condWatchCompleted = sync.NewCond(&ret.condLock)
|
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -116,9 +113,7 @@ func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, err
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (f *FakeEtcdClient) WaitForWatchCompletion() {
|
func (f *FakeEtcdClient) WaitForWatchCompletion() {
|
||||||
f.condLock.Lock()
|
<-f.watchCompletedChan
|
||||||
defer f.condLock.Unlock()
|
|
||||||
f.condWatchCompleted.Wait()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) {
|
func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) {
|
||||||
@ -129,8 +124,7 @@ func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool,
|
|||||||
defer close(injectedError)
|
defer close(injectedError)
|
||||||
f.WatchInjectError = injectedError
|
f.WatchInjectError = injectedError
|
||||||
|
|
||||||
f.condWatchCompleted.Broadcast()
|
f.watchCompletedChan <- true
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-stop:
|
case <-stop:
|
||||||
return nil, etcd.ErrWatchStoppedByUser
|
return nil, etcd.ErrWatchStoppedByUser
|
||||||
|
Loading…
Reference in New Issue
Block a user