diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 2c6c1fc65e1..6f61be28f74 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -436,7 +436,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig nil, // eventRecorder s.VolumeConfiguration.EnableDynamicProvisioning, ) - volumeController.Run() + volumeController.Run(wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) attachDetachController, attachDetachControllerErr := diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go index b4684d50c1b..9129fbe6ec4 100644 --- a/contrib/mesos/pkg/controllermanager/controllermanager.go +++ b/contrib/mesos/pkg/controllermanager/controllermanager.go @@ -302,7 +302,7 @@ func (s *CMServer) Run(_ []string) error { nil, // eventRecorder s.VolumeConfiguration.EnableDynamicProvisioning, ) - volumeController.Run() + volumeController.Run(wait.NeverStop) var rootCA []byte diff --git a/pkg/controller/volume/persistentvolume/controller.go b/pkg/controller/volume/persistentvolume/controller.go index a91012724f1..44f3edfc3ab 100644 --- a/pkg/controller/volume/persistentvolume/controller.go +++ b/pkg/controller/volume/persistentvolume/controller.go @@ -154,13 +154,10 @@ const createProvisionedPVInterval = 10 * time.Second // changes. type PersistentVolumeController struct { volumeController *framework.Controller - volumeControllerStopCh chan struct{} volumeSource cache.ListerWatcher claimController *framework.Controller - claimControllerStopCh chan struct{} claimSource cache.ListerWatcher classReflector *cache.Reflector - classReflectorStopCh chan struct{} classSource cache.ListerWatcher kubeClient clientset.Interface eventRecorder record.EventRecorder diff --git a/pkg/controller/volume/persistentvolume/controller_base.go b/pkg/controller/volume/persistentvolume/controller_base.go index 27d54a8b9d5..cd9704abf4c 100644 --- a/pkg/controller/volume/persistentvolume/controller_base.go +++ b/pkg/controller/volume/persistentvolume/controller_base.go @@ -442,33 +442,12 @@ func (ctrl *PersistentVolumeController) deleteClaim(obj interface{}) { } // Run starts all of this controller's control loops -func (ctrl *PersistentVolumeController) Run() { +func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) { glog.V(4).Infof("starting PersistentVolumeController") - ctrl.initializeCaches(ctrl.volumeSource, ctrl.claimSource) - - if ctrl.volumeControllerStopCh == nil { - ctrl.volumeControllerStopCh = make(chan struct{}) - go ctrl.volumeController.Run(ctrl.volumeControllerStopCh) - } - - if ctrl.claimControllerStopCh == nil { - ctrl.claimControllerStopCh = make(chan struct{}) - go ctrl.claimController.Run(ctrl.claimControllerStopCh) - } - - if ctrl.classReflectorStopCh == nil { - ctrl.classReflectorStopCh = make(chan struct{}) - go ctrl.classReflector.RunUntil(ctrl.classReflectorStopCh) - } -} - -// Stop gracefully shuts down this controller -func (ctrl *PersistentVolumeController) Stop() { - glog.V(4).Infof("stopping PersistentVolumeController") - close(ctrl.volumeControllerStopCh) - close(ctrl.claimControllerStopCh) - close(ctrl.classReflectorStopCh) + go ctrl.volumeController.Run(stopCh) + go ctrl.claimController.Run(stopCh) + go ctrl.classReflector.RunUntil(stopCh) } const ( diff --git a/pkg/controller/volume/persistentvolume/controller_test.go b/pkg/controller/volume/persistentvolume/controller_test.go index 8ca6b04c5cc..5e6b07560cf 100644 --- a/pkg/controller/volume/persistentvolume/controller_test.go +++ b/pkg/controller/volume/persistentvolume/controller_test.go @@ -175,7 +175,8 @@ func TestControllerSync(t *testing.T) { } // Start the controller - go ctrl.Run() + stopCh := make(chan struct{}) + ctrl.Run(stopCh) // Wait for the controller to pass initial sync and fill its caches. for !ctrl.volumeController.HasSynced() || @@ -201,7 +202,7 @@ func TestControllerSync(t *testing.T) { if err != nil { t.Errorf("Failed to run test %s: %v", test.name, err) } - ctrl.Stop() + close(stopCh) evaluateTestResults(ctrl, reactor, test, t) } diff --git a/test/integration/persistentvolumes/persistent_volumes_test.go b/test/integration/persistentvolumes/persistent_volumes_test.go index c5ec8b13198..760b3e03ee0 100644 --- a/test/integration/persistentvolumes/persistent_volumes_test.go +++ b/test/integration/persistentvolumes/persistent_volumes_test.go @@ -123,8 +123,9 @@ func TestPersistentVolumeRecycler(t *testing.T) { // non-namespaced objects (PersistenceVolumes). defer testClient.Core().PersistentVolumes().DeleteCollection(nil, api.ListOptions{}) - ctrl.Run() - defer ctrl.Stop() + stopCh := make(chan struct{}) + ctrl.Run(stopCh) + defer close(stopCh) // This PV will be claimed, released, and recycled. pv := createPV("fake-pv-recycler", "/tmp/foo", "10G", []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, api.PersistentVolumeReclaimRecycle) @@ -176,8 +177,9 @@ func TestPersistentVolumeDeleter(t *testing.T) { // non-namespaced objects (PersistenceVolumes). defer testClient.Core().PersistentVolumes().DeleteCollection(nil, api.ListOptions{}) - ctrl.Run() - defer ctrl.Stop() + stopCh := make(chan struct{}) + ctrl.Run(stopCh) + defer close(stopCh) // This PV will be claimed, released, and deleted. pv := createPV("fake-pv-deleter", "/tmp/foo", "10G", []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, api.PersistentVolumeReclaimDelete) @@ -234,8 +236,9 @@ func TestPersistentVolumeBindRace(t *testing.T) { // non-namespaced objects (PersistenceVolumes). defer testClient.Core().PersistentVolumes().DeleteCollection(nil, api.ListOptions{}) - ctrl.Run() - defer ctrl.Stop() + stopCh := make(chan struct{}) + ctrl.Run(stopCh) + defer close(stopCh) pv := createPV("fake-pv-race", "/tmp/foo", "10G", []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, api.PersistentVolumeReclaimRetain) pvc := createPVC("fake-pvc-race", ns.Name, "5G", []api.PersistentVolumeAccessMode{api.ReadWriteOnce}) @@ -304,8 +307,9 @@ func TestPersistentVolumeClaimLabelSelector(t *testing.T) { // non-namespaced objects (PersistenceVolumes). defer testClient.Core().PersistentVolumes().DeleteCollection(nil, api.ListOptions{}) - controller.Run() - defer controller.Stop() + stopCh := make(chan struct{}) + controller.Run(stopCh) + defer close(stopCh) var ( err error @@ -383,8 +387,9 @@ func TestPersistentVolumeClaimLabelSelectorMatchExpressions(t *testing.T) { // non-namespaced objects (PersistenceVolumes). defer testClient.Core().PersistentVolumes().DeleteCollection(nil, api.ListOptions{}) - controller.Run() - defer controller.Stop() + stopCh := make(chan struct{}) + controller.Run(stopCh) + defer close(stopCh) var ( err error @@ -481,8 +486,9 @@ func TestPersistentVolumeMultiPVs(t *testing.T) { // non-namespaced objects (PersistenceVolumes). defer testClient.Core().PersistentVolumes().DeleteCollection(nil, api.ListOptions{}) - controller.Run() - defer controller.Stop() + stopCh := make(chan struct{}) + controller.Run(stopCh) + defer close(stopCh) maxPVs := getObjectCount() pvs := make([]*api.PersistentVolume, maxPVs) @@ -569,8 +575,9 @@ func TestPersistentVolumeMultiPVsPVCs(t *testing.T) { // non-namespaced objects (PersistenceVolumes). defer testClient.Core().PersistentVolumes().DeleteCollection(nil, api.ListOptions{}) - binder.Run() - defer binder.Stop() + controllerStopCh := make(chan struct{}) + binder.Run(controllerStopCh) + defer close(controllerStopCh) objCount := getObjectCount() pvs := make([]*api.PersistentVolume, objCount) @@ -781,8 +788,9 @@ func TestPersistentVolumeControllerStartup(t *testing.T) { } // Start the controller when all PVs and PVCs are already saved in etcd - binder.Run() - defer binder.Stop() + stopCh := make(chan struct{}) + binder.Run(stopCh) + defer close(stopCh) // wait for at least two sync periods for changes. No volume should be // Released and no claim should be Lost during this time. @@ -867,8 +875,9 @@ func TestPersistentVolumeProvisionMultiPVCs(t *testing.T) { } testClient.Extensions().StorageClasses().Create(&storageClass) - binder.Run() - defer binder.Stop() + stopCh := make(chan struct{}) + binder.Run(stopCh) + defer close(stopCh) objCount := getObjectCount() pvcs := make([]*api.PersistentVolumeClaim, objCount) @@ -951,8 +960,9 @@ func TestPersistentVolumeMultiPVsDiffAccessModes(t *testing.T) { // non-namespaced objects (PersistenceVolumes). defer testClient.Core().PersistentVolumes().DeleteCollection(nil, api.ListOptions{}) - controller.Run() - defer controller.Stop() + stopCh := make(chan struct{}) + controller.Run(stopCh) + defer close(stopCh) // This PV will be claimed, released, and deleted pv_rwo := createPV("pv-rwo", "/tmp/foo", "10G",