mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #28002 from asalkeld/init-cache-error
Automatic merge from submit-queue Fix startup type error in initializeCaches The following error was getting logged: PersistentVolumeController can't initialize caches, expected list of volumes, got: &{TypeMeta:{Kind: APIVersion:} ListMeta:{SelfLink:/api/v1/persistentvolumes ResourceVersion:11} Items:[]} The tests make extensive use of NewFakeControllerSource which uses api.List instead of api.PersistentVolumeList. So use reflect to help iterate over the items then assert the item type. fixes #27757
This commit is contained in:
commit
1effc5af91
@ -31,17 +31,41 @@ import (
|
|||||||
|
|
||||||
func NewFakeControllerSource() *FakeControllerSource {
|
func NewFakeControllerSource() *FakeControllerSource {
|
||||||
return &FakeControllerSource{
|
return &FakeControllerSource{
|
||||||
items: map[nnu]runtime.Object{},
|
Items: map[nnu]runtime.Object{},
|
||||||
broadcaster: watch.NewBroadcaster(100, watch.WaitIfChannelFull),
|
Broadcaster: watch.NewBroadcaster(100, watch.WaitIfChannelFull),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewFakePVControllerSource() *FakePVControllerSource {
|
||||||
|
return &FakePVControllerSource{
|
||||||
|
FakeControllerSource{
|
||||||
|
Items: map[nnu]runtime.Object{},
|
||||||
|
Broadcaster: watch.NewBroadcaster(100, watch.WaitIfChannelFull),
|
||||||
|
}}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewFakePVCControllerSource() *FakePVCControllerSource {
|
||||||
|
return &FakePVCControllerSource{
|
||||||
|
FakeControllerSource{
|
||||||
|
Items: map[nnu]runtime.Object{},
|
||||||
|
Broadcaster: watch.NewBroadcaster(100, watch.WaitIfChannelFull),
|
||||||
|
}}
|
||||||
|
}
|
||||||
|
|
||||||
// FakeControllerSource implements listing/watching for testing.
|
// FakeControllerSource implements listing/watching for testing.
|
||||||
type FakeControllerSource struct {
|
type FakeControllerSource struct {
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
items map[nnu]runtime.Object
|
Items map[nnu]runtime.Object
|
||||||
changes []watch.Event // one change per resourceVersion
|
changes []watch.Event // one change per resourceVersion
|
||||||
broadcaster *watch.Broadcaster
|
Broadcaster *watch.Broadcaster
|
||||||
|
}
|
||||||
|
|
||||||
|
type FakePVControllerSource struct {
|
||||||
|
FakeControllerSource
|
||||||
|
}
|
||||||
|
|
||||||
|
type FakePVCControllerSource struct {
|
||||||
|
FakeControllerSource
|
||||||
}
|
}
|
||||||
|
|
||||||
// namespace, name, uid to be used as a key.
|
// namespace, name, uid to be used as a key.
|
||||||
@ -110,22 +134,19 @@ func (f *FakeControllerSource) Change(e watch.Event, watchProbability float64) {
|
|||||||
key := f.key(accessor)
|
key := f.key(accessor)
|
||||||
switch e.Type {
|
switch e.Type {
|
||||||
case watch.Added, watch.Modified:
|
case watch.Added, watch.Modified:
|
||||||
f.items[key] = e.Object
|
f.Items[key] = e.Object
|
||||||
case watch.Deleted:
|
case watch.Deleted:
|
||||||
delete(f.items, key)
|
delete(f.Items, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
if rand.Float64() < watchProbability {
|
if rand.Float64() < watchProbability {
|
||||||
f.broadcaster.Action(e.Type, e.Object)
|
f.Broadcaster.Action(e.Type, e.Object)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// List returns a list object, with its resource version set.
|
func (f *FakeControllerSource) getListItemsLocked() ([]runtime.Object, error) {
|
||||||
func (f *FakeControllerSource) List(options api.ListOptions) (runtime.Object, error) {
|
list := make([]runtime.Object, 0, len(f.Items))
|
||||||
f.lock.RLock()
|
for _, obj := range f.Items {
|
||||||
defer f.lock.RUnlock()
|
|
||||||
list := make([]runtime.Object, 0, len(f.items))
|
|
||||||
for _, obj := range f.items {
|
|
||||||
// Must make a copy to allow clients to modify the object.
|
// Must make a copy to allow clients to modify the object.
|
||||||
// Otherwise, if they make a change and write it back, they
|
// Otherwise, if they make a change and write it back, they
|
||||||
// will inadvertently change our canonical copy (in
|
// will inadvertently change our canonical copy (in
|
||||||
@ -136,6 +157,17 @@ func (f *FakeControllerSource) List(options api.ListOptions) (runtime.Object, er
|
|||||||
}
|
}
|
||||||
list = append(list, objCopy.(runtime.Object))
|
list = append(list, objCopy.(runtime.Object))
|
||||||
}
|
}
|
||||||
|
return list, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// List returns a list object, with its resource version set.
|
||||||
|
func (f *FakeControllerSource) List(options api.ListOptions) (runtime.Object, error) {
|
||||||
|
f.lock.RLock()
|
||||||
|
defer f.lock.RUnlock()
|
||||||
|
list, err := f.getListItemsLocked()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
listObj := &api.List{}
|
listObj := &api.List{}
|
||||||
if err := meta.SetList(listObj, list); err != nil {
|
if err := meta.SetList(listObj, list); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -149,6 +181,48 @@ func (f *FakeControllerSource) List(options api.ListOptions) (runtime.Object, er
|
|||||||
return listObj, nil
|
return listObj, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// List returns a list object, with its resource version set.
|
||||||
|
func (f *FakePVControllerSource) List(options api.ListOptions) (runtime.Object, error) {
|
||||||
|
f.lock.RLock()
|
||||||
|
defer f.lock.RUnlock()
|
||||||
|
list, err := f.FakeControllerSource.getListItemsLocked()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
listObj := &api.PersistentVolumeList{}
|
||||||
|
if err := meta.SetList(listObj, list); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
objMeta, err := api.ListMetaFor(listObj)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
resourceVersion := len(f.changes)
|
||||||
|
objMeta.ResourceVersion = strconv.Itoa(resourceVersion)
|
||||||
|
return listObj, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// List returns a list object, with its resource version set.
|
||||||
|
func (f *FakePVCControllerSource) List(options api.ListOptions) (runtime.Object, error) {
|
||||||
|
f.lock.RLock()
|
||||||
|
defer f.lock.RUnlock()
|
||||||
|
list, err := f.FakeControllerSource.getListItemsLocked()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
listObj := &api.PersistentVolumeClaimList{}
|
||||||
|
if err := meta.SetList(listObj, list); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
objMeta, err := api.ListMetaFor(listObj)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
resourceVersion := len(f.changes)
|
||||||
|
objMeta.ResourceVersion = strconv.Itoa(resourceVersion)
|
||||||
|
return listObj, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Watch returns a watch, which will be pre-populated with all changes
|
// Watch returns a watch, which will be pre-populated with all changes
|
||||||
// after resourceVersion.
|
// after resourceVersion.
|
||||||
func (f *FakeControllerSource) Watch(options api.ListOptions) (watch.Interface, error) {
|
func (f *FakeControllerSource) Watch(options api.ListOptions) (watch.Interface, error) {
|
||||||
@ -172,11 +246,11 @@ func (f *FakeControllerSource) Watch(options api.ListOptions) (watch.Interface,
|
|||||||
}
|
}
|
||||||
changes = append(changes, watch.Event{Type: c.Type, Object: objCopy.(runtime.Object)})
|
changes = append(changes, watch.Event{Type: c.Type, Object: objCopy.(runtime.Object)})
|
||||||
}
|
}
|
||||||
return f.broadcaster.WatchWithPrefix(changes), nil
|
return f.Broadcaster.WatchWithPrefix(changes), nil
|
||||||
} else if rc > len(f.changes) {
|
} else if rc > len(f.changes) {
|
||||||
return nil, errors.New("resource version in the future not supported by this fake")
|
return nil, errors.New("resource version in the future not supported by this fake")
|
||||||
}
|
}
|
||||||
return f.broadcaster.Watch(), nil
|
return f.Broadcaster.Watch(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Shutdown closes the underlying broadcaster, waiting for events to be
|
// Shutdown closes the underlying broadcaster, waiting for events to be
|
||||||
@ -184,5 +258,5 @@ func (f *FakeControllerSource) Watch(options api.ListOptions) (watch.Interface,
|
|||||||
// enforced by Shutdown() leaving f locked.
|
// enforced by Shutdown() leaving f locked.
|
||||||
func (f *FakeControllerSource) Shutdown() {
|
func (f *FakeControllerSource) Shutdown() {
|
||||||
f.lock.Lock() // Purposely no unlock.
|
f.lock.Lock() // Purposely no unlock.
|
||||||
f.broadcaster.Shutdown()
|
f.Broadcaster.Shutdown()
|
||||||
}
|
}
|
||||||
|
@ -139,14 +139,14 @@ func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSour
|
|||||||
glog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
|
glog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
volumeList, ok := volumeListObj.(*api.List)
|
volumeList, ok := volumeListObj.(*api.PersistentVolumeList)
|
||||||
if !ok {
|
if !ok {
|
||||||
glog.Errorf("PersistentVolumeController can't initialize caches, expected list of volumes, got: %+v", volumeListObj)
|
glog.Errorf("PersistentVolumeController can't initialize caches, expected list of volumes, got: %+v", volumeListObj)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for _, volume := range volumeList.Items {
|
for _, volume := range volumeList.Items {
|
||||||
// Ignore template volumes from kubernetes 1.2
|
// Ignore template volumes from kubernetes 1.2
|
||||||
deleted := ctrl.upgradeVolumeFrom1_2(volume.(*api.PersistentVolume))
|
deleted := ctrl.upgradeVolumeFrom1_2(&volume)
|
||||||
if !deleted {
|
if !deleted {
|
||||||
storeObjectUpdate(ctrl.volumes.store, volume, "volume")
|
storeObjectUpdate(ctrl.volumes.store, volume, "volume")
|
||||||
}
|
}
|
||||||
@ -157,9 +157,9 @@ func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSour
|
|||||||
glog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
|
glog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
claimList, ok := claimListObj.(*api.List)
|
claimList, ok := claimListObj.(*api.PersistentVolumeClaimList)
|
||||||
if !ok {
|
if !ok {
|
||||||
glog.Errorf("PersistentVolumeController can't initialize caches, expected list of claims, got: %+v", volumeListObj)
|
glog.Errorf("PersistentVolumeController can't initialize caches, expected list of claims, got: %+v", claimListObj)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for _, claim := range claimList.Items {
|
for _, claim := range claimList.Items {
|
||||||
|
@ -162,8 +162,8 @@ func TestControllerSync(t *testing.T) {
|
|||||||
|
|
||||||
// Initialize the controller
|
// Initialize the controller
|
||||||
client := &fake.Clientset{}
|
client := &fake.Clientset{}
|
||||||
volumeSource := framework.NewFakeControllerSource()
|
volumeSource := framework.NewFakePVControllerSource()
|
||||||
claimSource := framework.NewFakeControllerSource()
|
claimSource := framework.NewFakePVCControllerSource()
|
||||||
ctrl := newTestController(client, volumeSource, claimSource, true)
|
ctrl := newTestController(client, volumeSource, claimSource, true)
|
||||||
reactor := newVolumeReactor(client, ctrl, volumeSource, claimSource, test.errors)
|
reactor := newVolumeReactor(client, ctrl, volumeSource, claimSource, test.errors)
|
||||||
for _, claim := range test.initialClaims {
|
for _, claim := range test.initialClaims {
|
||||||
|
@ -123,8 +123,8 @@ type volumeReactor struct {
|
|||||||
changedObjects []interface{}
|
changedObjects []interface{}
|
||||||
changedSinceLastSync int
|
changedSinceLastSync int
|
||||||
ctrl *PersistentVolumeController
|
ctrl *PersistentVolumeController
|
||||||
volumeSource *framework.FakeControllerSource
|
volumeSource *framework.FakePVControllerSource
|
||||||
claimSource *framework.FakeControllerSource
|
claimSource *framework.FakePVCControllerSource
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
errors []reactorError
|
errors []reactorError
|
||||||
}
|
}
|
||||||
@ -542,7 +542,7 @@ func (r *volumeReactor) addClaimEvent(claim *api.PersistentVolumeClaim) {
|
|||||||
r.claimSource.Add(claim)
|
r.claimSource.Add(claim)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newVolumeReactor(client *fake.Clientset, ctrl *PersistentVolumeController, volumeSource, claimSource *framework.FakeControllerSource, errors []reactorError) *volumeReactor {
|
func newVolumeReactor(client *fake.Clientset, ctrl *PersistentVolumeController, volumeSource *framework.FakePVControllerSource, claimSource *framework.FakePVCControllerSource, errors []reactorError) *volumeReactor {
|
||||||
reactor := &volumeReactor{
|
reactor := &volumeReactor{
|
||||||
volumes: make(map[string]*api.PersistentVolume),
|
volumes: make(map[string]*api.PersistentVolume),
|
||||||
claims: make(map[string]*api.PersistentVolumeClaim),
|
claims: make(map[string]*api.PersistentVolumeClaim),
|
||||||
@ -557,10 +557,10 @@ func newVolumeReactor(client *fake.Clientset, ctrl *PersistentVolumeController,
|
|||||||
|
|
||||||
func newTestController(kubeClient clientset.Interface, volumeSource, claimSource cache.ListerWatcher, enableDynamicProvisioning bool) *PersistentVolumeController {
|
func newTestController(kubeClient clientset.Interface, volumeSource, claimSource cache.ListerWatcher, enableDynamicProvisioning bool) *PersistentVolumeController {
|
||||||
if volumeSource == nil {
|
if volumeSource == nil {
|
||||||
volumeSource = framework.NewFakeControllerSource()
|
volumeSource = framework.NewFakePVControllerSource()
|
||||||
}
|
}
|
||||||
if claimSource == nil {
|
if claimSource == nil {
|
||||||
claimSource = framework.NewFakeControllerSource()
|
claimSource = framework.NewFakePVCControllerSource()
|
||||||
}
|
}
|
||||||
ctrl := NewPersistentVolumeController(
|
ctrl := NewPersistentVolumeController(
|
||||||
kubeClient,
|
kubeClient,
|
||||||
|
Loading…
Reference in New Issue
Block a user