From d973158a4ea6a3f0cde6f22b6f84cfc72ff1dfff Mon Sep 17 00:00:00 2001 From: deads2k Date: Mon, 21 Nov 2016 14:51:14 -0500 Subject: [PATCH] make controller manager use specified stop channel --- .../app/controllermanager.go | 43 +++++++++---------- pkg/controller/volume/attachdetach/BUILD | 2 +- .../attachdetach/attach_detach_controller.go | 9 +++- .../attach_detach_controller_test.go | 5 +-- 4 files changed, 30 insertions(+), 29 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index bf0cec64611..e6b6d049db9 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -179,7 +179,7 @@ func Run(s *options.CMServer) error { clientBuilder = rootClientBuilder } - err := StartControllers(s, kubeconfig, rootClientBuilder, clientBuilder, stop, recorder) + err := StartControllers(s, kubeconfig, rootClientBuilder, clientBuilder, stop) glog.Fatalf("error running controllers: %v", err) panic("unreachable") } @@ -222,7 +222,7 @@ func Run(s *options.CMServer) error { panic("unreachable") } -func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}, recorder record.EventRecorder) error { +func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) error { client := func(serviceAccountName string) clientset.Interface { return rootClientBuilder.ClientOrDie(serviceAccountName) } @@ -254,13 +254,13 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey), RootCA: rootCA, }, - ).Run(int(s.ConcurrentSATokenSyncs), wait.NeverStop) + ).Run(int(s.ConcurrentSATokenSyncs), stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } } go endpointcontroller.NewEndpointController(sharedInformers.Pods().Informer(), client("endpoint-controller")). - Run(int(s.ConcurrentEndpointSyncs), wait.NeverStop) + Run(int(s.ConcurrentEndpointSyncs), stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) go replicationcontroller.NewReplicationManager( @@ -270,11 +270,11 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl replicationcontroller.BurstReplicas, int(s.LookupCacheSizeForRC), s.EnableGarbageCollector, - ).Run(int(s.ConcurrentRCSyncs), wait.NeverStop) + ).Run(int(s.ConcurrentRCSyncs), stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) go podgc.NewPodGC(client("pod-garbage-collector"), sharedInformers.Pods().Informer(), - int(s.TerminatedPodGCThreshold)).Run(wait.NeverStop) + int(s.TerminatedPodGCThreshold)).Run(stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile) @@ -342,7 +342,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl ReplenishmentResyncPeriod: ResyncPeriod(s), GroupKindsToReplenish: groupKindsToReplenish, } - go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(int(s.ConcurrentResourceQuotaSyncs), wait.NeverStop) + go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(int(s.ConcurrentResourceQuotaSyncs), stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) // If apiserver is not running we should wait for some time and fail only then. This is particularly @@ -395,7 +395,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl } } namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, gvrFn, s.NamespaceSyncPeriod.Duration, v1.FinalizerKubernetes) - go namespaceController.Run(int(s.ConcurrentNamespaceSyncs), wait.NeverStop) + go namespaceController.Run(int(s.ConcurrentNamespaceSyncs), stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) groupVersion := "extensions/v1beta1" @@ -407,28 +407,28 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl if containsResource(resources, "daemonsets") { glog.Infof("Starting daemon set controller") go daemon.NewDaemonSetsController(sharedInformers.DaemonSets(), sharedInformers.Pods(), sharedInformers.Nodes(), client("daemon-set-controller"), int(s.LookupCacheSizeForDaemonSet)). - Run(int(s.ConcurrentDaemonSetSyncs), wait.NeverStop) + Run(int(s.ConcurrentDaemonSetSyncs), stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } if containsResource(resources, "jobs") { glog.Infof("Starting job controller") go job.NewJobController(sharedInformers.Pods().Informer(), sharedInformers.Jobs(), client("job-controller")). - Run(int(s.ConcurrentJobSyncs), wait.NeverStop) + Run(int(s.ConcurrentJobSyncs), stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } if containsResource(resources, "deployments") { glog.Infof("Starting deployment controller") go deployment.NewDeploymentController(sharedInformers.Deployments(), sharedInformers.ReplicaSets(), sharedInformers.Pods(), client("deployment-controller")). - Run(int(s.ConcurrentDeploymentSyncs), wait.NeverStop) + Run(int(s.ConcurrentDeploymentSyncs), stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } if containsResource(resources, "replicasets") { glog.Infof("Starting ReplicaSet controller") go replicaset.NewReplicaSetController(sharedInformers.ReplicaSets(), sharedInformers.Pods(), client("replicaset-controller"), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector). - Run(int(s.ConcurrentRSSyncs), wait.NeverStop) + Run(int(s.ConcurrentRSSyncs), stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } } @@ -450,7 +450,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl ) replicaCalc := podautoscaler.NewReplicaCalculator(metricsClient, hpaClient.Core()) go podautoscaler.NewHorizontalController(hpaClient.Core(), hpaClient.Extensions(), hpaClient.Autoscaling(), replicaCalc, s.HorizontalPodAutoscalerSyncPeriod.Duration). - Run(wait.NeverStop) + Run(stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } } @@ -462,7 +462,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl glog.Infof("Starting %s apis", groupVersion) if containsResource(resources, "poddisruptionbudgets") { glog.Infof("Starting disruption controller") - go disruption.NewDisruptionController(sharedInformers.Pods().Informer(), client("disruption-controller")).Run(wait.NeverStop) + go disruption.NewDisruptionController(sharedInformers.Pods().Informer(), client("disruption-controller")).Run(stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } } @@ -479,7 +479,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl sharedInformers.Pods().Informer(), client("statefulset-controller"), resyncPeriod, - ).Run(1, wait.NeverStop) + ).Run(1, stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } } @@ -493,7 +493,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl // // TODO: this is a temp fix for allowing kubeClient list v2alpha1 sj, should switch to using clientset kubeconfig.ContentConfig.GroupVersion = &schema.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"} go cronjob.NewCronJobController(client("cronjob-controller")). - Run(wait.NeverStop) + Run(stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } @@ -515,7 +515,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl EnableDynamicProvisioning: s.VolumeConfiguration.EnableDynamicProvisioning, } volumeController := persistentvolumecontroller.NewController(params) - volumeController.Run(wait.NeverStop) + volumeController.Run(stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) attachDetachController, attachDetachControllerErr := @@ -526,12 +526,11 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl sharedInformers.PersistentVolumeClaims().Informer(), sharedInformers.PersistentVolumes().Informer(), cloud, - ProbeAttachableVolumePlugins(s.VolumeConfiguration), - recorder) + ProbeAttachableVolumePlugins(s.VolumeConfiguration)) if attachDetachControllerErr != nil { glog.Fatalf("Failed to start attach/detach controller: %v", attachDetachControllerErr) } - go attachDetachController.Run(wait.NeverStop) + go attachDetachController.Run(stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) groupVersion = "certificates.k8s.io/v1alpha1" @@ -552,7 +551,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl if err != nil { glog.Errorf("Failed to start certificate controller: %v", err) } else { - go certController.Run(1, wait.NeverStop) + go certController.Run(1, stop) } time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } @@ -582,7 +581,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl glog.Errorf("Failed to start the generic garbage collector: %v", err) } else { workers := int(s.ConcurrentGCSyncs) - go garbageCollector.Run(workers, wait.NeverStop) + go garbageCollector.Run(workers, stop) } } diff --git a/pkg/controller/volume/attachdetach/BUILD b/pkg/controller/volume/attachdetach/BUILD index bcc22df9d49..0cfcfef9f38 100644 --- a/pkg/controller/volume/attachdetach/BUILD +++ b/pkg/controller/volume/attachdetach/BUILD @@ -19,6 +19,7 @@ go_library( "//pkg/api/v1:go_default_library", "//pkg/client/cache:go_default_library", "//pkg/client/clientset_generated/release_1_5:go_default_library", + "//pkg/client/clientset_generated/release_1_5/typed/core/v1:go_default_library", "//pkg/client/record:go_default_library", "//pkg/cloudprovider:go_default_library", "//pkg/controller/volume/attachdetach/cache:go_default_library", @@ -42,7 +43,6 @@ go_test( library = "go_default_library", tags = ["automanaged"], deps = [ - "//pkg/client/record:go_default_library", "//pkg/controller/informers:go_default_library", "//pkg/controller/volume/attachdetach/testing:go_default_library", ], diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go index fb3c40c2d14..49fc16a7a2e 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/pkg/api/v1" kcache "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5" + v1core "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/typed/core/v1" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache" @@ -76,8 +77,7 @@ func NewAttachDetachController( pvcInformer kcache.SharedInformer, pvInformer kcache.SharedInformer, cloud cloudprovider.Interface, - plugins []volume.VolumePlugin, - recorder record.EventRecorder) (AttachDetachController, error) { + plugins []volume.VolumePlugin) (AttachDetachController, error) { // TODO: The default resyncPeriod for shared informers is 12 hours, this is // unacceptable for the attach/detach controller. For example, if a pod is // skipped because the node it is scheduled to didn't set its annotation in @@ -115,6 +115,11 @@ func NewAttachDetachController( return nil, fmt.Errorf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err) } + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.Core().Events("")}) + recorder := eventBroadcaster.NewRecorder(v1.EventSource{Component: "attachdetach"}) + adc.desiredStateOfWorld = cache.NewDesiredStateOfWorld(&adc.volumePluginMgr) adc.actualStateOfWorld = cache.NewActualStateOfWorld(&adc.volumePluginMgr) adc.attacherDetacher = diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go index 2849237e37e..c301acd2b66 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go @@ -20,7 +20,6 @@ import ( "testing" "time" - "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/controller/informers" controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing" ) @@ -33,7 +32,6 @@ func Test_NewAttachDetachController_Positive(t *testing.T) { nodeInformer := informers.NewNodeInformer(fakeKubeClient, resyncPeriod) pvcInformer := informers.NewPVCInformer(fakeKubeClient, resyncPeriod) pvInformer := informers.NewPVInformer(fakeKubeClient, resyncPeriod) - fakeRecorder := &record.FakeRecorder{} // Act _, err := NewAttachDetachController( @@ -43,8 +41,7 @@ func Test_NewAttachDetachController_Positive(t *testing.T) { pvcInformer, pvInformer, nil, /* cloud */ - nil, /* plugins */ - fakeRecorder) + nil /* plugins */) // Assert if err != nil {