mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 05:57:25 +00:00
Merge pull request #36613 from mwielgus/watcher-fix-2
Automatic merge from submit-queue Ensure proper serialization of updates and creates in federation test watcher Fix for finalizer test problems. The issue there was that the updates were coming out of order. It was caused by firing updates in new goroutines in test watcher. The proper solution is to order them in a queue and fire in order on a single goroutine. Ref: https://github.com/kubernetes/kubernetes/issues/36473#issuecomment-259605870 cc: @nikhiljindal @madhusudancs
This commit is contained in:
commit
e4e3e41522
@ -39,8 +39,10 @@ import (
|
|||||||
// A structure that distributes eventes to multiple watchers.
|
// A structure that distributes eventes to multiple watchers.
|
||||||
type WatcherDispatcher struct {
|
type WatcherDispatcher struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
watchers []*watch.RaceFreeFakeWatcher
|
watchers []*watch.RaceFreeFakeWatcher
|
||||||
eventsSoFar []*watch.Event
|
eventsSoFar []*watch.Event
|
||||||
|
orderExecution chan func()
|
||||||
|
stopChan chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wd *WatcherDispatcher) register(watcher *watch.RaceFreeFakeWatcher) {
|
func (wd *WatcherDispatcher) register(watcher *watch.RaceFreeFakeWatcher) {
|
||||||
@ -55,6 +57,7 @@ func (wd *WatcherDispatcher) register(watcher *watch.RaceFreeFakeWatcher) {
|
|||||||
func (wd *WatcherDispatcher) Stop() {
|
func (wd *WatcherDispatcher) Stop() {
|
||||||
wd.Lock()
|
wd.Lock()
|
||||||
defer wd.Unlock()
|
defer wd.Unlock()
|
||||||
|
close(wd.stopChan)
|
||||||
for _, watcher := range wd.watchers {
|
for _, watcher := range wd.watchers {
|
||||||
watcher.Stop()
|
watcher.Stop()
|
||||||
}
|
}
|
||||||
@ -136,9 +139,21 @@ func (wd *WatcherDispatcher) Action(action watch.EventType, obj runtime.Object)
|
|||||||
// All subsequent requests for a watch on the client will result in returning this fake watcher.
|
// All subsequent requests for a watch on the client will result in returning this fake watcher.
|
||||||
func RegisterFakeWatch(resource string, client *core.Fake) *WatcherDispatcher {
|
func RegisterFakeWatch(resource string, client *core.Fake) *WatcherDispatcher {
|
||||||
dispatcher := &WatcherDispatcher{
|
dispatcher := &WatcherDispatcher{
|
||||||
watchers: make([]*watch.RaceFreeFakeWatcher, 0),
|
watchers: make([]*watch.RaceFreeFakeWatcher, 0),
|
||||||
eventsSoFar: make([]*watch.Event, 0),
|
eventsSoFar: make([]*watch.Event, 0),
|
||||||
|
orderExecution: make(chan func()),
|
||||||
|
stopChan: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case fun := <-dispatcher.orderExecution:
|
||||||
|
fun()
|
||||||
|
case <-dispatcher.stopChan:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
client.AddWatchReactor(resource, func(action core.Action) (bool, watch.Interface, error) {
|
client.AddWatchReactor(resource, func(action core.Action) (bool, watch.Interface, error) {
|
||||||
watcher := watch.NewRaceFreeFake()
|
watcher := watch.NewRaceFreeFake()
|
||||||
@ -166,11 +181,11 @@ func RegisterFakeCopyOnCreate(resource string, client *core.Fake, watcher *Watch
|
|||||||
originalObj := createAction.GetObject()
|
originalObj := createAction.GetObject()
|
||||||
// Create a copy of the object here to prevent data races while reading the object in go routine.
|
// Create a copy of the object here to prevent data races while reading the object in go routine.
|
||||||
obj := copy(originalObj)
|
obj := copy(originalObj)
|
||||||
go func() {
|
watcher.orderExecution <- func() {
|
||||||
glog.V(4).Infof("Object created. Writing to channel: %v", obj)
|
glog.V(4).Infof("Object created. Writing to channel: %v", obj)
|
||||||
watcher.Add(obj)
|
watcher.Add(obj)
|
||||||
objChan <- obj
|
objChan <- obj
|
||||||
}()
|
}
|
||||||
return true, originalObj, nil
|
return true, originalObj, nil
|
||||||
})
|
})
|
||||||
return objChan
|
return objChan
|
||||||
@ -186,11 +201,11 @@ func RegisterFakeCopyOnUpdate(resource string, client *core.Fake, watcher *Watch
|
|||||||
originalObj := updateAction.GetObject()
|
originalObj := updateAction.GetObject()
|
||||||
// Create a copy of the object here to prevent data races while reading the object in go routine.
|
// Create a copy of the object here to prevent data races while reading the object in go routine.
|
||||||
obj := copy(originalObj)
|
obj := copy(originalObj)
|
||||||
go func() {
|
watcher.orderExecution <- func() {
|
||||||
glog.V(4).Infof("Object updated. Writing to channel: %v", obj)
|
glog.V(4).Infof("Object updated. Writing to channel: %v", obj)
|
||||||
watcher.Modify(obj)
|
watcher.Modify(obj)
|
||||||
objChan <- obj
|
objChan <- obj
|
||||||
}()
|
}
|
||||||
return true, originalObj, nil
|
return true, originalObj, nil
|
||||||
})
|
})
|
||||||
return objChan
|
return objChan
|
||||||
|
Loading…
Reference in New Issue
Block a user