diff --git a/cmd/kube-controller-manager/app/BUILD b/cmd/kube-controller-manager/app/BUILD index 1dc8f977b1d..f1405b5603f 100644 --- a/cmd/kube-controller-manager/app/BUILD +++ b/cmd/kube-controller-manager/app/BUILD @@ -20,7 +20,6 @@ go_library( deps = [ "//cmd/kube-controller-manager/app/options:go_default_library", "//pkg/api:go_default_library", - "//pkg/api/unversioned:go_default_library", "//pkg/api/v1:go_default_library", "//pkg/apimachinery/registered:go_default_library", "//pkg/apis/batch:go_default_library", @@ -31,6 +30,7 @@ go_library( "//pkg/client/leaderelection/resourcelock:go_default_library", "//pkg/client/record:go_default_library", "//pkg/client/restclient:go_default_library", + "//pkg/client/typed/discovery:go_default_library", "//pkg/client/typed/dynamic:go_default_library", "//pkg/client/unversioned/clientcmd:go_default_library", "//pkg/cloudprovider:go_default_library", diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index bf0cec64611..0c919152733 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -33,7 +33,6 @@ import ( "k8s.io/kubernetes/cmd/kube-controller-manager/app/options" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/apis/batch" @@ -43,6 +42,7 @@ import ( "k8s.io/kubernetes/pkg/client/leaderelection/resourcelock" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/restclient" + "k8s.io/kubernetes/pkg/client/typed/discovery" "k8s.io/kubernetes/pkg/client/typed/dynamic" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" "k8s.io/kubernetes/pkg/cloudprovider" @@ -179,7 +179,7 @@ func Run(s *options.CMServer) error { clientBuilder = rootClientBuilder } - err := StartControllers(s, kubeconfig, rootClientBuilder, clientBuilder, stop, recorder) + err := StartControllers(s, rootClientBuilder, clientBuilder, stop) glog.Fatalf("error running controllers: %v", err) panic("unreachable") } @@ -222,18 +222,54 @@ func Run(s *options.CMServer) error { panic("unreachable") } -func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}, recorder record.EventRecorder) error { - client := func(serviceAccountName string) clientset.Interface { - return rootClientBuilder.ClientOrDie(serviceAccountName) +// TODO: In general, any controller checking this needs to be dynamic so +// users don't have to restart their controller manager if they change the apiserver. +func getAvailableResources(clientBuilder controller.ControllerClientBuilder) (map[schema.GroupVersionResource]bool, error) { + var discoveryClient discovery.DiscoveryInterface + + // If apiserver is not running we should wait for some time and fail only then. This is particularly + // important when we start apiserver and controller manager at the same time. + err := wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) { + client, err := clientBuilder.Client("controller-discovery") + if err != nil { + glog.Errorf("Failed to get api versions from server: %v", err) + return false, nil + } + + discoveryClient = client.Discovery() + return true, nil + }) + if err != nil { + return nil, fmt.Errorf("failed to get api versions from server: %v", err) } - discoveryClient := client("controller-discovery").Discovery() - sharedInformers := informers.NewSharedInformerFactory(client("shared-informers"), nil, ResyncPeriod(s)()) + + resourceMap, err := discoveryClient.ServerResources() + if err != nil { + return nil, fmt.Errorf("failed to get supported resources from server: %v", err) + } + + allResources := map[schema.GroupVersionResource]bool{} + for _, apiResourceList := range resourceMap { + version, err := schema.ParseGroupVersion(apiResourceList.GroupVersion) + if err != nil { + return nil, err + } + for _, apiResource := range apiResourceList.APIResources { + allResources[version.WithResource(apiResource.Name)] = true + } + } + + return allResources, nil +} + +func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) error { + sharedInformers := informers.NewSharedInformerFactory(rootClientBuilder.ClientOrDie("shared-informers"), nil, ResyncPeriod(s)()) // always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest if len(s.ServiceAccountKeyFile) > 0 { privateKey, err := serviceaccount.ReadPrivateKey(s.ServiceAccountKeyFile) if err != nil { - return fmt.Errorf("Error reading key for service account token controller: %v", err) + return fmt.Errorf("error reading key for service account token controller: %v", err) } else { var rootCA []byte if s.RootCAFile != "" { @@ -245,7 +281,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl return fmt.Errorf("error parsing root-ca-file at %s: %v", s.RootCAFile, err) } } else { - rootCA = kubeconfig.CAData + rootCA = rootClientBuilder.ConfigOrDie("tokens-controller").CAData } go serviceaccountcontroller.NewTokensController( @@ -254,13 +290,18 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey), RootCA: rootCA, }, - ).Run(int(s.ConcurrentSATokenSyncs), wait.NeverStop) + ).Run(int(s.ConcurrentSATokenSyncs), stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } } - go endpointcontroller.NewEndpointController(sharedInformers.Pods().Informer(), client("endpoint-controller")). - Run(int(s.ConcurrentEndpointSyncs), wait.NeverStop) + availableResources, err := getAvailableResources(clientBuilder) + if err != nil { + return err + } + + go endpointcontroller.NewEndpointController(sharedInformers.Pods().Informer(), clientBuilder.ClientOrDie("endpoint-controller")). + Run(int(s.ConcurrentEndpointSyncs), stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) go replicationcontroller.NewReplicationManager( @@ -270,16 +311,16 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl replicationcontroller.BurstReplicas, int(s.LookupCacheSizeForRC), s.EnableGarbageCollector, - ).Run(int(s.ConcurrentRCSyncs), wait.NeverStop) + ).Run(int(s.ConcurrentRCSyncs), stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - go podgc.NewPodGC(client("pod-garbage-collector"), sharedInformers.Pods().Informer(), - int(s.TerminatedPodGCThreshold)).Run(wait.NeverStop) + go podgc.NewPodGC(clientBuilder.ClientOrDie("pod-garbage-collector"), sharedInformers.Pods().Informer(), + int(s.TerminatedPodGCThreshold)).Run(stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile) if err != nil { - glog.Fatalf("Cloud provider could not be initialized: %v", err) + return fmt.Errorf("cloud provider could not be initialized: %v", err) } _, clusterCIDR, err := net.ParseCIDR(s.ClusterCIDR) @@ -292,17 +333,17 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl } nodeController, err := nodecontroller.NewNodeController( sharedInformers.Pods(), sharedInformers.Nodes(), sharedInformers.DaemonSets(), - cloud, client("node-controller"), + cloud, clientBuilder.ClientOrDie("node-controller"), s.PodEvictionTimeout.Duration, s.NodeEvictionRate, s.SecondaryNodeEvictionRate, s.LargeClusterSizeThreshold, s.UnhealthyZoneThreshold, s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs) if err != nil { - glog.Fatalf("Failed to initialize nodecontroller: %v", err) + return fmt.Errorf("failed to initialize nodecontroller: %v", err) } nodeController.Run() time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - serviceController, err := servicecontroller.New(cloud, client("service-controller"), s.ClusterName) + serviceController, err := servicecontroller.New(cloud, clientBuilder.ClientOrDie("service-controller"), s.ClusterName) if err != nil { glog.Errorf("Failed to start service controller: %v", err) } else { @@ -316,7 +357,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl } else if routes, ok := cloud.Routes(); !ok { glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.") } else { - routeController := routecontroller.New(routes, client("route-controller"), s.ClusterName, clusterCIDR) + routeController := routecontroller.New(routes, clientBuilder.ClientOrDie("route-controller"), s.ClusterName, clusterCIDR) routeController.Run(s.RouteReconciliationPeriod.Duration) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } @@ -324,7 +365,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl glog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", s.AllocateNodeCIDRs, s.ConfigureCloudRoutes) } - resourceQuotaControllerClient := client("resourcequota-controller") + resourceQuotaControllerClient := clientBuilder.ClientOrDie("resourcequota-controller") resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, sharedInformers) groupKindsToReplenish := []schema.GroupKind{ api.Kind("Pod"), @@ -342,40 +383,20 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl ReplenishmentResyncPeriod: ResyncPeriod(s), GroupKindsToReplenish: groupKindsToReplenish, } - go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(int(s.ConcurrentResourceQuotaSyncs), wait.NeverStop) + go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(int(s.ConcurrentResourceQuotaSyncs), stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - // If apiserver is not running we should wait for some time and fail only then. This is particularly - // important when we start apiserver and controller manager at the same time. - var versionStrings []string - err = wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) { - if versionStrings, err = restclient.ServerAPIVersions(kubeconfig); err == nil { - return true, nil - } - glog.Errorf("Failed to get api versions from server: %v", err) - return false, nil - }) - if err != nil { - glog.Fatalf("Failed to get api versions from server: %v", err) - } - versions := &unversioned.APIVersions{Versions: versionStrings} - - resourceMap, err := discoveryClient.ServerResources() - if err != nil { - glog.Fatalf("Failed to get supported resources from server: %v", err) - } - // TODO: should use a dynamic RESTMapper built from the discovery results. restMapper := registered.RESTMapper() // Find the list of namespaced resources via discovery that the namespace controller must manage - namespaceKubeClient := client("namespace-controller") - namespaceClientPool := dynamic.NewClientPool(restclient.AddUserAgent(kubeconfig, "namespace-controller"), restMapper, dynamic.LegacyAPIPathResolverFunc) + namespaceKubeClient := clientBuilder.ClientOrDie("namespace-controller") + namespaceClientPool := dynamic.NewClientPool(rootClientBuilder.ConfigOrDie("namespace-controller"), restMapper, dynamic.LegacyAPIPathResolverFunc) // TODO: consider using a list-watch + cache here rather than polling var gvrFn func() ([]schema.GroupVersionResource, error) rsrcs, err := namespaceKubeClient.Discovery().ServerResources() if err != nil { - glog.Fatalf("Failed to get group version resources: %v", err) + return fmt.Errorf("failed to get group version resources: %v", err) } for _, rsrcList := range rsrcs { for ix := range rsrcList.APIResources { @@ -388,125 +409,91 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl if gvrFn == nil { gvr, err := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources() if err != nil { - glog.Fatalf("Failed to get resources: %v", err) + return fmt.Errorf("failed to get resources: %v", err) } gvrFn = func() ([]schema.GroupVersionResource, error) { return gvr, nil } } namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, gvrFn, s.NamespaceSyncPeriod.Duration, v1.FinalizerKubernetes) - go namespaceController.Run(int(s.ConcurrentNamespaceSyncs), wait.NeverStop) + go namespaceController.Run(int(s.ConcurrentNamespaceSyncs), stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - groupVersion := "extensions/v1beta1" - resources, found := resourceMap[groupVersion] - // TODO: this needs to be dynamic so users don't have to restart their controller manager if they change the apiserver - if containsVersion(versions, groupVersion) && found { - glog.Infof("Starting %s apis", groupVersion) - - if containsResource(resources, "daemonsets") { - glog.Infof("Starting daemon set controller") - go daemon.NewDaemonSetsController(sharedInformers.DaemonSets(), sharedInformers.Pods(), sharedInformers.Nodes(), client("daemon-set-controller"), int(s.LookupCacheSizeForDaemonSet)). - Run(int(s.ConcurrentDaemonSetSyncs), wait.NeverStop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - } - - if containsResource(resources, "jobs") { - glog.Infof("Starting job controller") - go job.NewJobController(sharedInformers.Pods().Informer(), sharedInformers.Jobs(), client("job-controller")). - Run(int(s.ConcurrentJobSyncs), wait.NeverStop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - } - - if containsResource(resources, "deployments") { - glog.Infof("Starting deployment controller") - go deployment.NewDeploymentController(sharedInformers.Deployments(), sharedInformers.ReplicaSets(), sharedInformers.Pods(), client("deployment-controller")). - Run(int(s.ConcurrentDeploymentSyncs), wait.NeverStop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - } - - if containsResource(resources, "replicasets") { - glog.Infof("Starting ReplicaSet controller") - go replicaset.NewReplicaSetController(sharedInformers.ReplicaSets(), sharedInformers.Pods(), client("replicaset-controller"), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector). - Run(int(s.ConcurrentRSSyncs), wait.NeverStop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - } + if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "daemonsets"}] { + go daemon.NewDaemonSetsController(sharedInformers.DaemonSets(), sharedInformers.Pods(), sharedInformers.Nodes(), clientBuilder.ClientOrDie("daemon-set-controller"), int(s.LookupCacheSizeForDaemonSet)). + Run(int(s.ConcurrentDaemonSetSyncs), stop) + time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } - groupVersion = "autoscaling/v1" - resources, found = resourceMap[groupVersion] - glog.Infof("Attempting to start horizontal pod autoscaler controller, full resource map %+v", resourceMap) - if containsVersion(versions, groupVersion) && found { - glog.Infof("Starting %s apis", groupVersion) - if containsResource(resources, "horizontalpodautoscalers") { - glog.Infof("Starting horizontal pod controller.") - hpaClient := client("horizontal-pod-autoscaler") - metricsClient := metrics.NewHeapsterMetricsClient( - hpaClient, - metrics.DefaultHeapsterNamespace, - metrics.DefaultHeapsterScheme, - metrics.DefaultHeapsterService, - metrics.DefaultHeapsterPort, - ) - replicaCalc := podautoscaler.NewReplicaCalculator(metricsClient, hpaClient.Core()) - go podautoscaler.NewHorizontalController(hpaClient.Core(), hpaClient.Extensions(), hpaClient.Autoscaling(), replicaCalc, s.HorizontalPodAutoscalerSyncPeriod.Duration). - Run(wait.NeverStop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - } + if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "jobs"}] { + glog.Infof("Starting job controller") + go job.NewJobController(sharedInformers.Pods().Informer(), sharedInformers.Jobs(), clientBuilder.ClientOrDie("job-controller")). + Run(int(s.ConcurrentJobSyncs), stop) + time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } - groupVersion = "policy/v1beta1" - resources, found = resourceMap[groupVersion] - glog.Infof("Attempting to start disruption controller, full resource map %+v", resourceMap) - if containsVersion(versions, groupVersion) && found { - glog.Infof("Starting %s apis", groupVersion) - if containsResource(resources, "poddisruptionbudgets") { - glog.Infof("Starting disruption controller") - go disruption.NewDisruptionController(sharedInformers.Pods().Informer(), client("disruption-controller")).Run(wait.NeverStop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - } + if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "deployments"}] { + glog.Infof("Starting deployment controller") + go deployment.NewDeploymentController(sharedInformers.Deployments(), sharedInformers.ReplicaSets(), sharedInformers.Pods(), clientBuilder.ClientOrDie("deployment-controller")). + Run(int(s.ConcurrentDeploymentSyncs), stop) + time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } - groupVersion = "apps/v1beta1" - resources, found = resourceMap[groupVersion] - glog.Infof("Attempting to start statefulset, full resource map %+v", resourceMap) - if containsVersion(versions, groupVersion) && found { - glog.Infof("Starting %s apis", groupVersion) - if containsResource(resources, "statefulsets") { - glog.Infof("Starting StatefulSet controller") - resyncPeriod := ResyncPeriod(s)() - go petset.NewStatefulSetController( - sharedInformers.Pods().Informer(), - client("statefulset-controller"), - resyncPeriod, - ).Run(1, wait.NeverStop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - } + if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "replicasets"}] { + glog.Infof("Starting ReplicaSet controller") + go replicaset.NewReplicaSetController(sharedInformers.ReplicaSets(), sharedInformers.Pods(), clientBuilder.ClientOrDie("replicaset-controller"), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector). + Run(int(s.ConcurrentRSSyncs), stop) + time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } - groupVersion = "batch/v2alpha1" - resources, found = resourceMap[groupVersion] - if containsVersion(versions, groupVersion) && found { - glog.Infof("Starting %s apis", groupVersion) - if containsResource(resources, "cronjobs") { - glog.Infof("Starting cronjob controller") - // // TODO: this is a temp fix for allowing kubeClient list v2alpha1 sj, should switch to using clientset - kubeconfig.ContentConfig.GroupVersion = &schema.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"} - go cronjob.NewCronJobController(client("cronjob-controller")). - Run(wait.NeverStop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - } - } else { - glog.Infof("Not starting %s apis", groupVersion) + if availableResources[schema.GroupVersionResource{Group: "autoscaling", Version: "v1", Resource: "horizontalpodautoscalers"}] { + glog.Infof("Starting horizontal pod autoscaler controller.") + hpaClient := clientBuilder.ClientOrDie("horizontal-pod-autoscaler") + metricsClient := metrics.NewHeapsterMetricsClient( + hpaClient, + metrics.DefaultHeapsterNamespace, + metrics.DefaultHeapsterScheme, + metrics.DefaultHeapsterService, + metrics.DefaultHeapsterPort, + ) + replicaCalc := podautoscaler.NewReplicaCalculator(metricsClient, hpaClient.Core()) + go podautoscaler.NewHorizontalController(hpaClient.Core(), hpaClient.Extensions(), hpaClient.Autoscaling(), replicaCalc, s.HorizontalPodAutoscalerSyncPeriod.Duration). + Run(stop) + time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) + } + + if availableResources[schema.GroupVersionResource{Group: "policy", Version: "v1beta1", Resource: "poddisruptionbudgets"}] { + glog.Infof("Starting disruption controller") + go disruption.NewDisruptionController(sharedInformers.Pods().Informer(), clientBuilder.ClientOrDie("disruption-controller")).Run(stop) + time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) + } + + if availableResources[schema.GroupVersionResource{Group: "apps", Version: "v1beta1", Resource: "statefulsets"}] { + glog.Infof("Starting StatefulSet controller") + resyncPeriod := ResyncPeriod(s)() + go petset.NewStatefulSetController( + sharedInformers.Pods().Informer(), + clientBuilder.ClientOrDie("statefulset-controller"), + resyncPeriod, + ).Run(1, stop) + time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) + } + + if availableResources[schema.GroupVersionResource{Group: "batch", Version: "v2alpha1", Resource: "cronjobs"}] { + glog.Infof("Starting cronjob controller") + // TODO: this is a temp fix for allowing kubeClient list v2alpha1 sj, should switch to using clientset + cronjobConfig := rootClientBuilder.ConfigOrDie("cronjob-controller") + cronjobConfig.ContentConfig.GroupVersion = &schema.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"} + go cronjob.NewCronJobController(clientset.NewForConfigOrDie(cronjobConfig)).Run(stop) + time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } alphaProvisioner, err := NewAlphaVolumeProvisioner(cloud, s.VolumeConfiguration) if err != nil { - glog.Fatalf("An backward-compatible provisioner could not be created: %v, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.", err) + return fmt.Errorf("an backward-compatible provisioner could not be created: %v, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.", err) } params := persistentvolumecontroller.ControllerParameters{ - KubeClient: client("persistent-volume-binder"), + KubeClient: clientBuilder.ClientOrDie("persistent-volume-binder"), SyncPeriod: s.PVClaimBinderSyncPeriod.Duration, AlphaProvisioner: alphaProvisioner, VolumePlugins: ProbeControllerVolumePlugins(cloud, s.VolumeConfiguration), @@ -515,64 +502,57 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl EnableDynamicProvisioning: s.VolumeConfiguration.EnableDynamicProvisioning, } volumeController := persistentvolumecontroller.NewController(params) - volumeController.Run(wait.NeverStop) + volumeController.Run(stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) attachDetachController, attachDetachControllerErr := attachdetach.NewAttachDetachController( - client("attachdetach-controller"), + clientBuilder.ClientOrDie("attachdetach-controller"), sharedInformers.Pods().Informer(), sharedInformers.Nodes().Informer(), sharedInformers.PersistentVolumeClaims().Informer(), sharedInformers.PersistentVolumes().Informer(), cloud, - ProbeAttachableVolumePlugins(s.VolumeConfiguration), - recorder) + ProbeAttachableVolumePlugins(s.VolumeConfiguration)) if attachDetachControllerErr != nil { - glog.Fatalf("Failed to start attach/detach controller: %v", attachDetachControllerErr) + return fmt.Errorf("failed to start attach/detach controller: %v", attachDetachControllerErr) } - go attachDetachController.Run(wait.NeverStop) + go attachDetachController.Run(stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - groupVersion = "certificates.k8s.io/v1alpha1" - resources, found = resourceMap[groupVersion] - glog.Infof("Attempting to start certificates, full resource map %+v", resourceMap) - if containsVersion(versions, groupVersion) && found { - glog.Infof("Starting %s apis", groupVersion) - if containsResource(resources, "certificatesigningrequests") { - glog.Infof("Starting certificate request controller") - resyncPeriod := ResyncPeriod(s)() - certController, err := certcontroller.NewCertificateController( - client("certificate-controller"), - resyncPeriod, - s.ClusterSigningCertFile, - s.ClusterSigningKeyFile, - s.ApproveAllKubeletCSRsForGroup, - ) - if err != nil { - glog.Errorf("Failed to start certificate controller: %v", err) - } else { - go certController.Run(1, wait.NeverStop) - } - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) + if availableResources[schema.GroupVersionResource{Group: "certificates.k8s.io", Version: "v1alpha1", Resource: "certificatesigningrequests"}] { + glog.Infof("Starting certificate request controller") + resyncPeriod := ResyncPeriod(s)() + certController, err := certcontroller.NewCertificateController( + clientBuilder.ClientOrDie("certificate-controller"), + resyncPeriod, + s.ClusterSigningCertFile, + s.ClusterSigningKeyFile, + s.ApproveAllKubeletCSRsForGroup, + ) + if err != nil { + glog.Errorf("Failed to start certificate controller: %v", err) + } else { + go certController.Run(1, stop) } + time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } go serviceaccountcontroller.NewServiceAccountsController( sharedInformers.ServiceAccounts(), sharedInformers.Namespaces(), - client("service-account-controller"), + clientBuilder.ClientOrDie("service-account-controller"), serviceaccountcontroller.DefaultServiceAccountsControllerOptions(), ).Run(1, stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) if s.EnableGarbageCollector { - gcClientset := client("generic-garbage-collector") + gcClientset := clientBuilder.ClientOrDie("generic-garbage-collector") groupVersionResources, err := gcClientset.Discovery().ServerPreferredResources() if err != nil { - glog.Fatalf("Failed to get supported resources from server: %v", err) + return fmt.Errorf("failed to get supported resources from server: %v", err) } - config := restclient.AddUserAgent(kubeconfig, "generic-garbage-collector") + config := rootClientBuilder.ConfigOrDie("generic-garbage-collector") config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()} metaOnlyClientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc) config.ContentConfig = dynamic.ContentConfig() @@ -582,7 +562,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl glog.Errorf("Failed to start the generic garbage collector: %v", err) } else { workers := int(s.ConcurrentGCSyncs) - go garbageCollector.Run(workers, wait.NeverStop) + go garbageCollector.Run(workers, stop) } } @@ -590,22 +570,3 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl select {} } - -func containsVersion(versions *unversioned.APIVersions, version string) bool { - for ix := range versions.Versions { - if versions.Versions[ix] == version { - return true - } - } - return false -} - -func containsResource(resources *unversioned.APIResourceList, resourceName string) bool { - for ix := range resources.APIResources { - resource := resources.APIResources[ix] - if resource.Name == resourceName { - return true - } - } - return false -} diff --git a/pkg/controller/client_builder.go b/pkg/controller/client_builder.go index f37de8499bd..3913a1bd3c1 100644 --- a/pkg/controller/client_builder.go +++ b/pkg/controller/client_builder.go @@ -38,6 +38,7 @@ import ( // ControllerClientBuilder allow syou to get clients and configs for controllers type ControllerClientBuilder interface { Config(name string) (*restclient.Config, error) + ConfigOrDie(name string) *restclient.Config Client(name string) (clientset.Interface, error) ClientOrDie(name string) clientset.Interface } @@ -50,7 +51,15 @@ type SimpleControllerClientBuilder struct { func (b SimpleControllerClientBuilder) Config(name string) (*restclient.Config, error) { clientConfig := *b.ClientConfig - return &clientConfig, nil + return restclient.AddUserAgent(&clientConfig, name), nil +} + +func (b SimpleControllerClientBuilder) ConfigOrDie(name string) *restclient.Config { + clientConfig, err := b.Config(name) + if err != nil { + glog.Fatal(err) + } + return clientConfig } func (b SimpleControllerClientBuilder) Client(name string) (clientset.Interface, error) { @@ -58,7 +67,7 @@ func (b SimpleControllerClientBuilder) Client(name string) (clientset.Interface, if err != nil { return nil, err } - return clientset.NewForConfig(restclient.AddUserAgent(clientConfig, name)) + return clientset.NewForConfig(clientConfig) } func (b SimpleControllerClientBuilder) ClientOrDie(name string) clientset.Interface { @@ -150,6 +159,14 @@ func (b SAControllerClientBuilder) Config(name string) (*restclient.Config, erro return clientConfig, nil } +func (b SAControllerClientBuilder) ConfigOrDie(name string) *restclient.Config { + clientConfig, err := b.Config(name) + if err != nil { + glog.Fatal(err) + } + return clientConfig +} + func (b SAControllerClientBuilder) Client(name string) (clientset.Interface, error) { clientConfig, err := b.Config(name) if err != nil { diff --git a/pkg/controller/volume/attachdetach/BUILD b/pkg/controller/volume/attachdetach/BUILD index bcc22df9d49..0cfcfef9f38 100644 --- a/pkg/controller/volume/attachdetach/BUILD +++ b/pkg/controller/volume/attachdetach/BUILD @@ -19,6 +19,7 @@ go_library( "//pkg/api/v1:go_default_library", "//pkg/client/cache:go_default_library", "//pkg/client/clientset_generated/release_1_5:go_default_library", + "//pkg/client/clientset_generated/release_1_5/typed/core/v1:go_default_library", "//pkg/client/record:go_default_library", "//pkg/cloudprovider:go_default_library", "//pkg/controller/volume/attachdetach/cache:go_default_library", @@ -42,7 +43,6 @@ go_test( library = "go_default_library", tags = ["automanaged"], deps = [ - "//pkg/client/record:go_default_library", "//pkg/controller/informers:go_default_library", "//pkg/controller/volume/attachdetach/testing:go_default_library", ], diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go index fb3c40c2d14..49fc16a7a2e 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/pkg/api/v1" kcache "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5" + v1core "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/typed/core/v1" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache" @@ -76,8 +77,7 @@ func NewAttachDetachController( pvcInformer kcache.SharedInformer, pvInformer kcache.SharedInformer, cloud cloudprovider.Interface, - plugins []volume.VolumePlugin, - recorder record.EventRecorder) (AttachDetachController, error) { + plugins []volume.VolumePlugin) (AttachDetachController, error) { // TODO: The default resyncPeriod for shared informers is 12 hours, this is // unacceptable for the attach/detach controller. For example, if a pod is // skipped because the node it is scheduled to didn't set its annotation in @@ -115,6 +115,11 @@ func NewAttachDetachController( return nil, fmt.Errorf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err) } + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.Core().Events("")}) + recorder := eventBroadcaster.NewRecorder(v1.EventSource{Component: "attachdetach"}) + adc.desiredStateOfWorld = cache.NewDesiredStateOfWorld(&adc.volumePluginMgr) adc.actualStateOfWorld = cache.NewActualStateOfWorld(&adc.volumePluginMgr) adc.attacherDetacher = diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go index 2849237e37e..c301acd2b66 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go @@ -20,7 +20,6 @@ import ( "testing" "time" - "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/controller/informers" controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing" ) @@ -33,7 +32,6 @@ func Test_NewAttachDetachController_Positive(t *testing.T) { nodeInformer := informers.NewNodeInformer(fakeKubeClient, resyncPeriod) pvcInformer := informers.NewPVCInformer(fakeKubeClient, resyncPeriod) pvInformer := informers.NewPVInformer(fakeKubeClient, resyncPeriod) - fakeRecorder := &record.FakeRecorder{} // Act _, err := NewAttachDetachController( @@ -43,8 +41,7 @@ func Test_NewAttachDetachController_Positive(t *testing.T) { pvcInformer, pvInformer, nil, /* cloud */ - nil, /* plugins */ - fakeRecorder) + nil /* plugins */) // Assert if err != nil {