diff --git a/federation/pkg/federation-controller/util/test/test_helper.go b/federation/pkg/federation-controller/util/test/test_helper.go index c4ce317d8b8..e4f033f8ae2 100644 --- a/federation/pkg/federation-controller/util/test/test_helper.go +++ b/federation/pkg/federation-controller/util/test/test_helper.go @@ -17,6 +17,9 @@ limitations under the License. package testutil import ( + "os" + "runtime/pprof" + "sync" "time" federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1" @@ -25,14 +28,110 @@ import ( "k8s.io/kubernetes/pkg/client/testing/core" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/watch" + + "github.com/golang/glog" ) +// A structure that distributes eventes to multiple watchers. +type WatcherDispatcher struct { + sync.Mutex + watchers []*watch.FakeWatcher + eventsSoFar []*watch.Event +} + +func (wd *WatcherDispatcher) register(watcher *watch.FakeWatcher) { + wd.Lock() + defer wd.Unlock() + wd.watchers = append(wd.watchers, watcher) + for _, event := range wd.eventsSoFar { + go watcher.Action(event.Type, event.Object) + } +} + +// Add sends an add event. +func (wd *WatcherDispatcher) Add(obj runtime.Object) { + wd.Lock() + defer wd.Unlock() + event := &watch.Event{ + Type: watch.Added, + Object: obj, + } + wd.eventsSoFar = append(wd.eventsSoFar, event) + for _, watcher := range wd.watchers { + go watcher.Add(obj) + } +} + +// Modify sends a modify event. +func (wd *WatcherDispatcher) Modify(obj runtime.Object) { + wd.Lock() + defer wd.Unlock() + event := &watch.Event{ + Type: watch.Modified, + Object: obj, + } + wd.eventsSoFar = append(wd.eventsSoFar, event) + for _, watcher := range wd.watchers { + go watcher.Modify(obj) + } +} + +// Delete sends a delete event. +func (wd *WatcherDispatcher) Delete(lastValue runtime.Object) { + wd.Lock() + defer wd.Unlock() + event := &watch.Event{ + Type: watch.Deleted, + Object: lastValue, + } + wd.eventsSoFar = append(wd.eventsSoFar, event) + for _, watcher := range wd.watchers { + go watcher.Delete(lastValue) + } +} + +// Error sends an Error event. +func (wd *WatcherDispatcher) Error(errValue runtime.Object) { + wd.Lock() + defer wd.Unlock() + event := &watch.Event{ + Type: watch.Error, + Object: errValue, + } + wd.eventsSoFar = append(wd.eventsSoFar, event) + for _, watcher := range wd.watchers { + go watcher.Error(errValue) + } +} + +// Action sends an event of the requested type, for table-based testing. +func (wd *WatcherDispatcher) Action(action watch.EventType, obj runtime.Object) { + wd.Lock() + defer wd.Unlock() + event := &watch.Event{ + Type: action, + Object: obj, + } + wd.eventsSoFar = append(wd.eventsSoFar, event) + for _, watcher := range wd.watchers { + go watcher.Action(action, obj) + } +} + // RegisterFakeWatch adds a new fake watcher for the specified resource in the given fake client. -// All subsequent requrest for watch on the client will result in returning this fake watcher. -func RegisterFakeWatch(resource string, client *core.Fake) *watch.FakeWatcher { - watcher := watch.NewFake() - client.AddWatchReactor(resource, func(action core.Action) (bool, watch.Interface, error) { return true, watcher, nil }) - return watcher +// All subsequent requests for a watch on the client will result in returning this fake watcher. +func RegisterFakeWatch(resource string, client *core.Fake) *WatcherDispatcher { + dispatcher := &WatcherDispatcher{ + watchers: make([]*watch.FakeWatcher, 0), + eventsSoFar: make([]*watch.Event, 0), + } + + client.AddWatchReactor(resource, func(action core.Action) (bool, watch.Interface, error) { + watcher := watch.NewFake() + dispatcher.register(watcher) + return true, watcher, nil + }) + return dispatcher } // RegisterFakeList registers a list response for the specified resource inside the given fake client. @@ -43,10 +142,10 @@ func RegisterFakeList(resource string, client *core.Fake, obj runtime.Object) { }) } -// RegisterFakeCopyOnCreate register a reactor in the given fake client that passes -// all created object to the given watcher and also copies them to a channel for +// RegisterFakeCopyOnCreate registers a reactor in the given fake client that passes +// all created objects to the given watcher and also copies them to a channel for // in-test inspection. -func RegisterFakeCopyOnCreate(resource string, client *core.Fake, watcher *watch.FakeWatcher) chan runtime.Object { +func RegisterFakeCopyOnCreate(resource string, client *core.Fake, watcher *WatcherDispatcher) chan runtime.Object { objChan := make(chan runtime.Object, 100) client.AddReactor("create", resource, func(action core.Action) (bool, runtime.Object, error) { createAction := action.(core.CreateAction) @@ -60,15 +159,22 @@ func RegisterFakeCopyOnCreate(resource string, client *core.Fake, watcher *watch return objChan } -// RegisterFakeCopyOnCreate register a reactor in the given fake client that passes -// all updated object to the given watcher and also copies them to a channel for +// RegisterFakeCopyOnCreate registers a reactor in the given fake client that passes +// all updated objects to the given watcher and also copies them to a channel for // in-test inspection. -func RegisterFakeCopyOnUpdate(resource string, client *core.Fake, watcher *watch.FakeWatcher) chan runtime.Object { +func RegisterFakeCopyOnUpdate(resource string, client *core.Fake, watcher *WatcherDispatcher) chan runtime.Object { objChan := make(chan runtime.Object, 100) client.AddReactor("update", resource, func(action core.Action) (bool, runtime.Object, error) { updateAction := action.(core.UpdateAction) obj := updateAction.GetObject() go func() { + glog.V(4).Infof("Object updated. Writing to channel: %v", obj) + defer func() { + // Sometimes the channel is already closed. + if panicVal := recover(); panicVal != nil { + glog.Errorf("Recovering from panic: %v", panicVal) + } + }() watcher.Modify(obj) objChan <- obj }() @@ -83,7 +189,8 @@ func GetObjectFromChan(c chan runtime.Object) runtime.Object { select { case obj := <-c: return obj - case <-time.After(time.Minute): + case <-time.After(10 * time.Second): + pprof.Lookup("goroutine").WriteTo(os.Stderr, 1) return nil } } @@ -93,11 +200,12 @@ func ToFederatedInformerForTestOnly(informer util.FederatedInformer) util.Federa return inter.(util.FederatedInformerForTestOnly) } -// NewCluster build a new cluster object. +// NewCluster builds a new cluster object. func NewCluster(name string, readyStatus api_v1.ConditionStatus) *federation_api.Cluster { return &federation_api.Cluster{ ObjectMeta: api_v1.ObjectMeta{ - Name: name, + Name: name, + Annotations: map[string]string{}, }, Status: federation_api.ClusterStatus{ Conditions: []federation_api.ClusterCondition{