From b4f7e67d251741c4f94bab9d3b636a459afa9b50 Mon Sep 17 00:00:00 2001 From: Angus Salkeld Date: Sat, 25 Jun 2016 10:15:27 +1000 Subject: [PATCH] Fix startup type error in initializeCaches The following error was getting logged: PersistentVolumeController can't initialize caches, expected list of volumes, got: &{TypeMeta:{Kind: APIVersion:} ListMeta:{SelfLink:/api/v1/persistentvolumes ResourceVersion:11} Items:[]} --- .../framework/fake_controller_source.go | 106 +++++++++++++++--- .../persistentvolume/controller_base.go | 8 +- .../persistentvolume/controller_test.go | 4 +- .../persistentvolume/framework_test.go | 10 +- 4 files changed, 101 insertions(+), 27 deletions(-) diff --git a/pkg/controller/framework/fake_controller_source.go b/pkg/controller/framework/fake_controller_source.go index bebacb531a6..9e90e7c916e 100644 --- a/pkg/controller/framework/fake_controller_source.go +++ b/pkg/controller/framework/fake_controller_source.go @@ -31,17 +31,41 @@ import ( func NewFakeControllerSource() *FakeControllerSource { return &FakeControllerSource{ - items: map[nnu]runtime.Object{}, - broadcaster: watch.NewBroadcaster(100, watch.WaitIfChannelFull), + Items: map[nnu]runtime.Object{}, + Broadcaster: watch.NewBroadcaster(100, watch.WaitIfChannelFull), } } +func NewFakePVControllerSource() *FakePVControllerSource { + return &FakePVControllerSource{ + FakeControllerSource{ + Items: map[nnu]runtime.Object{}, + Broadcaster: watch.NewBroadcaster(100, watch.WaitIfChannelFull), + }} +} + +func NewFakePVCControllerSource() *FakePVCControllerSource { + return &FakePVCControllerSource{ + FakeControllerSource{ + Items: map[nnu]runtime.Object{}, + Broadcaster: watch.NewBroadcaster(100, watch.WaitIfChannelFull), + }} +} + // FakeControllerSource implements listing/watching for testing. type FakeControllerSource struct { lock sync.RWMutex - items map[nnu]runtime.Object + Items map[nnu]runtime.Object changes []watch.Event // one change per resourceVersion - broadcaster *watch.Broadcaster + Broadcaster *watch.Broadcaster +} + +type FakePVControllerSource struct { + FakeControllerSource +} + +type FakePVCControllerSource struct { + FakeControllerSource } // namespace, name, uid to be used as a key. @@ -110,22 +134,19 @@ func (f *FakeControllerSource) Change(e watch.Event, watchProbability float64) { key := f.key(accessor) switch e.Type { case watch.Added, watch.Modified: - f.items[key] = e.Object + f.Items[key] = e.Object case watch.Deleted: - delete(f.items, key) + delete(f.Items, key) } if rand.Float64() < watchProbability { - f.broadcaster.Action(e.Type, e.Object) + f.Broadcaster.Action(e.Type, e.Object) } } -// List returns a list object, with its resource version set. -func (f *FakeControllerSource) List(options api.ListOptions) (runtime.Object, error) { - f.lock.RLock() - defer f.lock.RUnlock() - list := make([]runtime.Object, 0, len(f.items)) - for _, obj := range f.items { +func (f *FakeControllerSource) getListItemsLocked() ([]runtime.Object, error) { + list := make([]runtime.Object, 0, len(f.Items)) + for _, obj := range f.Items { // Must make a copy to allow clients to modify the object. // Otherwise, if they make a change and write it back, they // will inadvertently change our canonical copy (in @@ -136,6 +157,17 @@ func (f *FakeControllerSource) List(options api.ListOptions) (runtime.Object, er } list = append(list, objCopy.(runtime.Object)) } + return list, nil +} + +// List returns a list object, with its resource version set. +func (f *FakeControllerSource) List(options api.ListOptions) (runtime.Object, error) { + f.lock.RLock() + defer f.lock.RUnlock() + list, err := f.getListItemsLocked() + if err != nil { + return nil, err + } listObj := &api.List{} if err := meta.SetList(listObj, list); err != nil { return nil, err @@ -149,6 +181,48 @@ func (f *FakeControllerSource) List(options api.ListOptions) (runtime.Object, er return listObj, nil } +// List returns a list object, with its resource version set. +func (f *FakePVControllerSource) List(options api.ListOptions) (runtime.Object, error) { + f.lock.RLock() + defer f.lock.RUnlock() + list, err := f.FakeControllerSource.getListItemsLocked() + if err != nil { + return nil, err + } + listObj := &api.PersistentVolumeList{} + if err := meta.SetList(listObj, list); err != nil { + return nil, err + } + objMeta, err := api.ListMetaFor(listObj) + if err != nil { + return nil, err + } + resourceVersion := len(f.changes) + objMeta.ResourceVersion = strconv.Itoa(resourceVersion) + return listObj, nil +} + +// List returns a list object, with its resource version set. +func (f *FakePVCControllerSource) List(options api.ListOptions) (runtime.Object, error) { + f.lock.RLock() + defer f.lock.RUnlock() + list, err := f.FakeControllerSource.getListItemsLocked() + if err != nil { + return nil, err + } + listObj := &api.PersistentVolumeClaimList{} + if err := meta.SetList(listObj, list); err != nil { + return nil, err + } + objMeta, err := api.ListMetaFor(listObj) + if err != nil { + return nil, err + } + resourceVersion := len(f.changes) + objMeta.ResourceVersion = strconv.Itoa(resourceVersion) + return listObj, nil +} + // Watch returns a watch, which will be pre-populated with all changes // after resourceVersion. func (f *FakeControllerSource) Watch(options api.ListOptions) (watch.Interface, error) { @@ -172,11 +246,11 @@ func (f *FakeControllerSource) Watch(options api.ListOptions) (watch.Interface, } changes = append(changes, watch.Event{Type: c.Type, Object: objCopy.(runtime.Object)}) } - return f.broadcaster.WatchWithPrefix(changes), nil + return f.Broadcaster.WatchWithPrefix(changes), nil } else if rc > len(f.changes) { return nil, errors.New("resource version in the future not supported by this fake") } - return f.broadcaster.Watch(), nil + return f.Broadcaster.Watch(), nil } // Shutdown closes the underlying broadcaster, waiting for events to be @@ -184,5 +258,5 @@ func (f *FakeControllerSource) Watch(options api.ListOptions) (watch.Interface, // enforced by Shutdown() leaving f locked. func (f *FakeControllerSource) Shutdown() { f.lock.Lock() // Purposely no unlock. - f.broadcaster.Shutdown() + f.Broadcaster.Shutdown() } diff --git a/pkg/controller/persistentvolume/controller_base.go b/pkg/controller/persistentvolume/controller_base.go index abface5f216..9cf03a81a21 100644 --- a/pkg/controller/persistentvolume/controller_base.go +++ b/pkg/controller/persistentvolume/controller_base.go @@ -139,14 +139,14 @@ func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSour glog.Errorf("PersistentVolumeController can't initialize caches: %v", err) return } - volumeList, ok := volumeListObj.(*api.List) + volumeList, ok := volumeListObj.(*api.PersistentVolumeList) if !ok { glog.Errorf("PersistentVolumeController can't initialize caches, expected list of volumes, got: %+v", volumeListObj) return } for _, volume := range volumeList.Items { // Ignore template volumes from kubernetes 1.2 - deleted := ctrl.upgradeVolumeFrom1_2(volume.(*api.PersistentVolume)) + deleted := ctrl.upgradeVolumeFrom1_2(&volume) if !deleted { storeObjectUpdate(ctrl.volumes.store, volume, "volume") } @@ -157,9 +157,9 @@ func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSour glog.Errorf("PersistentVolumeController can't initialize caches: %v", err) return } - claimList, ok := claimListObj.(*api.List) + claimList, ok := claimListObj.(*api.PersistentVolumeClaimList) if !ok { - glog.Errorf("PersistentVolumeController can't initialize caches, expected list of claims, got: %+v", volumeListObj) + glog.Errorf("PersistentVolumeController can't initialize caches, expected list of claims, got: %+v", claimListObj) return } for _, claim := range claimList.Items { diff --git a/pkg/controller/persistentvolume/controller_test.go b/pkg/controller/persistentvolume/controller_test.go index 6dea329633e..4233e1b6bd1 100644 --- a/pkg/controller/persistentvolume/controller_test.go +++ b/pkg/controller/persistentvolume/controller_test.go @@ -162,8 +162,8 @@ func TestControllerSync(t *testing.T) { // Initialize the controller client := &fake.Clientset{} - volumeSource := framework.NewFakeControllerSource() - claimSource := framework.NewFakeControllerSource() + volumeSource := framework.NewFakePVControllerSource() + claimSource := framework.NewFakePVCControllerSource() ctrl := newTestController(client, volumeSource, claimSource, true) reactor := newVolumeReactor(client, ctrl, volumeSource, claimSource, test.errors) for _, claim := range test.initialClaims { diff --git a/pkg/controller/persistentvolume/framework_test.go b/pkg/controller/persistentvolume/framework_test.go index a2fd6dc1b32..c4b5752560c 100644 --- a/pkg/controller/persistentvolume/framework_test.go +++ b/pkg/controller/persistentvolume/framework_test.go @@ -123,8 +123,8 @@ type volumeReactor struct { changedObjects []interface{} changedSinceLastSync int ctrl *PersistentVolumeController - volumeSource *framework.FakeControllerSource - claimSource *framework.FakeControllerSource + volumeSource *framework.FakePVControllerSource + claimSource *framework.FakePVCControllerSource lock sync.Mutex errors []reactorError } @@ -542,7 +542,7 @@ func (r *volumeReactor) addClaimEvent(claim *api.PersistentVolumeClaim) { r.claimSource.Add(claim) } -func newVolumeReactor(client *fake.Clientset, ctrl *PersistentVolumeController, volumeSource, claimSource *framework.FakeControllerSource, errors []reactorError) *volumeReactor { +func newVolumeReactor(client *fake.Clientset, ctrl *PersistentVolumeController, volumeSource *framework.FakePVControllerSource, claimSource *framework.FakePVCControllerSource, errors []reactorError) *volumeReactor { reactor := &volumeReactor{ volumes: make(map[string]*api.PersistentVolume), claims: make(map[string]*api.PersistentVolumeClaim), @@ -557,10 +557,10 @@ func newVolumeReactor(client *fake.Clientset, ctrl *PersistentVolumeController, func newTestController(kubeClient clientset.Interface, volumeSource, claimSource cache.ListerWatcher, enableDynamicProvisioning bool) *PersistentVolumeController { if volumeSource == nil { - volumeSource = framework.NewFakeControllerSource() + volumeSource = framework.NewFakePVControllerSource() } if claimSource == nil { - claimSource = framework.NewFakeControllerSource() + claimSource = framework.NewFakePVCControllerSource() } ctrl := NewPersistentVolumeController( kubeClient,