From d973158a4ea6a3f0cde6f22b6f84cfc72ff1dfff Mon Sep 17 00:00:00 2001
From: deads2k
Date: Mon, 21 Nov 2016 14:51:14 -0500
Subject: [PATCH 1/5] make controller manager use specified stop channel

---
 .../app/controllermanager.go                 | 43 +++++++++----------
 pkg/controller/volume/attachdetach/BUILD     |  2 +-
 .../attachdetach/attach_detach_controller.go |  9 +++-
 .../attach_detach_controller_test.go         |  5 +--
 4 files changed, 30 insertions(+), 29 deletions(-)

diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index bf0cec64611..e6b6d049db9 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -179,7 +179,7 @@ func Run(s *options.CMServer) error {
 		clientBuilder = rootClientBuilder
 	}
 
-	err := StartControllers(s, kubeconfig, rootClientBuilder, clientBuilder, stop, recorder)
+	err := StartControllers(s, kubeconfig, rootClientBuilder, clientBuilder, stop)
 	glog.Fatalf("error running controllers: %v", err)
 	panic("unreachable")
 }
@@ -222,7 +222,7 @@ func Run(s *options.CMServer) error {
 	panic("unreachable")
 }
 
-func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}, recorder record.EventRecorder) error {
+func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) error {
 	client := func(serviceAccountName string) clientset.Interface {
 		return rootClientBuilder.ClientOrDie(serviceAccountName)
 	}
@@ -254,13 +254,13 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl
 					TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey),
 					RootCA:         rootCA,
 				},
-			).Run(int(s.ConcurrentSATokenSyncs), wait.NeverStop)
+			).Run(int(s.ConcurrentSATokenSyncs), stop)
 			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 		}
 	}
 
 	go endpointcontroller.NewEndpointController(sharedInformers.Pods().Informer(), client("endpoint-controller")).
-		Run(int(s.ConcurrentEndpointSyncs), wait.NeverStop)
+		Run(int(s.ConcurrentEndpointSyncs), stop)
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
 	go replicationcontroller.NewReplicationManager(
@@ -270,11 +270,11 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl
 		replicationcontroller.BurstReplicas,
 		int(s.LookupCacheSizeForRC),
 		s.EnableGarbageCollector,
-	).Run(int(s.ConcurrentRCSyncs), wait.NeverStop)
+	).Run(int(s.ConcurrentRCSyncs), stop)
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
 	go podgc.NewPodGC(client("pod-garbage-collector"), sharedInformers.Pods().Informer(),
-		int(s.TerminatedPodGCThreshold)).Run(wait.NeverStop)
+		int(s.TerminatedPodGCThreshold)).Run(stop)
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
 	cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
@@ -342,7 +342,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl
 		ReplenishmentResyncPeriod: ResyncPeriod(s),
 		GroupKindsToReplenish:     groupKindsToReplenish,
 	}
-	go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(int(s.ConcurrentResourceQuotaSyncs), wait.NeverStop)
+	go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(int(s.ConcurrentResourceQuotaSyncs), stop)
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
 	// If apiserver is not running we should wait for some time and fail only then. This is particularly
@@ -395,7 +395,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl
 		}
 	}
 	namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, gvrFn, s.NamespaceSyncPeriod.Duration, v1.FinalizerKubernetes)
-	go namespaceController.Run(int(s.ConcurrentNamespaceSyncs), wait.NeverStop)
+	go namespaceController.Run(int(s.ConcurrentNamespaceSyncs), stop)
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
 	groupVersion := "extensions/v1beta1"
@@ -407,28 +407,28 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl
 		if containsResource(resources, "daemonsets") {
 			glog.Infof("Starting daemon set controller")
 			go daemon.NewDaemonSetsController(sharedInformers.DaemonSets(), sharedInformers.Pods(), sharedInformers.Nodes(), client("daemon-set-controller"), int(s.LookupCacheSizeForDaemonSet)).
-				Run(int(s.ConcurrentDaemonSetSyncs), wait.NeverStop)
+				Run(int(s.ConcurrentDaemonSetSyncs), stop)
 			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 		}
 
 		if containsResource(resources, "jobs") {
 			glog.Infof("Starting job controller")
 			go job.NewJobController(sharedInformers.Pods().Informer(), sharedInformers.Jobs(), client("job-controller")).
-				Run(int(s.ConcurrentJobSyncs), wait.NeverStop)
+				Run(int(s.ConcurrentJobSyncs), stop)
 			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 		}
 
 		if containsResource(resources, "deployments") {
 			glog.Infof("Starting deployment controller")
 			go deployment.NewDeploymentController(sharedInformers.Deployments(), sharedInformers.ReplicaSets(), sharedInformers.Pods(), client("deployment-controller")).
-				Run(int(s.ConcurrentDeploymentSyncs), wait.NeverStop)
+				Run(int(s.ConcurrentDeploymentSyncs), stop)
 			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 		}
 
 		if containsResource(resources, "replicasets") {
 			glog.Infof("Starting ReplicaSet controller")
 			go replicaset.NewReplicaSetController(sharedInformers.ReplicaSets(), sharedInformers.Pods(), client("replicaset-controller"), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector).
-				Run(int(s.ConcurrentRSSyncs), wait.NeverStop)
+				Run(int(s.ConcurrentRSSyncs), stop)
 			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 		}
 	}
@@ -450,7 +450,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl
 			)
 			replicaCalc := podautoscaler.NewReplicaCalculator(metricsClient, hpaClient.Core())
 			go podautoscaler.NewHorizontalController(hpaClient.Core(), hpaClient.Extensions(), hpaClient.Autoscaling(), replicaCalc, s.HorizontalPodAutoscalerSyncPeriod.Duration).
-				Run(wait.NeverStop)
+				Run(stop)
 			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 		}
 	}
@@ -462,7 +462,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl
 		glog.Infof("Starting %s apis", groupVersion)
 		if containsResource(resources, "poddisruptionbudgets") {
 			glog.Infof("Starting disruption controller")
-			go disruption.NewDisruptionController(sharedInformers.Pods().Informer(), client("disruption-controller")).Run(wait.NeverStop)
+			go disruption.NewDisruptionController(sharedInformers.Pods().Informer(), client("disruption-controller")).Run(stop)
 			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 		}
 	}
@@ -479,7 +479,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl
 				sharedInformers.Pods().Informer(),
 				client("statefulset-controller"),
 				resyncPeriod,
-			).Run(1, wait.NeverStop)
+			).Run(1, stop)
 			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 		}
 	}
@@ -493,7 +493,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl
 			// // TODO: this is a temp fix for allowing kubeClient list v2alpha1 sj, should switch to using clientset
 			kubeconfig.ContentConfig.GroupVersion = &schema.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"}
 			go cronjob.NewCronJobController(client("cronjob-controller")).
-				Run(wait.NeverStop)
+				Run(stop)
 			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 		}
@@ -515,7 +515,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl
 		EnableDynamicProvisioning: s.VolumeConfiguration.EnableDynamicProvisioning,
 	}
 	volumeController := persistentvolumecontroller.NewController(params)
-	volumeController.Run(wait.NeverStop)
+	volumeController.Run(stop)
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
 	attachDetachController, attachDetachControllerErr :=
@@ -526,12 +526,11 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl
 			sharedInformers.PersistentVolumeClaims().Informer(),
 			sharedInformers.PersistentVolumes().Informer(),
 			cloud,
-			ProbeAttachableVolumePlugins(s.VolumeConfiguration),
-			recorder)
+			ProbeAttachableVolumePlugins(s.VolumeConfiguration))
 	if attachDetachControllerErr != nil {
 		glog.Fatalf("Failed to start attach/detach controller: %v", attachDetachControllerErr)
 	}
-	go attachDetachController.Run(wait.NeverStop)
+	go attachDetachController.Run(stop)
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
 	groupVersion = "certificates.k8s.io/v1alpha1"
@@ -552,7 +551,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl
 			if err != nil {
 				glog.Errorf("Failed to start certificate controller: %v", err)
 			} else {
-				go certController.Run(1, wait.NeverStop)
+				go certController.Run(1, stop)
 			}
 			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 		}
@@ -582,7 +581,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl
 		glog.Errorf("Failed to start the generic garbage collector: %v", err)
 	} else {
 		workers := int(s.ConcurrentGCSyncs)
-		go garbageCollector.Run(workers, wait.NeverStop)
+		go garbageCollector.Run(workers, stop)
 	}
 }

diff --git a/pkg/controller/volume/attachdetach/BUILD b/pkg/controller/volume/attachdetach/BUILD
index bcc22df9d49..0cfcfef9f38 100644
--- a/pkg/controller/volume/attachdetach/BUILD
+++ b/pkg/controller/volume/attachdetach/BUILD
@@ -19,6 +19,7 @@ go_library(
         "//pkg/api/v1:go_default_library",
         "//pkg/client/cache:go_default_library",
         "//pkg/client/clientset_generated/release_1_5:go_default_library",
+        "//pkg/client/clientset_generated/release_1_5/typed/core/v1:go_default_library",
         "//pkg/client/record:go_default_library",
         "//pkg/cloudprovider:go_default_library",
         "//pkg/controller/volume/attachdetach/cache:go_default_library",
@@ -42,7 +43,6 @@ go_test(
     library = "go_default_library",
    tags = ["automanaged"],
    deps = [
-        "//pkg/client/record:go_default_library",
        "//pkg/controller/informers:go_default_library",
        "//pkg/controller/volume/attachdetach/testing:go_default_library",
    ],

diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go
index fb3c40c2d14..49fc16a7a2e 100644
--- a/pkg/controller/volume/attachdetach/attach_detach_controller.go
+++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go
@@ -28,6 +28,7 @@ import (
 	"k8s.io/kubernetes/pkg/api/v1"
 	kcache "k8s.io/kubernetes/pkg/client/cache"
 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
+	v1core "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/typed/core/v1"
 	"k8s.io/kubernetes/pkg/client/record"
 	"k8s.io/kubernetes/pkg/cloudprovider"
 	"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
@@ -76,8 +77,7 @@ func NewAttachDetachController(
 	pvcInformer kcache.SharedInformer,
 	pvInformer kcache.SharedInformer,
 	cloud cloudprovider.Interface,
-	plugins []volume.VolumePlugin,
-	recorder record.EventRecorder) (AttachDetachController, error) {
+	plugins []volume.VolumePlugin) (AttachDetachController, error) {
 	// TODO: The default resyncPeriod for shared informers is 12 hours, this is
 	// unacceptable for the attach/detach controller. For example, if a pod is
 	// skipped because the node it is scheduled to didn't set its annotation in
@@ -115,6 +115,11 @@ func NewAttachDetachController(
 		return nil, fmt.Errorf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err)
 	}
 
+	eventBroadcaster := record.NewBroadcaster()
+	eventBroadcaster.StartLogging(glog.Infof)
+	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
+	recorder := eventBroadcaster.NewRecorder(v1.EventSource{Component: "attachdetach"})
+
 	adc.desiredStateOfWorld = cache.NewDesiredStateOfWorld(&adc.volumePluginMgr)
 	adc.actualStateOfWorld = cache.NewActualStateOfWorld(&adc.volumePluginMgr)
 	adc.attacherDetacher =

diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go
index 2849237e37e..c301acd2b66 100644
--- a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go
+++ b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go
@@ -20,7 +20,6 @@ import (
 	"testing"
 	"time"
 
-	"k8s.io/kubernetes/pkg/client/record"
 	"k8s.io/kubernetes/pkg/controller/informers"
 	controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
 )
@@ -33,7 +32,6 @@ func Test_NewAttachDetachController_Positive(t *testing.T) {
 	nodeInformer := informers.NewNodeInformer(fakeKubeClient, resyncPeriod)
 	pvcInformer := informers.NewPVCInformer(fakeKubeClient, resyncPeriod)
 	pvInformer := informers.NewPVInformer(fakeKubeClient, resyncPeriod)
-	fakeRecorder := &record.FakeRecorder{}
 
 	// Act
 	_, err := NewAttachDetachController(
@@ -43,8 +41,7 @@ func Test_NewAttachDetachController_Positive(t *testing.T) {
 		pvcInformer,
 		pvInformer,
 		nil, /* cloud */
-		nil, /* plugins */
-		fakeRecorder)
+		nil /* plugins */)
 
 	// Assert
 	if err != nil {
"//pkg/cloudprovider:go_default_library", diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index e6b6d049db9..36062581c96 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -33,7 +33,6 @@ import ( "k8s.io/kubernetes/cmd/kube-controller-manager/app/options" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/apis/batch" @@ -43,6 +42,7 @@ import ( "k8s.io/kubernetes/pkg/client/leaderelection/resourcelock" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/restclient" + "k8s.io/kubernetes/pkg/client/typed/discovery" "k8s.io/kubernetes/pkg/client/typed/dynamic" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" "k8s.io/kubernetes/pkg/cloudprovider" @@ -222,11 +222,50 @@ func Run(s *options.CMServer) error { panic("unreachable") } +// TODO: In general, any controller checking this needs to be dynamic so +// users don't have to restart their controller manager if they change the apiserver. +func getAvailableResources(clientBuilder controller.ControllerClientBuilder) (map[schema.GroupVersionResource]bool, error) { + var discoveryClient discovery.DiscoveryInterface + + // If apiserver is not running we should wait for some time and fail only then. This is particularly + // important when we start apiserver and controller manager at the same time. + err := wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) { + client, err := clientBuilder.Client("controller-discovery") + if err != nil { + glog.Errorf("Failed to get api versions from server: %v", err) + return false, nil + } + + discoveryClient = client.Discovery() + return true, nil + }) + if err != nil { + return nil, fmt.Errorf("Failed to get api versions from server: %v", err) + } + + resourceMap, err := discoveryClient.ServerResources() + if err != nil { + return nil, fmt.Errorf("Failed to get supported resources from server: %v", err) + } + + allResources := map[schema.GroupVersionResource]bool{} + for _, apiResourceList := range resourceMap { + version, err := schema.ParseGroupVersion(apiResourceList.GroupVersion) + if err != nil { + return nil, err + } + for _, apiResource := range apiResourceList.APIResources { + allResources[version.WithResource(apiResource.Name)] = true + } + } + + return allResources, nil +} + func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) error { client := func(serviceAccountName string) clientset.Interface { return rootClientBuilder.ClientOrDie(serviceAccountName) } - discoveryClient := client("controller-discovery").Discovery() sharedInformers := informers.NewSharedInformerFactory(client("shared-informers"), nil, ResyncPeriod(s)()) // always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest @@ -259,6 +298,11 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl } } + availableResources, err := getAvailableResources(clientBuilder) + if err != nil { + return err + } + go endpointcontroller.NewEndpointController(sharedInformers.Pods().Informer(), client("endpoint-controller")). 
Run(int(s.ConcurrentEndpointSyncs), stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) @@ -345,26 +389,6 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(int(s.ConcurrentResourceQuotaSyncs), stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - // If apiserver is not running we should wait for some time and fail only then. This is particularly - // important when we start apiserver and controller manager at the same time. - var versionStrings []string - err = wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) { - if versionStrings, err = restclient.ServerAPIVersions(kubeconfig); err == nil { - return true, nil - } - glog.Errorf("Failed to get api versions from server: %v", err) - return false, nil - }) - if err != nil { - glog.Fatalf("Failed to get api versions from server: %v", err) - } - versions := &unversioned.APIVersions{Versions: versionStrings} - - resourceMap, err := discoveryClient.ServerResources() - if err != nil { - glog.Fatalf("Failed to get supported resources from server: %v", err) - } - // TODO: should use a dynamic RESTMapper built from the discovery results. restMapper := registered.RESTMapper() @@ -398,107 +422,72 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl go namespaceController.Run(int(s.ConcurrentNamespaceSyncs), stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - groupVersion := "extensions/v1beta1" - resources, found := resourceMap[groupVersion] - // TODO: this needs to be dynamic so users don't have to restart their controller manager if they change the apiserver - if containsVersion(versions, groupVersion) && found { - glog.Infof("Starting %s apis", groupVersion) - - if containsResource(resources, "daemonsets") { - glog.Infof("Starting daemon set controller") - go daemon.NewDaemonSetsController(sharedInformers.DaemonSets(), sharedInformers.Pods(), sharedInformers.Nodes(), client("daemon-set-controller"), int(s.LookupCacheSizeForDaemonSet)). - Run(int(s.ConcurrentDaemonSetSyncs), stop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - } - - if containsResource(resources, "jobs") { - glog.Infof("Starting job controller") - go job.NewJobController(sharedInformers.Pods().Informer(), sharedInformers.Jobs(), client("job-controller")). - Run(int(s.ConcurrentJobSyncs), stop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - } - - if containsResource(resources, "deployments") { - glog.Infof("Starting deployment controller") - go deployment.NewDeploymentController(sharedInformers.Deployments(), sharedInformers.ReplicaSets(), sharedInformers.Pods(), client("deployment-controller")). - Run(int(s.ConcurrentDeploymentSyncs), stop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - } - - if containsResource(resources, "replicasets") { - glog.Infof("Starting ReplicaSet controller") - go replicaset.NewReplicaSetController(sharedInformers.ReplicaSets(), sharedInformers.Pods(), client("replicaset-controller"), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector). 
- Run(int(s.ConcurrentRSSyncs), stop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - } + if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "daemonsets"}] { + go daemon.NewDaemonSetsController(sharedInformers.DaemonSets(), sharedInformers.Pods(), sharedInformers.Nodes(), client("daemon-set-controller"), int(s.LookupCacheSizeForDaemonSet)). + Run(int(s.ConcurrentDaemonSetSyncs), stop) + time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } - groupVersion = "autoscaling/v1" - resources, found = resourceMap[groupVersion] - glog.Infof("Attempting to start horizontal pod autoscaler controller, full resource map %+v", resourceMap) - if containsVersion(versions, groupVersion) && found { - glog.Infof("Starting %s apis", groupVersion) - if containsResource(resources, "horizontalpodautoscalers") { - glog.Infof("Starting horizontal pod controller.") - hpaClient := client("horizontal-pod-autoscaler") - metricsClient := metrics.NewHeapsterMetricsClient( - hpaClient, - metrics.DefaultHeapsterNamespace, - metrics.DefaultHeapsterScheme, - metrics.DefaultHeapsterService, - metrics.DefaultHeapsterPort, - ) - replicaCalc := podautoscaler.NewReplicaCalculator(metricsClient, hpaClient.Core()) - go podautoscaler.NewHorizontalController(hpaClient.Core(), hpaClient.Extensions(), hpaClient.Autoscaling(), replicaCalc, s.HorizontalPodAutoscalerSyncPeriod.Duration). - Run(stop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - } + if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "jobs"}] { + glog.Infof("Starting job controller") + go job.NewJobController(sharedInformers.Pods().Informer(), sharedInformers.Jobs(), client("job-controller")). + Run(int(s.ConcurrentJobSyncs), stop) + time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } - groupVersion = "policy/v1beta1" - resources, found = resourceMap[groupVersion] - glog.Infof("Attempting to start disruption controller, full resource map %+v", resourceMap) - if containsVersion(versions, groupVersion) && found { - glog.Infof("Starting %s apis", groupVersion) - if containsResource(resources, "poddisruptionbudgets") { - glog.Infof("Starting disruption controller") - go disruption.NewDisruptionController(sharedInformers.Pods().Informer(), client("disruption-controller")).Run(stop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - } + if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "deployments"}] { + glog.Infof("Starting deployment controller") + go deployment.NewDeploymentController(sharedInformers.Deployments(), sharedInformers.ReplicaSets(), sharedInformers.Pods(), client("deployment-controller")). 
+ Run(int(s.ConcurrentDeploymentSyncs), stop) + time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } - groupVersion = "apps/v1beta1" - resources, found = resourceMap[groupVersion] - glog.Infof("Attempting to start statefulset, full resource map %+v", resourceMap) - if containsVersion(versions, groupVersion) && found { - glog.Infof("Starting %s apis", groupVersion) - if containsResource(resources, "statefulsets") { - glog.Infof("Starting StatefulSet controller") - resyncPeriod := ResyncPeriod(s)() - go petset.NewStatefulSetController( - sharedInformers.Pods().Informer(), - client("statefulset-controller"), - resyncPeriod, - ).Run(1, stop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - } + if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "replicasets"}] { + glog.Infof("Starting ReplicaSet controller") + go replicaset.NewReplicaSetController(sharedInformers.ReplicaSets(), sharedInformers.Pods(), client("replicaset-controller"), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector). + Run(int(s.ConcurrentRSSyncs), stop) + time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } - groupVersion = "batch/v2alpha1" - resources, found = resourceMap[groupVersion] - if containsVersion(versions, groupVersion) && found { - glog.Infof("Starting %s apis", groupVersion) - if containsResource(resources, "cronjobs") { - glog.Infof("Starting cronjob controller") - // // TODO: this is a temp fix for allowing kubeClient list v2alpha1 sj, should switch to using clientset - kubeconfig.ContentConfig.GroupVersion = &schema.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"} - go cronjob.NewCronJobController(client("cronjob-controller")). - Run(stop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - } - } else { - glog.Infof("Not starting %s apis", groupVersion) + if availableResources[schema.GroupVersionResource{Group: "autoscaling", Version: "v1", Resource: "horizontalpodautoscalers"}] { + glog.Infof("Starting horizontal pod autoscaler controller.") + hpaClient := client("horizontal-pod-autoscaler") + metricsClient := metrics.NewHeapsterMetricsClient( + hpaClient, + metrics.DefaultHeapsterNamespace, + metrics.DefaultHeapsterScheme, + metrics.DefaultHeapsterService, + metrics.DefaultHeapsterPort, + ) + replicaCalc := podautoscaler.NewReplicaCalculator(metricsClient, hpaClient.Core()) + go podautoscaler.NewHorizontalController(hpaClient.Core(), hpaClient.Extensions(), hpaClient.Autoscaling(), replicaCalc, s.HorizontalPodAutoscalerSyncPeriod.Duration). 
+ Run(stop) + time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) + } + + if availableResources[schema.GroupVersionResource{Group: "policy", Version: "v1beta1", Resource: "poddisruptionbudgets"}] { + glog.Infof("Starting disruption controller") + go disruption.NewDisruptionController(sharedInformers.Pods().Informer(), client("disruption-controller")).Run(stop) + time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) + } + + if availableResources[schema.GroupVersionResource{Group: "apps", Version: "v1beta1", Resource: "statefulsets"}] { + glog.Infof("Starting StatefulSet controller") + resyncPeriod := ResyncPeriod(s)() + go petset.NewStatefulSetController( + sharedInformers.Pods().Informer(), + client("statefulset-controller"), + resyncPeriod, + ).Run(1, stop) + time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) + } + + if availableResources[schema.GroupVersionResource{Group: "batch", Version: "v2alpha1", Resource: "cronjobs"}] { + glog.Infof("Starting cronjob controller") + // TODO: this is a temp fix for allowing kubeClient list v2alpha1 sj, should switch to using clientset + kubeconfig.ContentConfig.GroupVersion = &schema.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"} + go cronjob.NewCronJobController(client("cronjob-controller")).Run(stop) + time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } alphaProvisioner, err := NewAlphaVolumeProvisioner(cloud, s.VolumeConfiguration) @@ -533,28 +522,22 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl go attachDetachController.Run(stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - groupVersion = "certificates.k8s.io/v1alpha1" - resources, found = resourceMap[groupVersion] - glog.Infof("Attempting to start certificates, full resource map %+v", resourceMap) - if containsVersion(versions, groupVersion) && found { - glog.Infof("Starting %s apis", groupVersion) - if containsResource(resources, "certificatesigningrequests") { - glog.Infof("Starting certificate request controller") - resyncPeriod := ResyncPeriod(s)() - certController, err := certcontroller.NewCertificateController( - client("certificate-controller"), - resyncPeriod, - s.ClusterSigningCertFile, - s.ClusterSigningKeyFile, - s.ApproveAllKubeletCSRsForGroup, - ) - if err != nil { - glog.Errorf("Failed to start certificate controller: %v", err) - } else { - go certController.Run(1, stop) - } - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) + if availableResources[schema.GroupVersionResource{Group: "certificates.k8s.io", Version: "v1alpha1", Resource: "certificatesigningrequests"}] { + glog.Infof("Starting certificate request controller") + resyncPeriod := ResyncPeriod(s)() + certController, err := certcontroller.NewCertificateController( + client("certificate-controller"), + resyncPeriod, + s.ClusterSigningCertFile, + s.ClusterSigningKeyFile, + s.ApproveAllKubeletCSRsForGroup, + ) + if err != nil { + glog.Errorf("Failed to start certificate controller: %v", err) + } else { + go certController.Run(1, stop) } + time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } go serviceaccountcontroller.NewServiceAccountsController( @@ -589,22 +572,3 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl select {} } - -func containsVersion(versions *unversioned.APIVersions, version string) bool { - for ix := 
range versions.Versions { - if versions.Versions[ix] == version { - return true - } - } - return false -} - -func containsResource(resources *unversioned.APIResourceList, resourceName string) bool { - for ix := range resources.APIResources { - resource := resources.APIResources[ix] - if resource.Name == resourceName { - return true - } - } - return false -} From 49ebc2c2ae7441b61a654b22781d5065160be145 Mon Sep 17 00:00:00 2001 From: deads2k Date: Mon, 21 Nov 2016 15:09:12 -0500 Subject: [PATCH 3/5] remove unnecessary startcontroller options --- .../app/controllermanager.go | 15 ++++++++------- pkg/controller/client_builder.go | 17 +++++++++++++++++ 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 36062581c96..939c38d7795 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -179,7 +179,7 @@ func Run(s *options.CMServer) error { clientBuilder = rootClientBuilder } - err := StartControllers(s, kubeconfig, rootClientBuilder, clientBuilder, stop) + err := StartControllers(s, rootClientBuilder, clientBuilder, stop) glog.Fatalf("error running controllers: %v", err) panic("unreachable") } @@ -262,7 +262,7 @@ func getAvailableResources(clientBuilder controller.ControllerClientBuilder) (ma return allResources, nil } -func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) error { +func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) error { client := func(serviceAccountName string) clientset.Interface { return rootClientBuilder.ClientOrDie(serviceAccountName) } @@ -284,7 +284,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl return fmt.Errorf("error parsing root-ca-file at %s: %v", s.RootCAFile, err) } } else { - rootCA = kubeconfig.CAData + rootCA = rootClientBuilder.ConfigOrDie("tokens-controller").CAData } go serviceaccountcontroller.NewTokensController( @@ -394,7 +394,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl // Find the list of namespaced resources via discovery that the namespace controller must manage namespaceKubeClient := client("namespace-controller") - namespaceClientPool := dynamic.NewClientPool(restclient.AddUserAgent(kubeconfig, "namespace-controller"), restMapper, dynamic.LegacyAPIPathResolverFunc) + namespaceClientPool := dynamic.NewClientPool(rootClientBuilder.ConfigOrDie("namespace-controller"), restMapper, dynamic.LegacyAPIPathResolverFunc) // TODO: consider using a list-watch + cache here rather than polling var gvrFn func() ([]schema.GroupVersionResource, error) rsrcs, err := namespaceKubeClient.Discovery().ServerResources() @@ -485,8 +485,9 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl if availableResources[schema.GroupVersionResource{Group: "batch", Version: "v2alpha1", Resource: "cronjobs"}] { glog.Infof("Starting cronjob controller") // TODO: this is a temp fix for allowing kubeClient list v2alpha1 sj, should switch to using clientset - kubeconfig.ContentConfig.GroupVersion = &schema.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"} - go cronjob.NewCronJobController(client("cronjob-controller")).Run(stop) + cronjobConfig := 
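Patch 2 above replaces the per-groupVersion containsVersion/containsResource string scans with a single discovery pass that flattens the server's resource lists into one map keyed by GroupVersionResource, so each controller start becomes a single map lookup. A simplified sketch of that lookup-set idea, with local stand-ins for the client-go discovery types (gvr and apiResourceList here are illustrative, not the real structs):

package main

import (
	"fmt"
	"strings"
)

type gvr struct{ Group, Version, Resource string }

// apiResourceList mirrors the shape of a discovery APIResourceList:
// a "group/version" string plus the resource names served under it.
type apiResourceList struct {
	GroupVersion string
	Resources    []string
}

// availableResources flattens the discovery response into one set,
// the same shape as the map getAvailableResources builds above.
func availableResources(lists []apiResourceList) map[gvr]bool {
	out := map[gvr]bool{}
	for _, l := range lists {
		group, version := "", l.GroupVersion // core group has no "/"
		if i := strings.Index(l.GroupVersion, "/"); i >= 0 {
			group, version = l.GroupVersion[:i], l.GroupVersion[i+1:]
		}
		for _, r := range l.Resources {
			out[gvr{group, version, r}] = true
		}
	}
	return out
}

func main() {
	avail := availableResources([]apiResourceList{
		{GroupVersion: "extensions/v1beta1", Resources: []string{"daemonsets", "deployments"}},
		{GroupVersion: "autoscaling/v1", Resources: []string{"horizontalpodautoscalers"}},
	})
	if avail[gvr{"extensions", "v1beta1", "deployments"}] {
		fmt.Println("would start the deployment controller")
	}
}
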
From 49ebc2c2ae7441b61a654b22781d5065160be145 Mon Sep 17 00:00:00 2001
From: deads2k
Date: Mon, 21 Nov 2016 15:09:12 -0500
Subject: [PATCH 3/5] remove unnecessary startcontroller options

---
 .../app/controllermanager.go     | 15 ++++++++-------
 pkg/controller/client_builder.go | 17 +++++++++++++++++
 2 files changed, 25 insertions(+), 7 deletions(-)

diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index 36062581c96..939c38d7795 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -179,7 +179,7 @@ func Run(s *options.CMServer) error {
 		clientBuilder = rootClientBuilder
 	}
 
-	err := StartControllers(s, kubeconfig, rootClientBuilder, clientBuilder, stop)
+	err := StartControllers(s, rootClientBuilder, clientBuilder, stop)
 	glog.Fatalf("error running controllers: %v", err)
 	panic("unreachable")
 }
@@ -262,7 +262,7 @@ func getAvailableResources(clientBuilder controller.ControllerClientBuilder) (ma
 	return allResources, nil
 }
 
-func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) error {
+func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) error {
 	client := func(serviceAccountName string) clientset.Interface {
 		return rootClientBuilder.ClientOrDie(serviceAccountName)
 	}
@@ -284,7 +284,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
 				return fmt.Errorf("error parsing root-ca-file at %s: %v", s.RootCAFile, err)
 			}
 		} else {
-			rootCA = kubeconfig.CAData
+			rootCA = rootClientBuilder.ConfigOrDie("tokens-controller").CAData
 		}
 
 		go serviceaccountcontroller.NewTokensController(
@@ -394,7 +394,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
 
 	// Find the list of namespaced resources via discovery that the namespace controller must manage
 	namespaceKubeClient := client("namespace-controller")
-	namespaceClientPool := dynamic.NewClientPool(restclient.AddUserAgent(kubeconfig, "namespace-controller"), restMapper, dynamic.LegacyAPIPathResolverFunc)
+	namespaceClientPool := dynamic.NewClientPool(rootClientBuilder.ConfigOrDie("namespace-controller"), restMapper, dynamic.LegacyAPIPathResolverFunc)
 	// TODO: consider using a list-watch + cache here rather than polling
 	var gvrFn func() ([]schema.GroupVersionResource, error)
 	rsrcs, err := namespaceKubeClient.Discovery().ServerResources()
@@ -485,8 +485,9 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
 	if availableResources[schema.GroupVersionResource{Group: "batch", Version: "v2alpha1", Resource: "cronjobs"}] {
 		glog.Infof("Starting cronjob controller")
 		// TODO: this is a temp fix for allowing kubeClient list v2alpha1 sj, should switch to using clientset
-		kubeconfig.ContentConfig.GroupVersion = &schema.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"}
-		go cronjob.NewCronJobController(client("cronjob-controller")).Run(stop)
+		cronjobConfig := rootClientBuilder.ConfigOrDie("cronjob-controller")
+		cronjobConfig.ContentConfig.GroupVersion = &schema.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"}
+		go cronjob.NewCronJobController(clientset.NewForConfigOrDie(cronjobConfig)).Run(stop)
 		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 	}
 
@@ -554,7 +555,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
 		glog.Fatalf("Failed to get supported resources from server: %v", err)
 	}
 
-	config := restclient.AddUserAgent(kubeconfig, "generic-garbage-collector")
+	config := rootClientBuilder.ConfigOrDie("generic-garbage-collector")
 	config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
 	metaOnlyClientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc)
 	config.ContentConfig = dynamic.ContentConfig()

diff --git a/pkg/controller/client_builder.go b/pkg/controller/client_builder.go
index f37de8499bd..5bfb50f52a7 100644
--- a/pkg/controller/client_builder.go
+++ b/pkg/controller/client_builder.go
@@ -38,6 +38,7 @@ import (
 // ControllerClientBuilder allow syou to get clients and configs for controllers
 type ControllerClientBuilder interface {
 	Config(name string) (*restclient.Config, error)
+	ConfigOrDie(name string) *restclient.Config
 	Client(name string) (clientset.Interface, error)
 	ClientOrDie(name string) clientset.Interface
 }
@@ -53,6 +54,14 @@ func (b SimpleControllerClientBuilder) Config(name string) (*restclient.Config,
 	return &clientConfig, nil
 }
 
+func (b SimpleControllerClientBuilder) ConfigOrDie(name string) *restclient.Config {
+	clientConfig, err := b.Config(name)
+	if err != nil {
+		glog.Fatal(err)
+	}
+	return clientConfig
+}
+
 func (b SimpleControllerClientBuilder) Client(name string) (clientset.Interface, error) {
 	clientConfig, err := b.Config(name)
 	if err != nil {
@@ -150,6 +159,14 @@ func (b SAControllerClientBuilder) Config(name string) (*restclient.Config, erro
 	return clientConfig, nil
 }
 
+func (b SAControllerClientBuilder) ConfigOrDie(name string) *restclient.Config {
+	clientConfig, err := b.Config(name)
+	if err != nil {
+		glog.Fatal(err)
+	}
+	return clientConfig
+}
+
 func (b SAControllerClientBuilder) Client(name string) (clientset.Interface, error) {
 	clientConfig, err := b.Config(name)
 	if err != nil {
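Patch 3 above adds ConfigOrDie to the client builder so each controller gets its own rest config instead of mutating the shared kubeconfig; note the cronjob fix, where the v2alpha1 group-version override now lands on a private copy. The OrDie variants follow the common Go convenience-wrapper pattern; a generic sketch under assumed, deliberately non-Kubernetes types:

package main

import "log"

type Config struct{ UserAgent string }

type Builder struct{ base Config }

// Config hands back a per-caller copy, so one controller's mutations
// (user agent, group-version overrides) cannot leak into another's.
func (b Builder) Config(name string) (*Config, error) {
	cfg := b.base // copy, not a pointer into the shared config
	cfg.UserAgent = name
	return &cfg, nil
}

// ConfigOrDie is the convenience form used at process startup, where a
// bad config is unrecoverable anyway.
func (b Builder) ConfigOrDie(name string) *Config {
	cfg, err := b.Config(name)
	if err != nil {
		log.Fatal(err)
	}
	return cfg
}

func main() {
	b := Builder{}
	cronjobCfg := b.ConfigOrDie("cronjob-controller")
	gcCfg := b.ConfigOrDie("generic-garbage-collector")
	log.Println(cronjobCfg.UserAgent, gcCfg.UserAgent) // two independent configs
}
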
From 21c304333a5ff9a7fcd0e03c9fefbb129d2b06c9 Mon Sep 17 00:00:00 2001
From: deads2k
Date: Mon, 21 Nov 2016 15:10:59 -0500
Subject: [PATCH 4/5] return errors instead of fataling

---
 .../app/controllermanager.go | 20 +++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index 939c38d7795..87cb2d9fafd 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -240,12 +240,12 @@ func getAvailableResources(clientBuilder controller.ControllerClientBuilder) (ma
 		return true, nil
 	})
 	if err != nil {
-		return nil, fmt.Errorf("Failed to get api versions from server: %v", err)
+		return nil, fmt.Errorf("failed to get api versions from server: %v", err)
 	}
 
 	resourceMap, err := discoveryClient.ServerResources()
 	if err != nil {
-		return nil, fmt.Errorf("Failed to get supported resources from server: %v", err)
+		return nil, fmt.Errorf("failed to get supported resources from server: %v", err)
 	}
 
 	allResources := map[schema.GroupVersionResource]bool{}
@@ -272,7 +272,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
 	if len(s.ServiceAccountKeyFile) > 0 {
 		privateKey, err := serviceaccount.ReadPrivateKey(s.ServiceAccountKeyFile)
 		if err != nil {
-			return fmt.Errorf("Error reading key for service account token controller: %v", err)
+			return fmt.Errorf("error reading key for service account token controller: %v", err)
 		} else {
 			var rootCA []byte
 			if s.RootCAFile != "" {
@@ -323,7 +323,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
 
 	cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
 	if err != nil {
-		glog.Fatalf("Cloud provider could not be initialized: %v", err)
+		return fmt.Errorf("cloud provider could not be initialized: %v", err)
 	}
 
 	_, clusterCIDR, err := net.ParseCIDR(s.ClusterCIDR)
@@ -341,7 +341,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
 		s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
 	if err != nil {
-		glog.Fatalf("Failed to initialize nodecontroller: %v", err)
+		return fmt.Errorf("failed to initialize nodecontroller: %v", err)
 	}
 	nodeController.Run()
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
@@ -399,7 +399,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
 	var gvrFn func() ([]schema.GroupVersionResource, error)
 	rsrcs, err := namespaceKubeClient.Discovery().ServerResources()
 	if err != nil {
-		glog.Fatalf("Failed to get group version resources: %v", err)
+		return fmt.Errorf("failed to get group version resources: %v", err)
 	}
 	for _, rsrcList := range rsrcs {
 		for ix := range rsrcList.APIResources {
@@ -412,7 +412,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
 	if gvrFn == nil {
 		gvr, err := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources()
 		if err != nil {
-			glog.Fatalf("Failed to get resources: %v", err)
+			return fmt.Errorf("failed to get resources: %v", err)
 		}
 		gvrFn = func() ([]schema.GroupVersionResource, error) {
 			return gvr, nil
@@ -493,7 +493,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
 
 	alphaProvisioner, err := NewAlphaVolumeProvisioner(cloud, s.VolumeConfiguration)
 	if err != nil {
-		glog.Fatalf("An backward-compatible provisioner could not be created: %v, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.", err)
+		return fmt.Errorf("an backward-compatible provisioner could not be created: %v, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.", err)
 	}
 	params := persistentvolumecontroller.ControllerParameters{
 		KubeClient:                client("persistent-volume-binder"),
@@ -518,7 +518,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
 			cloud,
 			ProbeAttachableVolumePlugins(s.VolumeConfiguration))
 	if attachDetachControllerErr != nil {
-		glog.Fatalf("Failed to start attach/detach controller: %v", attachDetachControllerErr)
+		return fmt.Errorf("failed to start attach/detach controller: %v", attachDetachControllerErr)
 	}
 	go attachDetachController.Run(stop)
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
@@ -552,7 +552,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
 		gcClientset := client("generic-garbage-collector")
 		groupVersionResources, err := gcClientset.Discovery().ServerPreferredResources()
 		if err != nil {
-			glog.Fatalf("Failed to get supported resources from server: %v", err)
+			return fmt.Errorf("failed to get supported resources from server: %v", err)
 		}
 
 		config := rootClientBuilder.ConfigOrDie("generic-garbage-collector")
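Patch 4 above converts the glog.Fatalf calls inside StartControllers into lower-cased, returned errors, so the single Fatalf in Run is the only place that decides process exit. The same structure in miniature, with hypothetical helper names and only the standard library:

package main

import (
	"errors"
	"fmt"
	"log"
)

var errNoCloud = errors.New("no cloud provider configured")

// initCloudProvider stands in for cloudprovider.InitCloudProvider.
func initCloudProvider(name string) (string, error) {
	if name == "" {
		return "", errNoCloud
	}
	return name, nil
}

// startControllers wraps and returns errors instead of calling Fatalf,
// which keeps it testable and lets deferred cleanup in callers run.
func startControllers(cloudName string) error {
	cloud, err := initCloudProvider(cloudName)
	if err != nil {
		// lower-case message, wrapped and returned -- not Fatalf
		return fmt.Errorf("cloud provider could not be initialized: %v", err)
	}
	_ = cloud
	return nil
}

func main() {
	if err := startControllers(""); err != nil {
		log.Fatalf("error running controllers: %v", err) // exit decided here, once
	}
}
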
From 585daa2069d34d912367fcf64c1738a697fec2a6 Mon Sep 17 00:00:00 2001
From: deads2k
Date: Mon, 21 Nov 2016 15:13:22 -0500
Subject: [PATCH 5/5] use the client builder to support using SAs

---
 .../app/controllermanager.go     | 43 +++++++++----------
 pkg/controller/client_builder.go |  4 +-
 2 files changed, 22 insertions(+), 25 deletions(-)

diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index 87cb2d9fafd..0c919152733 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -263,10 +263,7 @@ func getAvailableResources(clientBuilder controller.ControllerClientBuilder) (ma
 }
 
 func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) error {
-	client := func(serviceAccountName string) clientset.Interface {
-		return rootClientBuilder.ClientOrDie(serviceAccountName)
-	}
-	sharedInformers := informers.NewSharedInformerFactory(client("shared-informers"), nil, ResyncPeriod(s)())
+	sharedInformers := informers.NewSharedInformerFactory(rootClientBuilder.ClientOrDie("shared-informers"), nil, ResyncPeriod(s)())
 
 	// always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
 	if len(s.ServiceAccountKeyFile) > 0 {
@@ -303,7 +300,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
 		return err
 	}
 
-	go endpointcontroller.NewEndpointController(sharedInformers.Pods().Informer(), client("endpoint-controller")).
+	go endpointcontroller.NewEndpointController(sharedInformers.Pods().Informer(), clientBuilder.ClientOrDie("endpoint-controller")).
 		Run(int(s.ConcurrentEndpointSyncs), stop)
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
@@ -317,7 +314,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
 	).Run(int(s.ConcurrentRCSyncs), stop)
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
-	go podgc.NewPodGC(client("pod-garbage-collector"), sharedInformers.Pods().Informer(),
+	go podgc.NewPodGC(clientBuilder.ClientOrDie("pod-garbage-collector"), sharedInformers.Pods().Informer(),
 		int(s.TerminatedPodGCThreshold)).Run(stop)
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
@@ -336,7 +333,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
 	}
 	nodeController, err := nodecontroller.NewNodeController(
 		sharedInformers.Pods(), sharedInformers.Nodes(), sharedInformers.DaemonSets(),
-		cloud, client("node-controller"),
+		cloud, clientBuilder.ClientOrDie("node-controller"),
 		s.PodEvictionTimeout.Duration, s.NodeEvictionRate, s.SecondaryNodeEvictionRate, s.LargeClusterSizeThreshold, s.UnhealthyZoneThreshold, s.NodeMonitorGracePeriod.Duration,
 		s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
 	if err != nil {
@@ -346,7 +343,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
 	nodeController.Run()
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
-	serviceController, err := servicecontroller.New(cloud, client("service-controller"), s.ClusterName)
+	serviceController, err := servicecontroller.New(cloud, clientBuilder.ClientOrDie("service-controller"), s.ClusterName)
 	if err != nil {
 		glog.Errorf("Failed to start service controller: %v", err)
 	} else {
@@ -360,7 +357,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
 		} else if routes, ok := cloud.Routes(); !ok {
 			glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
 		} else {
-			routeController := routecontroller.New(routes, client("route-controller"), s.ClusterName, clusterCIDR)
+			routeController := routecontroller.New(routes, clientBuilder.ClientOrDie("route-controller"), s.ClusterName, clusterCIDR)
 			routeController.Run(s.RouteReconciliationPeriod.Duration)
 			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 		}
@@ -368,7 +365,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
 		glog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", s.AllocateNodeCIDRs, s.ConfigureCloudRoutes)
 	}
 
-	resourceQuotaControllerClient := client("resourcequota-controller")
+	resourceQuotaControllerClient := clientBuilder.ClientOrDie("resourcequota-controller")
 	resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, sharedInformers)
 	groupKindsToReplenish := []schema.GroupKind{
 		api.Kind("Pod"),
@@ -393,7 +390,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
 	restMapper := registered.RESTMapper()
 
 	// Find the list of namespaced resources via discovery that the namespace controller must manage
-	namespaceKubeClient := client("namespace-controller")
+	namespaceKubeClient := clientBuilder.ClientOrDie("namespace-controller")
 	namespaceClientPool := dynamic.NewClientPool(rootClientBuilder.ConfigOrDie("namespace-controller"), restMapper, dynamic.LegacyAPIPathResolverFunc)
 	// TODO: consider using a list-watch + cache here rather than polling
 	var gvrFn func() ([]schema.GroupVersionResource, error)
@@ -423,35 +420,35 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
 	if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "daemonsets"}] {
-		go daemon.NewDaemonSetsController(sharedInformers.DaemonSets(), sharedInformers.Pods(), sharedInformers.Nodes(), client("daemon-set-controller"), int(s.LookupCacheSizeForDaemonSet)).
+		go daemon.NewDaemonSetsController(sharedInformers.DaemonSets(), sharedInformers.Pods(), sharedInformers.Nodes(), clientBuilder.ClientOrDie("daemon-set-controller"), int(s.LookupCacheSizeForDaemonSet)).
 			Run(int(s.ConcurrentDaemonSetSyncs), stop)
 		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 	}
 
 	if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "jobs"}] {
 		glog.Infof("Starting job controller")
-		go job.NewJobController(sharedInformers.Pods().Informer(), sharedInformers.Jobs(), client("job-controller")).
+		go job.NewJobController(sharedInformers.Pods().Informer(), sharedInformers.Jobs(), clientBuilder.ClientOrDie("job-controller")).
 			Run(int(s.ConcurrentJobSyncs), stop)
 		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 	}
 
 	if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "deployments"}] {
 		glog.Infof("Starting deployment controller")
-		go deployment.NewDeploymentController(sharedInformers.Deployments(), sharedInformers.ReplicaSets(), sharedInformers.Pods(), client("deployment-controller")).
+		go deployment.NewDeploymentController(sharedInformers.Deployments(), sharedInformers.ReplicaSets(), sharedInformers.Pods(), clientBuilder.ClientOrDie("deployment-controller")).
 			Run(int(s.ConcurrentDeploymentSyncs), stop)
 		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 	}
 
 	if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "replicasets"}] {
 		glog.Infof("Starting ReplicaSet controller")
-		go replicaset.NewReplicaSetController(sharedInformers.ReplicaSets(), sharedInformers.Pods(), client("replicaset-controller"), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector).
+		go replicaset.NewReplicaSetController(sharedInformers.ReplicaSets(), sharedInformers.Pods(), clientBuilder.ClientOrDie("replicaset-controller"), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector).
 			Run(int(s.ConcurrentRSSyncs), stop)
 		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 	}
 
 	if availableResources[schema.GroupVersionResource{Group: "autoscaling", Version: "v1", Resource: "horizontalpodautoscalers"}] {
 		glog.Infof("Starting horizontal pod autoscaler controller.")
-		hpaClient := client("horizontal-pod-autoscaler")
+		hpaClient := clientBuilder.ClientOrDie("horizontal-pod-autoscaler")
 		metricsClient := metrics.NewHeapsterMetricsClient(
 			hpaClient,
 			metrics.DefaultHeapsterNamespace,
@@ -464,7 +461,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
 
 	if availableResources[schema.GroupVersionResource{Group: "policy", Version: "v1beta1", Resource: "poddisruptionbudgets"}] {
 		glog.Infof("Starting disruption controller")
-		go disruption.NewDisruptionController(sharedInformers.Pods().Informer(), client("disruption-controller")).Run(stop)
+		go disruption.NewDisruptionController(sharedInformers.Pods().Informer(), clientBuilder.ClientOrDie("disruption-controller")).Run(stop)
 		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 	}
 
@@ -476,7 +473,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
 		resyncPeriod := ResyncPeriod(s)()
 		go petset.NewStatefulSetController(
 			sharedInformers.Pods().Informer(),
-			client("statefulset-controller"),
+			clientBuilder.ClientOrDie("statefulset-controller"),
 			resyncPeriod,
 		).Run(1, stop)
 		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
@@ -496,7 +493,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
 		return fmt.Errorf("an backward-compatible provisioner could not be created: %v, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.", err)
 	}
 	params := persistentvolumecontroller.ControllerParameters{
-		KubeClient:                client("persistent-volume-binder"),
+		KubeClient:                clientBuilder.ClientOrDie("persistent-volume-binder"),
 		SyncPeriod:                s.PVClaimBinderSyncPeriod.Duration,
 		AlphaProvisioner:          alphaProvisioner,
 		VolumePlugins:             ProbeControllerVolumePlugins(cloud, s.VolumeConfiguration),
@@ -510,7 +507,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
 
 	attachDetachController, attachDetachControllerErr :=
 		attachdetach.NewAttachDetachController(
-			client("attachdetach-controller"),
+			clientBuilder.ClientOrDie("attachdetach-controller"),
 			sharedInformers.Pods().Informer(),
 			sharedInformers.Nodes().Informer(),
 			sharedInformers.PersistentVolumeClaims().Informer(),
@@ -527,7 +524,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
 		glog.Infof("Starting certificate request controller")
 		resyncPeriod := ResyncPeriod(s)()
 		certController, err := certcontroller.NewCertificateController(
-			client("certificate-controller"),
+			clientBuilder.ClientOrDie("certificate-controller"),
 			resyncPeriod,
 			s.ClusterSigningCertFile,
 			s.ClusterSigningKeyFile,
@@ -543,13 +540,13 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
 	go serviceaccountcontroller.NewServiceAccountsController(
 		sharedInformers.ServiceAccounts(),
 		sharedInformers.Namespaces(),
-		client("service-account-controller"),
+		clientBuilder.ClientOrDie("service-account-controller"),
 		serviceaccountcontroller.DefaultServiceAccountsControllerOptions(),
 	).Run(1, stop)
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
 	if s.EnableGarbageCollector {
-		gcClientset := client("generic-garbage-collector")
+		gcClientset := clientBuilder.ClientOrDie("generic-garbage-collector")
 		groupVersionResources, err := gcClientset.Discovery().ServerPreferredResources()
 		if err != nil {
 			return fmt.Errorf("failed to get supported resources from server: %v", err)

diff --git a/pkg/controller/client_builder.go b/pkg/controller/client_builder.go
index 5bfb50f52a7..3913a1bd3c1 100644
--- a/pkg/controller/client_builder.go
+++ b/pkg/controller/client_builder.go
@@ -51,7 +51,7 @@ type SimpleControllerClientBuilder struct {
 
 func (b SimpleControllerClientBuilder) Config(name string) (*restclient.Config, error) {
 	clientConfig := *b.ClientConfig
-	return &clientConfig, nil
+	return restclient.AddUserAgent(&clientConfig, name), nil
 }
 
 func (b SimpleControllerClientBuilder) ConfigOrDie(name string) *restclient.Config {
@@ -67,7 +67,7 @@ func (b SimpleControllerClientBuilder) Client(name string) (clientset.Interface,
 	if err != nil {
 		return nil, err
 	}
-	return clientset.NewForConfig(restclient.AddUserAgent(clientConfig, name))
+	return clientset.NewForConfig(clientConfig)
 }
 
 func (b SimpleControllerClientBuilder) ClientOrDie(name string) clientset.Interface {