From 79b91b9ee04e0e4e558ffb860602e86eb4d6053f Mon Sep 17 00:00:00 2001 From: Jan Safranek Date: Tue, 17 May 2016 14:55:34 +0200 Subject: [PATCH] Refactor persistent volume initialization There should be only one initialization function, shared by the real controller and unit tests. --- .../app/controllermanager.go | 1 + .../controllermanager/controllermanager.go | 3 + .../persistentvolume/controller_base.go | 76 +++++++++---------- .../persistentvolume/controller_test.go | 2 +- .../persistentvolume/framework_test.go | 34 ++++----- test/integration/persistent_volumes_test.go | 2 +- 6 files changed, 61 insertions(+), 57 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 1a2bf5ca1c7..f55e4fc0ae9 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -385,6 +385,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig ProbeRecyclableVolumePlugins(s.VolumeConfiguration), cloud, s.ClusterName, + nil, nil, nil, ) volumeController.Run() time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go index 36ee2e6546d..3374dc4ab5f 100644 --- a/contrib/mesos/pkg/controllermanager/controllermanager.go +++ b/contrib/mesos/pkg/controllermanager/controllermanager.go @@ -283,6 +283,9 @@ func (s *CMServer) Run(_ []string) error { kubecontrollermanager.ProbeRecyclableVolumePlugins(s.VolumeConfiguration), cloud, s.ClusterName, + nil, + nil, + nil, ) volumeController.Run() diff --git a/pkg/controller/persistentvolume/controller_base.go b/pkg/controller/persistentvolume/controller_base.go index de5ff8a5f0b..6a217a590c5 100644 --- a/pkg/controller/persistentvolume/controller_base.go +++ b/pkg/controller/persistentvolume/controller_base.go @@ -46,15 +46,20 @@ func NewPersistentVolumeController( provisioner vol.ProvisionableVolumePlugin, recyclers []vol.VolumePlugin, cloud cloudprovider.Interface, - clusterName string) *PersistentVolumeController { + clusterName string, + volumeSource, claimSource cache.ListerWatcher, + eventRecorder record.EventRecorder, +) *PersistentVolumeController { - broadcaster := record.NewBroadcaster() - broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")}) - recorder := broadcaster.NewRecorder(api.EventSource{Component: "persistentvolume-controller"}) + if eventRecorder == nil { + broadcaster := record.NewBroadcaster() + broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")}) + eventRecorder = broadcaster.NewRecorder(api.EventSource{Component: "persistentvolume-controller"}) + } controller := &PersistentVolumeController{ kubeClient: kubeClient, - eventRecorder: recorder, + eventRecorder: eventRecorder, runningOperations: make(map[string]bool), cloud: cloud, provisioner: provisioner, @@ -62,6 +67,7 @@ func NewPersistentVolumeController( createProvisionedPVRetryCount: createProvisionedPVRetryCount, createProvisionedPVInterval: createProvisionedPVInterval, } + controller.recyclePluginMgr.InitPlugins(recyclers, controller) if controller.provisioner != nil { if err := controller.provisioner.Init(controller); err != nil { @@ -69,56 +75,50 @@ func NewPersistentVolumeController( } } - volumeSource := &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return kubeClient.Core().PersistentVolumes().List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return kubeClient.Core().PersistentVolumes().Watch(options) - }, + if volumeSource == nil { + volumeSource = &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return kubeClient.Core().PersistentVolumes().List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return kubeClient.Core().PersistentVolumes().Watch(options) + }, + } } - claimSource := &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options) - }, + if claimSource == nil { + claimSource = &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options) + }, + } } - controller.initializeController(syncPeriod, volumeSource, claimSource) - - return controller -} - -// initializeController prepares watching for PersistentVolume and -// PersistentVolumeClaim events from given sources. This should be used to -// initialize the controller for real operation (with real event sources) and -// also during testing (with fake ones). -func (ctrl *PersistentVolumeController) initializeController(syncPeriod time.Duration, volumeSource, claimSource cache.ListerWatcher) { - glog.V(4).Infof("initializing PersistentVolumeController, sync every %s", syncPeriod.String()) - ctrl.volumes.store, ctrl.volumeController = framework.NewIndexerInformer( + controller.volumes.store, controller.volumeController = framework.NewIndexerInformer( volumeSource, &api.PersistentVolume{}, syncPeriod, framework.ResourceEventHandlerFuncs{ - AddFunc: ctrl.addVolume, - UpdateFunc: ctrl.updateVolume, - DeleteFunc: ctrl.deleteVolume, + AddFunc: controller.addVolume, + UpdateFunc: controller.updateVolume, + DeleteFunc: controller.deleteVolume, }, cache.Indexers{"accessmodes": accessModesIndexFunc}, ) - ctrl.claims, ctrl.claimController = framework.NewInformer( + controller.claims, controller.claimController = framework.NewInformer( claimSource, &api.PersistentVolumeClaim{}, syncPeriod, framework.ResourceEventHandlerFuncs{ - AddFunc: ctrl.addClaim, - UpdateFunc: ctrl.updateClaim, - DeleteFunc: ctrl.deleteClaim, + AddFunc: controller.addClaim, + UpdateFunc: controller.updateClaim, + DeleteFunc: controller.deleteClaim, }, ) + return controller } // addVolume is callback from framework.Controller watching PersistentVolume diff --git a/pkg/controller/persistentvolume/controller_test.go b/pkg/controller/persistentvolume/controller_test.go index 9ae9010340f..d3f92e6b6f7 100644 --- a/pkg/controller/persistentvolume/controller_test.go +++ b/pkg/controller/persistentvolume/controller_test.go @@ -128,7 +128,7 @@ func TestControllerSync(t *testing.T) { client := &fake.Clientset{} volumeSource := framework.NewFakeControllerSource() claimSource := framework.NewFakeControllerSource() - ctrl := newPersistentVolumeController(client, volumeSource, claimSource) + ctrl := newTestController(client, volumeSource, claimSource) reactor := newVolumeReactor(client, ctrl, volumeSource, claimSource, test.errors) for _, claim := range test.initialClaims { claimSource.Add(claim) diff --git a/pkg/controller/persistentvolume/framework_test.go b/pkg/controller/persistentvolume/framework_test.go index f18578c5f3e..aea5dc4e8bd 100644 --- a/pkg/controller/persistentvolume/framework_test.go +++ b/pkg/controller/persistentvolume/framework_test.go @@ -484,27 +484,27 @@ func newVolumeReactor(client *fake.Clientset, ctrl *PersistentVolumeController, return reactor } -func newPersistentVolumeController(kubeClient clientset.Interface, volumeSource, claimSource cache.ListerWatcher) *PersistentVolumeController { - ctrl := &PersistentVolumeController{ - volumes: newPersistentVolumeOrderedIndex(), - claims: cache.NewStore(cache.MetaNamespaceKeyFunc), - kubeClient: kubeClient, - eventRecorder: record.NewFakeRecorder(1000), - runningOperations: make(map[string]bool), - - // Speed up the testing - createProvisionedPVRetryCount: createProvisionedPVRetryCount, - createProvisionedPVInterval: 5 * time.Millisecond, - } - - // Create dummy volume/claim sources for controller watchers when needed +func newTestController(kubeClient clientset.Interface, volumeSource, claimSource cache.ListerWatcher) *PersistentVolumeController { if volumeSource == nil { volumeSource = framework.NewFakeControllerSource() } if claimSource == nil { claimSource = framework.NewFakeControllerSource() } - ctrl.initializeController(5*time.Second, volumeSource, claimSource) + ctrl := NewPersistentVolumeController( + kubeClient, + 5*time.Second, // sync period + nil, // provisioner + []vol.VolumePlugin{}, // recyclers + nil, // cloud + "", + volumeSource, + claimSource, + record.NewFakeRecorder(1000), // event recorder + ) + + // Speed up the test + ctrl.createProvisionedPVInterval = 5 * time.Millisecond return ctrl } @@ -732,7 +732,7 @@ func runSyncTests(t *testing.T, tests []controllerTest) { // Initialize the controller client := &fake.Clientset{} - ctrl := newPersistentVolumeController(client, nil, nil) + ctrl := newTestController(client, nil, nil) reactor := newVolumeReactor(client, ctrl, nil, nil, test.errors) for _, claim := range test.initialClaims { ctrl.claims.Add(claim) @@ -776,7 +776,7 @@ func runMultisyncTests(t *testing.T, tests []controllerTest) { // Initialize the controller client := &fake.Clientset{} - ctrl := newPersistentVolumeController(client, nil, nil) + ctrl := newTestController(client, nil, nil) reactor := newVolumeReactor(client, ctrl, nil, nil, test.errors) for _, claim := range test.initialClaims { ctrl.claims.Add(claim) diff --git a/test/integration/persistent_volumes_test.go b/test/integration/persistent_volumes_test.go index b0dbc0c4bca..a5fa8a9f946 100644 --- a/test/integration/persistent_volumes_test.go +++ b/test/integration/persistent_volumes_test.go @@ -55,7 +55,7 @@ func TestPersistentVolumeRecycler(t *testing.T) { plugins := []volume.VolumePlugin{&volumetest.FakeVolumePlugin{"plugin-name", host, volume.VolumeConfig{}, volume.VolumeOptions{}, 0, 0, nil, nil, nil, nil}} cloud := &fake_cloud.FakeCloud{} - ctrl := persistentvolumecontroller.NewPersistentVolumeController(testClient, 10*time.Second, nil, plugins, cloud, "") + ctrl := persistentvolumecontroller.NewPersistentVolumeController(testClient, 10*time.Second, nil, plugins, cloud, "", nil, nil, nil) ctrl.Run() defer ctrl.Stop()