mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 20:53:33 +00:00
use wait group
This commit is contained in:
parent
62055090b4
commit
fa1fbe88c1
@ -30,10 +30,7 @@ type EtcdResponseWithError struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type FakeEtcdClient struct {
|
type FakeEtcdClient struct {
|
||||||
// chanLock guards watchChanReady.
|
nrUnreadyChannels sync.WaitGroup
|
||||||
chanLock sync.Mutex
|
|
||||||
// watchChanReady is readable when all public channels are ready
|
|
||||||
watchChanReady chan bool
|
|
||||||
|
|
||||||
Data map[string]EtcdResponseWithError
|
Data map[string]EtcdResponseWithError
|
||||||
DeletedKeys []string
|
DeletedKeys []string
|
||||||
@ -50,11 +47,17 @@ type FakeEtcdClient struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func MakeFakeEtcdClient(t *testing.T) *FakeEtcdClient {
|
func MakeFakeEtcdClient(t *testing.T) *FakeEtcdClient {
|
||||||
return &FakeEtcdClient{
|
ret := &FakeEtcdClient{
|
||||||
t: t,
|
t: t,
|
||||||
Data: map[string]EtcdResponseWithError{},
|
Data: map[string]EtcdResponseWithError{},
|
||||||
watchChanReady: make(chan bool),
|
// watchChanReady: make(chan bool),
|
||||||
}
|
}
|
||||||
|
// There are 3 channels that are not ready:
|
||||||
|
// - WatchResponse chan *etcd.Response
|
||||||
|
// - WatchInjectError chan<- error
|
||||||
|
// - WatchStop chan<- bool
|
||||||
|
ret.nrUnreadyChannels.Add(3)
|
||||||
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FakeEtcdClient) AddChild(key, data string, ttl uint64) (*etcd.Response, error) {
|
func (f *FakeEtcdClient) AddChild(key, data string, ttl uint64) (*etcd.Response, error) {
|
||||||
@ -105,36 +108,29 @@ func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, err
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (f *FakeEtcdClient) WaitToWatch() {
|
func (f *FakeEtcdClient) WaitToWatch() {
|
||||||
f.chanLock.Lock()
|
f.nrUnreadyChannels.Wait()
|
||||||
defer f.chanLock.Unlock()
|
|
||||||
<-f.watchChanReady
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) {
|
func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) {
|
||||||
f.WatchResponse = receiver
|
f.WatchResponse = receiver
|
||||||
|
f.nrUnreadyChannels.Done()
|
||||||
f.WatchStop = stop
|
f.WatchStop = stop
|
||||||
|
f.nrUnreadyChannels.Done()
|
||||||
injectedError := make(chan error)
|
injectedError := make(chan error)
|
||||||
|
|
||||||
defer close(injectedError)
|
defer close(injectedError)
|
||||||
f.WatchInjectError = injectedError
|
f.WatchInjectError = injectedError
|
||||||
|
f.nrUnreadyChannels.Done()
|
||||||
// close the channel indicating all channels of the fake client are
|
|
||||||
// ready.
|
|
||||||
close(f.watchChanReady)
|
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
f.chanLock.Lock()
|
// After calling this function, the WatchStop channel will not be ready
|
||||||
defer f.chanLock.Unlock()
|
f.nrUnreadyChannels.Add(3)
|
||||||
// A stop will make f.WatchStop close, which in turn makes it un-write-able.
|
|
||||||
// Need to have another call on Watch() to make it ready.
|
|
||||||
f.watchChanReady = make(chan bool)
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-stop:
|
case <-stop:
|
||||||
return nil, etcd.ErrWatchStoppedByUser
|
return nil, etcd.ErrWatchStoppedByUser
|
||||||
case err := <-injectedError:
|
case err := <-injectedError:
|
||||||
f.watchChanReady = make(chan bool)
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// Never get here.
|
// Never get here.
|
||||||
|
Loading…
Reference in New Issue
Block a user