Ensure proper serialization of updates and creates in federation test watcher

This commit is contained in:
Marcin Wielgus 2016-11-10 23:23:24 +01:00
parent abd653bd97
commit 3540846fbe

View File

@ -39,8 +39,10 @@ import (
// A structure that distributes eventes to multiple watchers. // A structure that distributes eventes to multiple watchers.
type WatcherDispatcher struct { type WatcherDispatcher struct {
sync.Mutex sync.Mutex
watchers []*watch.RaceFreeFakeWatcher watchers []*watch.RaceFreeFakeWatcher
eventsSoFar []*watch.Event eventsSoFar []*watch.Event
orderExecution chan func()
stopChan chan struct{}
} }
func (wd *WatcherDispatcher) register(watcher *watch.RaceFreeFakeWatcher) { func (wd *WatcherDispatcher) register(watcher *watch.RaceFreeFakeWatcher) {
@ -55,6 +57,7 @@ func (wd *WatcherDispatcher) register(watcher *watch.RaceFreeFakeWatcher) {
func (wd *WatcherDispatcher) Stop() { func (wd *WatcherDispatcher) Stop() {
wd.Lock() wd.Lock()
defer wd.Unlock() defer wd.Unlock()
close(wd.stopChan)
for _, watcher := range wd.watchers { for _, watcher := range wd.watchers {
watcher.Stop() watcher.Stop()
} }
@ -136,9 +139,21 @@ func (wd *WatcherDispatcher) Action(action watch.EventType, obj runtime.Object)
// All subsequent requests for a watch on the client will result in returning this fake watcher. // All subsequent requests for a watch on the client will result in returning this fake watcher.
func RegisterFakeWatch(resource string, client *core.Fake) *WatcherDispatcher { func RegisterFakeWatch(resource string, client *core.Fake) *WatcherDispatcher {
dispatcher := &WatcherDispatcher{ dispatcher := &WatcherDispatcher{
watchers: make([]*watch.RaceFreeFakeWatcher, 0), watchers: make([]*watch.RaceFreeFakeWatcher, 0),
eventsSoFar: make([]*watch.Event, 0), eventsSoFar: make([]*watch.Event, 0),
orderExecution: make(chan func()),
stopChan: make(chan struct{}),
} }
go func() {
for {
select {
case fun := <-dispatcher.orderExecution:
fun()
case <-dispatcher.stopChan:
return
}
}
}()
client.AddWatchReactor(resource, func(action core.Action) (bool, watch.Interface, error) { client.AddWatchReactor(resource, func(action core.Action) (bool, watch.Interface, error) {
watcher := watch.NewRaceFreeFake() watcher := watch.NewRaceFreeFake()
@ -166,11 +181,11 @@ func RegisterFakeCopyOnCreate(resource string, client *core.Fake, watcher *Watch
originalObj := createAction.GetObject() originalObj := createAction.GetObject()
// Create a copy of the object here to prevent data races while reading the object in go routine. // Create a copy of the object here to prevent data races while reading the object in go routine.
obj := copy(originalObj) obj := copy(originalObj)
go func() { watcher.orderExecution <- func() {
glog.V(4).Infof("Object created. Writing to channel: %v", obj) glog.V(4).Infof("Object created. Writing to channel: %v", obj)
watcher.Add(obj) watcher.Add(obj)
objChan <- obj objChan <- obj
}() }
return true, originalObj, nil return true, originalObj, nil
}) })
return objChan return objChan
@ -186,11 +201,11 @@ func RegisterFakeCopyOnUpdate(resource string, client *core.Fake, watcher *Watch
originalObj := updateAction.GetObject() originalObj := updateAction.GetObject()
// Create a copy of the object here to prevent data races while reading the object in go routine. // Create a copy of the object here to prevent data races while reading the object in go routine.
obj := copy(originalObj) obj := copy(originalObj)
go func() { watcher.orderExecution <- func() {
glog.V(4).Infof("Object updated. Writing to channel: %v", obj) glog.V(4).Infof("Object updated. Writing to channel: %v", obj)
watcher.Modify(obj) watcher.Modify(obj)
objChan <- obj objChan <- obj
}() }
return true, originalObj, nil return true, originalObj, nil
}) })
return objChan return objChan