diff --git a/cmd/kube-controller-manager/app/BUILD b/cmd/kube-controller-manager/app/BUILD
index 1dc8f977b1d..f1405b5603f 100644
--- a/cmd/kube-controller-manager/app/BUILD
+++ b/cmd/kube-controller-manager/app/BUILD
@@ -20,7 +20,6 @@ go_library(
     deps = [
         "//cmd/kube-controller-manager/app/options:go_default_library",
         "//pkg/api:go_default_library",
-        "//pkg/api/unversioned:go_default_library",
         "//pkg/api/v1:go_default_library",
         "//pkg/apimachinery/registered:go_default_library",
         "//pkg/apis/batch:go_default_library",
@@ -31,6 +30,7 @@ go_library(
         "//pkg/client/leaderelection/resourcelock:go_default_library",
         "//pkg/client/record:go_default_library",
         "//pkg/client/restclient:go_default_library",
+        "//pkg/client/typed/discovery:go_default_library",
         "//pkg/client/typed/dynamic:go_default_library",
         "//pkg/client/unversioned/clientcmd:go_default_library",
         "//pkg/cloudprovider:go_default_library",
diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index e6b6d049db9..36062581c96 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -33,7 +33,6 @@ import (
 
 	"k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
 	"k8s.io/kubernetes/pkg/api"
-	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/api/v1"
 	"k8s.io/kubernetes/pkg/apimachinery/registered"
 	"k8s.io/kubernetes/pkg/apis/batch"
@@ -43,6 +42,7 @@ import (
 	"k8s.io/kubernetes/pkg/client/leaderelection/resourcelock"
 	"k8s.io/kubernetes/pkg/client/record"
 	"k8s.io/kubernetes/pkg/client/restclient"
+	"k8s.io/kubernetes/pkg/client/typed/discovery"
 	"k8s.io/kubernetes/pkg/client/typed/dynamic"
 	"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
 	"k8s.io/kubernetes/pkg/cloudprovider"
@@ -222,11 +222,50 @@ func Run(s *options.CMServer) error {
 	panic("unreachable")
 }
 
+// TODO: In general, any controller checking this needs to be dynamic so
+// users don't have to restart their controller manager if they change the apiserver.
+func getAvailableResources(clientBuilder controller.ControllerClientBuilder) (map[schema.GroupVersionResource]bool, error) {
+	var discoveryClient discovery.DiscoveryInterface
+
+	// If apiserver is not running we should wait for some time and fail only then. This is particularly
+	// important when we start apiserver and controller manager at the same time.
+	err := wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
+		client, err := clientBuilder.Client("controller-discovery")
+		if err != nil {
+			glog.Errorf("Failed to get api versions from server: %v", err)
+			return false, nil
+		}
+
+		discoveryClient = client.Discovery()
+		return true, nil
+	})
+	if err != nil {
+		return nil, fmt.Errorf("Failed to get api versions from server: %v", err)
+	}
+
+	resourceMap, err := discoveryClient.ServerResources()
+	if err != nil {
+		return nil, fmt.Errorf("Failed to get supported resources from server: %v", err)
+	}
+
+	allResources := map[schema.GroupVersionResource]bool{}
+	for _, apiResourceList := range resourceMap {
+		version, err := schema.ParseGroupVersion(apiResourceList.GroupVersion)
+		if err != nil {
+			return nil, err
+		}
+		for _, apiResource := range apiResourceList.APIResources {
+			allResources[version.WithResource(apiResource.Name)] = true
+		}
+	}
+
+	return allResources, nil
+}
+
 func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) error {
 	client := func(serviceAccountName string) clientset.Interface {
 		return rootClientBuilder.ClientOrDie(serviceAccountName)
 	}
-	discoveryClient := client("controller-discovery").Discovery()
 	sharedInformers := informers.NewSharedInformerFactory(client("shared-informers"), nil, ResyncPeriod(s)())
 
 	// always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
@@ -259,6 +298,11 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl
 		}
 	}
 
+	availableResources, err := getAvailableResources(clientBuilder)
+	if err != nil {
+		return err
+	}
+
 	go endpointcontroller.NewEndpointController(sharedInformers.Pods().Informer(), client("endpoint-controller")).
 		Run(int(s.ConcurrentEndpointSyncs), stop)
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
@@ -345,26 +389,6 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl
 	go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(int(s.ConcurrentResourceQuotaSyncs), stop)
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
-	// If apiserver is not running we should wait for some time and fail only then. This is particularly
-	// important when we start apiserver and controller manager at the same time.
-	var versionStrings []string
-	err = wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
-		if versionStrings, err = restclient.ServerAPIVersions(kubeconfig); err == nil {
-			return true, nil
-		}
-		glog.Errorf("Failed to get api versions from server: %v", err)
-		return false, nil
-	})
-	if err != nil {
-		glog.Fatalf("Failed to get api versions from server: %v", err)
-	}
-	versions := &unversioned.APIVersions{Versions: versionStrings}
-
-	resourceMap, err := discoveryClient.ServerResources()
-	if err != nil {
-		glog.Fatalf("Failed to get supported resources from server: %v", err)
-	}
-
 	// TODO: should use a dynamic RESTMapper built from the discovery results.
 	restMapper := registered.RESTMapper()
@@ -398,107 +422,72 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl
 	go namespaceController.Run(int(s.ConcurrentNamespaceSyncs), stop)
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
-	groupVersion := "extensions/v1beta1"
-	resources, found := resourceMap[groupVersion]
-	// TODO: this needs to be dynamic so users don't have to restart their controller manager if they change the apiserver
-	if containsVersion(versions, groupVersion) && found {
-		glog.Infof("Starting %s apis", groupVersion)
-
-		if containsResource(resources, "daemonsets") {
-			glog.Infof("Starting daemon set controller")
-			go daemon.NewDaemonSetsController(sharedInformers.DaemonSets(), sharedInformers.Pods(), sharedInformers.Nodes(), client("daemon-set-controller"), int(s.LookupCacheSizeForDaemonSet)).
-				Run(int(s.ConcurrentDaemonSetSyncs), stop)
-			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
-		}
-
-		if containsResource(resources, "jobs") {
-			glog.Infof("Starting job controller")
-			go job.NewJobController(sharedInformers.Pods().Informer(), sharedInformers.Jobs(), client("job-controller")).
-				Run(int(s.ConcurrentJobSyncs), stop)
-			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
-		}
-
-		if containsResource(resources, "deployments") {
-			glog.Infof("Starting deployment controller")
-			go deployment.NewDeploymentController(sharedInformers.Deployments(), sharedInformers.ReplicaSets(), sharedInformers.Pods(), client("deployment-controller")).
-				Run(int(s.ConcurrentDeploymentSyncs), stop)
-			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
-		}
-
-		if containsResource(resources, "replicasets") {
-			glog.Infof("Starting ReplicaSet controller")
-			go replicaset.NewReplicaSetController(sharedInformers.ReplicaSets(), sharedInformers.Pods(), client("replicaset-controller"), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector).
-				Run(int(s.ConcurrentRSSyncs), stop)
-			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
-		}
+	if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "daemonsets"}] {
+		glog.Infof("Starting daemon set controller")
+		go daemon.NewDaemonSetsController(sharedInformers.DaemonSets(), sharedInformers.Pods(), sharedInformers.Nodes(), client("daemon-set-controller"), int(s.LookupCacheSizeForDaemonSet)).
+			Run(int(s.ConcurrentDaemonSetSyncs), stop)
+		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 	}
 
-	groupVersion = "autoscaling/v1"
-	resources, found = resourceMap[groupVersion]
-	glog.Infof("Attempting to start horizontal pod autoscaler controller, full resource map %+v", resourceMap)
-	if containsVersion(versions, groupVersion) && found {
-		glog.Infof("Starting %s apis", groupVersion)
-		if containsResource(resources, "horizontalpodautoscalers") {
-			glog.Infof("Starting horizontal pod controller.")
-			hpaClient := client("horizontal-pod-autoscaler")
-			metricsClient := metrics.NewHeapsterMetricsClient(
-				hpaClient,
-				metrics.DefaultHeapsterNamespace,
-				metrics.DefaultHeapsterScheme,
-				metrics.DefaultHeapsterService,
-				metrics.DefaultHeapsterPort,
-			)
-			replicaCalc := podautoscaler.NewReplicaCalculator(metricsClient, hpaClient.Core())
-			go podautoscaler.NewHorizontalController(hpaClient.Core(), hpaClient.Extensions(), hpaClient.Autoscaling(), replicaCalc, s.HorizontalPodAutoscalerSyncPeriod.Duration).
-				Run(stop)
-			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
-		}
+	if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "jobs"}] {
+		glog.Infof("Starting job controller")
+		go job.NewJobController(sharedInformers.Pods().Informer(), sharedInformers.Jobs(), client("job-controller")).
+			Run(int(s.ConcurrentJobSyncs), stop)
+		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 	}
 
-	groupVersion = "policy/v1beta1"
-	resources, found = resourceMap[groupVersion]
-	glog.Infof("Attempting to start disruption controller, full resource map %+v", resourceMap)
-	if containsVersion(versions, groupVersion) && found {
-		glog.Infof("Starting %s apis", groupVersion)
-		if containsResource(resources, "poddisruptionbudgets") {
-			glog.Infof("Starting disruption controller")
-			go disruption.NewDisruptionController(sharedInformers.Pods().Informer(), client("disruption-controller")).Run(stop)
-			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
-		}
+	if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "deployments"}] {
+		glog.Infof("Starting deployment controller")
+		go deployment.NewDeploymentController(sharedInformers.Deployments(), sharedInformers.ReplicaSets(), sharedInformers.Pods(), client("deployment-controller")).
+			Run(int(s.ConcurrentDeploymentSyncs), stop)
+		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 	}
 
-	groupVersion = "apps/v1beta1"
-	resources, found = resourceMap[groupVersion]
-	glog.Infof("Attempting to start statefulset, full resource map %+v", resourceMap)
-	if containsVersion(versions, groupVersion) && found {
-		glog.Infof("Starting %s apis", groupVersion)
-		if containsResource(resources, "statefulsets") {
-			glog.Infof("Starting StatefulSet controller")
-			resyncPeriod := ResyncPeriod(s)()
-			go petset.NewStatefulSetController(
-				sharedInformers.Pods().Informer(),
-				client("statefulset-controller"),
-				resyncPeriod,
-			).Run(1, stop)
-			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
-		}
+	if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "replicasets"}] {
+		glog.Infof("Starting ReplicaSet controller")
+		go replicaset.NewReplicaSetController(sharedInformers.ReplicaSets(), sharedInformers.Pods(), client("replicaset-controller"), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector).
+			Run(int(s.ConcurrentRSSyncs), stop)
+		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 	}
 
-	groupVersion = "batch/v2alpha1"
-	resources, found = resourceMap[groupVersion]
-	if containsVersion(versions, groupVersion) && found {
-		glog.Infof("Starting %s apis", groupVersion)
-		if containsResource(resources, "cronjobs") {
-			glog.Infof("Starting cronjob controller")
-			// // TODO: this is a temp fix for allowing kubeClient list v2alpha1 sj, should switch to using clientset
-			kubeconfig.ContentConfig.GroupVersion = &schema.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"}
-			go cronjob.NewCronJobController(client("cronjob-controller")).
-				Run(stop)
-			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
-			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
-		}
-	} else {
-		glog.Infof("Not starting %s apis", groupVersion)
+	if availableResources[schema.GroupVersionResource{Group: "autoscaling", Version: "v1", Resource: "horizontalpodautoscalers"}] {
+		glog.Infof("Starting horizontal pod autoscaler controller.")
+		hpaClient := client("horizontal-pod-autoscaler")
+		metricsClient := metrics.NewHeapsterMetricsClient(
+			hpaClient,
+			metrics.DefaultHeapsterNamespace,
+			metrics.DefaultHeapsterScheme,
+			metrics.DefaultHeapsterService,
+			metrics.DefaultHeapsterPort,
+		)
+		replicaCalc := podautoscaler.NewReplicaCalculator(metricsClient, hpaClient.Core())
+		go podautoscaler.NewHorizontalController(hpaClient.Core(), hpaClient.Extensions(), hpaClient.Autoscaling(), replicaCalc, s.HorizontalPodAutoscalerSyncPeriod.Duration).
+			Run(stop)
+		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
+	}
+
+	if availableResources[schema.GroupVersionResource{Group: "policy", Version: "v1beta1", Resource: "poddisruptionbudgets"}] {
+		glog.Infof("Starting disruption controller")
+		go disruption.NewDisruptionController(sharedInformers.Pods().Informer(), client("disruption-controller")).Run(stop)
+		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
+	}
+
+	if availableResources[schema.GroupVersionResource{Group: "apps", Version: "v1beta1", Resource: "statefulsets"}] {
+		glog.Infof("Starting StatefulSet controller")
+		resyncPeriod := ResyncPeriod(s)()
+		go petset.NewStatefulSetController(
+			sharedInformers.Pods().Informer(),
+			client("statefulset-controller"),
+			resyncPeriod,
+		).Run(1, stop)
+		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
+	}
+
+	if availableResources[schema.GroupVersionResource{Group: "batch", Version: "v2alpha1", Resource: "cronjobs"}] {
+		glog.Infof("Starting cronjob controller")
+		// TODO: this is a temp fix for allowing kubeClient list v2alpha1 sj, should switch to using clientset
+		kubeconfig.ContentConfig.GroupVersion = &schema.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"}
+		go cronjob.NewCronJobController(client("cronjob-controller")).Run(stop)
+		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 	}
 
 	alphaProvisioner, err := NewAlphaVolumeProvisioner(cloud, s.VolumeConfiguration)
@@ -533,28 +522,22 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl
 	go attachDetachController.Run(stop)
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
-	groupVersion = "certificates.k8s.io/v1alpha1"
-	resources, found = resourceMap[groupVersion]
-	glog.Infof("Attempting to start certificates, full resource map %+v", resourceMap)
-	if containsVersion(versions, groupVersion) && found {
-		glog.Infof("Starting %s apis", groupVersion)
-		if containsResource(resources, "certificatesigningrequests") {
-			glog.Infof("Starting certificate request controller")
-			resyncPeriod := ResyncPeriod(s)()
-			certController, err := certcontroller.NewCertificateController(
-				client("certificate-controller"),
-				resyncPeriod,
-				s.ClusterSigningCertFile,
-				s.ClusterSigningKeyFile,
-				s.ApproveAllKubeletCSRsForGroup,
-			)
-			if err != nil {
-				glog.Errorf("Failed to start certificate controller: %v", err)
-			} else {
-				go certController.Run(1, stop)
-			}
-			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
+	if availableResources[schema.GroupVersionResource{Group: "certificates.k8s.io", Version: "v1alpha1", Resource: "certificatesigningrequests"}] {
+		glog.Infof("Starting certificate request controller")
+		resyncPeriod := ResyncPeriod(s)()
+		certController, err := certcontroller.NewCertificateController(
+			client("certificate-controller"),
+			resyncPeriod,
+			s.ClusterSigningCertFile,
+			s.ClusterSigningKeyFile,
+			s.ApproveAllKubeletCSRsForGroup,
+		)
+		if err != nil {
+			glog.Errorf("Failed to start certificate controller: %v", err)
+		} else {
+			go certController.Run(1, stop)
 		}
+		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 	}
 
 	go serviceaccountcontroller.NewServiceAccountsController(
@@ -589,22 +572,3 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl
 
 	select {}
 }
-
-func containsVersion(versions *unversioned.APIVersions, version string) bool {
-	for ix := range versions.Versions {
-		if versions.Versions[ix] == version {
-			return true
-		}
-	}
-	return false
-}
-
-func containsResource(resources *unversioned.APIResourceList, resourceName string) bool {
-	for ix := range resources.APIResources {
-		resource := resources.APIResources[ix]
-		if resource.Name == resourceName {
-			return true
-		}
-	}
-	return false
-}
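
For readers who want to try the technique outside the kube-controller-manager tree, here is a minimal standalone sketch of the same discovery-gated startup: poll the apiserver for a bounded time, flatten the discovery response into a map[schema.GroupVersionResource]bool, and gate each controller on the resource it watches. It is written against current client-go import paths (k8s.io/client-go, k8s.io/apimachinery) rather than the in-tree packages this patch uses; the kubeconfig path and the apps/v1 daemonsets GVR are illustrative assumptions, not taken from the patch.

package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/discovery"
	"k8s.io/client-go/tools/clientcmd"
)

// gvrSet flattens discovery output into a set keyed by GroupVersionResource,
// mirroring the allResources map built by getAvailableResources in this patch.
func gvrSet(lists []*metav1.APIResourceList) (map[schema.GroupVersionResource]bool, error) {
	all := map[schema.GroupVersionResource]bool{}
	for _, list := range lists {
		gv, err := schema.ParseGroupVersion(list.GroupVersion)
		if err != nil {
			return nil, err
		}
		for _, r := range list.APIResources {
			all[gv.WithResource(r.Name)] = true
		}
	}
	return all, nil
}

func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}

	// Retry discovery for a bounded time, as the patch does, so a controller
	// manager started alongside the apiserver does not fail immediately.
	var lists []*metav1.APIResourceList
	err = wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
		dc, err := discovery.NewDiscoveryClientForConfig(cfg)
		if err != nil {
			return false, nil // apiserver not reachable yet; keep polling
		}
		// Retry on any discovery error, including partial results.
		_, lists, err = dc.ServerGroupsAndResources()
		return err == nil, nil
	})
	if err != nil {
		panic(fmt.Errorf("failed to get supported resources from server: %v", err))
	}

	available, err := gvrSet(lists)
	if err != nil {
		panic(err)
	}

	// Gate a hypothetical controller on the resource it watches, as the
	// rewritten StartControllers does for daemonsets, jobs, and the rest.
	if available[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "daemonsets"}] {
		fmt.Println("daemonsets available: would start the daemon set controller")
	}
}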
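
A design note on the shape of the new code: once availability is a set keyed by GroupVersionResource, the repeated if-blocks in StartControllers can in principle collapse into a table mapping each required resource to a start function, so adding a controller becomes one map entry instead of another if-block. The sketch below is hypothetical (the startFunc type and controller names are invented for illustration); it is not what this patch does, only where the map[schema.GroupVersionResource]bool representation naturally leads.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime/schema"
)

// startFunc stands in for the per-controller startup closures in
// StartControllers; the names below are illustrative, not from the patch.
type startFunc func() error

func startDaemonSets() error { fmt.Println("daemon set controller started"); return nil }
func startCronJobs() error   { fmt.Println("cronjob controller started"); return nil }

func main() {
	// Pretend discovery found only daemonsets.
	available := map[schema.GroupVersionResource]bool{
		{Group: "extensions", Version: "v1beta1", Resource: "daemonsets"}: true,
	}

	// Declarative gating: each controller is keyed by the resource it needs.
	controllers := map[schema.GroupVersionResource]startFunc{
		{Group: "extensions", Version: "v1beta1", Resource: "daemonsets"}: startDaemonSets,
		{Group: "batch", Version: "v2alpha1", Resource: "cronjobs"}:       startCronJobs,
	}

	for gvr, start := range controllers {
		if !available[gvr] {
			fmt.Printf("skipping controller for unavailable resource %s\n", gvr)
			continue
		}
		if err := start(); err != nil {
			fmt.Printf("failed to start controller for %s: %v\n", gvr, err)
		}
	}
}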