mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-26 05:03:09 +00:00
Merge pull request #32234 from mwielgus/fake-watch-fix
Automatic merge from submit-queue WatcherDispatcher for federated controller tests This PR fixes a problem when 2 informers create a watch for the same resource using the same client. Previously only one informer would receive updates. cc: @quinton-hoole @wojtek-t @kubernetes/sig-cluster-federation quinton-hoole: To provide some more context to those doing cherrypicking, the bug that this PR fixes makes federated unit tests fail intermittently, and generally behave very poorly.
This commit is contained in:
commit
34012d7877
@ -17,6 +17,9 @@ limitations under the License.
|
|||||||
package testutil
|
package testutil
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"os"
|
||||||
|
"runtime/pprof"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1"
|
federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1"
|
||||||
@ -25,14 +28,110 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/client/testing/core"
|
"k8s.io/kubernetes/pkg/client/testing/core"
|
||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
"k8s.io/kubernetes/pkg/watch"
|
"k8s.io/kubernetes/pkg/watch"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// A structure that distributes eventes to multiple watchers.
|
||||||
|
type WatcherDispatcher struct {
|
||||||
|
sync.Mutex
|
||||||
|
watchers []*watch.FakeWatcher
|
||||||
|
eventsSoFar []*watch.Event
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wd *WatcherDispatcher) register(watcher *watch.FakeWatcher) {
|
||||||
|
wd.Lock()
|
||||||
|
defer wd.Unlock()
|
||||||
|
wd.watchers = append(wd.watchers, watcher)
|
||||||
|
for _, event := range wd.eventsSoFar {
|
||||||
|
go watcher.Action(event.Type, event.Object)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add sends an add event.
|
||||||
|
func (wd *WatcherDispatcher) Add(obj runtime.Object) {
|
||||||
|
wd.Lock()
|
||||||
|
defer wd.Unlock()
|
||||||
|
event := &watch.Event{
|
||||||
|
Type: watch.Added,
|
||||||
|
Object: obj,
|
||||||
|
}
|
||||||
|
wd.eventsSoFar = append(wd.eventsSoFar, event)
|
||||||
|
for _, watcher := range wd.watchers {
|
||||||
|
go watcher.Add(obj)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Modify sends a modify event.
|
||||||
|
func (wd *WatcherDispatcher) Modify(obj runtime.Object) {
|
||||||
|
wd.Lock()
|
||||||
|
defer wd.Unlock()
|
||||||
|
event := &watch.Event{
|
||||||
|
Type: watch.Modified,
|
||||||
|
Object: obj,
|
||||||
|
}
|
||||||
|
wd.eventsSoFar = append(wd.eventsSoFar, event)
|
||||||
|
for _, watcher := range wd.watchers {
|
||||||
|
go watcher.Modify(obj)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete sends a delete event.
|
||||||
|
func (wd *WatcherDispatcher) Delete(lastValue runtime.Object) {
|
||||||
|
wd.Lock()
|
||||||
|
defer wd.Unlock()
|
||||||
|
event := &watch.Event{
|
||||||
|
Type: watch.Deleted,
|
||||||
|
Object: lastValue,
|
||||||
|
}
|
||||||
|
wd.eventsSoFar = append(wd.eventsSoFar, event)
|
||||||
|
for _, watcher := range wd.watchers {
|
||||||
|
go watcher.Delete(lastValue)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error sends an Error event.
|
||||||
|
func (wd *WatcherDispatcher) Error(errValue runtime.Object) {
|
||||||
|
wd.Lock()
|
||||||
|
defer wd.Unlock()
|
||||||
|
event := &watch.Event{
|
||||||
|
Type: watch.Error,
|
||||||
|
Object: errValue,
|
||||||
|
}
|
||||||
|
wd.eventsSoFar = append(wd.eventsSoFar, event)
|
||||||
|
for _, watcher := range wd.watchers {
|
||||||
|
go watcher.Error(errValue)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Action sends an event of the requested type, for table-based testing.
|
||||||
|
func (wd *WatcherDispatcher) Action(action watch.EventType, obj runtime.Object) {
|
||||||
|
wd.Lock()
|
||||||
|
defer wd.Unlock()
|
||||||
|
event := &watch.Event{
|
||||||
|
Type: action,
|
||||||
|
Object: obj,
|
||||||
|
}
|
||||||
|
wd.eventsSoFar = append(wd.eventsSoFar, event)
|
||||||
|
for _, watcher := range wd.watchers {
|
||||||
|
go watcher.Action(action, obj)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// RegisterFakeWatch adds a new fake watcher for the specified resource in the given fake client.
|
// RegisterFakeWatch adds a new fake watcher for the specified resource in the given fake client.
|
||||||
// All subsequent requrest for watch on the client will result in returning this fake watcher.
|
// All subsequent requests for a watch on the client will result in returning this fake watcher.
|
||||||
func RegisterFakeWatch(resource string, client *core.Fake) *watch.FakeWatcher {
|
func RegisterFakeWatch(resource string, client *core.Fake) *WatcherDispatcher {
|
||||||
|
dispatcher := &WatcherDispatcher{
|
||||||
|
watchers: make([]*watch.FakeWatcher, 0),
|
||||||
|
eventsSoFar: make([]*watch.Event, 0),
|
||||||
|
}
|
||||||
|
|
||||||
|
client.AddWatchReactor(resource, func(action core.Action) (bool, watch.Interface, error) {
|
||||||
watcher := watch.NewFake()
|
watcher := watch.NewFake()
|
||||||
client.AddWatchReactor(resource, func(action core.Action) (bool, watch.Interface, error) { return true, watcher, nil })
|
dispatcher.register(watcher)
|
||||||
return watcher
|
return true, watcher, nil
|
||||||
|
})
|
||||||
|
return dispatcher
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegisterFakeList registers a list response for the specified resource inside the given fake client.
|
// RegisterFakeList registers a list response for the specified resource inside the given fake client.
|
||||||
@ -43,10 +142,10 @@ func RegisterFakeList(resource string, client *core.Fake, obj runtime.Object) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegisterFakeCopyOnCreate register a reactor in the given fake client that passes
|
// RegisterFakeCopyOnCreate registers a reactor in the given fake client that passes
|
||||||
// all created object to the given watcher and also copies them to a channel for
|
// all created objects to the given watcher and also copies them to a channel for
|
||||||
// in-test inspection.
|
// in-test inspection.
|
||||||
func RegisterFakeCopyOnCreate(resource string, client *core.Fake, watcher *watch.FakeWatcher) chan runtime.Object {
|
func RegisterFakeCopyOnCreate(resource string, client *core.Fake, watcher *WatcherDispatcher) chan runtime.Object {
|
||||||
objChan := make(chan runtime.Object, 100)
|
objChan := make(chan runtime.Object, 100)
|
||||||
client.AddReactor("create", resource, func(action core.Action) (bool, runtime.Object, error) {
|
client.AddReactor("create", resource, func(action core.Action) (bool, runtime.Object, error) {
|
||||||
createAction := action.(core.CreateAction)
|
createAction := action.(core.CreateAction)
|
||||||
@ -60,15 +159,22 @@ func RegisterFakeCopyOnCreate(resource string, client *core.Fake, watcher *watch
|
|||||||
return objChan
|
return objChan
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegisterFakeCopyOnCreate register a reactor in the given fake client that passes
|
// RegisterFakeCopyOnCreate registers a reactor in the given fake client that passes
|
||||||
// all updated object to the given watcher and also copies them to a channel for
|
// all updated objects to the given watcher and also copies them to a channel for
|
||||||
// in-test inspection.
|
// in-test inspection.
|
||||||
func RegisterFakeCopyOnUpdate(resource string, client *core.Fake, watcher *watch.FakeWatcher) chan runtime.Object {
|
func RegisterFakeCopyOnUpdate(resource string, client *core.Fake, watcher *WatcherDispatcher) chan runtime.Object {
|
||||||
objChan := make(chan runtime.Object, 100)
|
objChan := make(chan runtime.Object, 100)
|
||||||
client.AddReactor("update", resource, func(action core.Action) (bool, runtime.Object, error) {
|
client.AddReactor("update", resource, func(action core.Action) (bool, runtime.Object, error) {
|
||||||
updateAction := action.(core.UpdateAction)
|
updateAction := action.(core.UpdateAction)
|
||||||
obj := updateAction.GetObject()
|
obj := updateAction.GetObject()
|
||||||
go func() {
|
go func() {
|
||||||
|
glog.V(4).Infof("Object updated. Writing to channel: %v", obj)
|
||||||
|
defer func() {
|
||||||
|
// Sometimes the channel is already closed.
|
||||||
|
if panicVal := recover(); panicVal != nil {
|
||||||
|
glog.Errorf("Recovering from panic: %v", panicVal)
|
||||||
|
}
|
||||||
|
}()
|
||||||
watcher.Modify(obj)
|
watcher.Modify(obj)
|
||||||
objChan <- obj
|
objChan <- obj
|
||||||
}()
|
}()
|
||||||
@ -83,7 +189,8 @@ func GetObjectFromChan(c chan runtime.Object) runtime.Object {
|
|||||||
select {
|
select {
|
||||||
case obj := <-c:
|
case obj := <-c:
|
||||||
return obj
|
return obj
|
||||||
case <-time.After(time.Minute):
|
case <-time.After(10 * time.Second):
|
||||||
|
pprof.Lookup("goroutine").WriteTo(os.Stderr, 1)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -93,11 +200,12 @@ func ToFederatedInformerForTestOnly(informer util.FederatedInformer) util.Federa
|
|||||||
return inter.(util.FederatedInformerForTestOnly)
|
return inter.(util.FederatedInformerForTestOnly)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewCluster build a new cluster object.
|
// NewCluster builds a new cluster object.
|
||||||
func NewCluster(name string, readyStatus api_v1.ConditionStatus) *federation_api.Cluster {
|
func NewCluster(name string, readyStatus api_v1.ConditionStatus) *federation_api.Cluster {
|
||||||
return &federation_api.Cluster{
|
return &federation_api.Cluster{
|
||||||
ObjectMeta: api_v1.ObjectMeta{
|
ObjectMeta: api_v1.ObjectMeta{
|
||||||
Name: name,
|
Name: name,
|
||||||
|
Annotations: map[string]string{},
|
||||||
},
|
},
|
||||||
Status: federation_api.ClusterStatus{
|
Status: federation_api.ClusterStatus{
|
||||||
Conditions: []federation_api.ClusterCondition{
|
Conditions: []federation_api.ClusterCondition{
|
||||||
|
Loading…
Reference in New Issue
Block a user