diff --git a/cmd/kube-controller-manager/app/BUILD b/cmd/kube-controller-manager/app/BUILD index 3de42bcd91e..2d37111c402 100644 --- a/cmd/kube-controller-manager/app/BUILD +++ b/cmd/kube-controller-manager/app/BUILD @@ -13,8 +13,15 @@ load( go_library( name = "go_default_library", srcs = [ + "apps.go", + "autoscaling.go", + "batch.go", + "certificates.go", "controllermanager.go", + "core.go", + "extensions.go", "plugins.go", + "policy.go", ], tags = ["automanaged"], deps = [ diff --git a/cmd/kube-controller-manager/app/apps.go b/cmd/kube-controller-manager/app/apps.go new file mode 100644 index 00000000000..74f7bdfcbc8 --- /dev/null +++ b/cmd/kube-controller-manager/app/apps.go @@ -0,0 +1,39 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package app implements a server that runs a set of active +// components. This includes replication controllers, service endpoints and +// nodes. +// +package app + +import ( + petset "k8s.io/kubernetes/pkg/controller/petset" + "k8s.io/kubernetes/pkg/runtime/schema" +) + +func startStatefulSetController(ctx ControllerContext) (bool, error) { + if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1beta1", Resource: "statefulsets"}] { + return false, nil + } + resyncPeriod := ResyncPeriod(&ctx.Options)() + go petset.NewStatefulSetController( + ctx.InformerFactory.Pods().Informer(), + ctx.ClientBuilder.ClientOrDie("statefulset-controller"), + resyncPeriod, + ).Run(1, ctx.Stop) + return true, nil +} diff --git a/cmd/kube-controller-manager/app/autoscaling.go b/cmd/kube-controller-manager/app/autoscaling.go new file mode 100644 index 00000000000..6fd279add49 --- /dev/null +++ b/cmd/kube-controller-manager/app/autoscaling.go @@ -0,0 +1,50 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package app implements a server that runs a set of active +// components. This includes replication controllers, service endpoints and +// nodes. +// +package app + +import ( + "k8s.io/kubernetes/pkg/controller/podautoscaler" + "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics" + "k8s.io/kubernetes/pkg/runtime/schema" +) + +func startHPAController(ctx ControllerContext) (bool, error) { + if !ctx.AvailableResources[schema.GroupVersionResource{Group: "autoscaling", Version: "v1", Resource: "horizontalpodautoscalers"}] { + return false, nil + } + hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler") + metricsClient := metrics.NewHeapsterMetricsClient( + hpaClient, + metrics.DefaultHeapsterNamespace, + metrics.DefaultHeapsterScheme, + metrics.DefaultHeapsterService, + metrics.DefaultHeapsterPort, + ) + replicaCalc := podautoscaler.NewReplicaCalculator(metricsClient, hpaClient.Core()) + go podautoscaler.NewHorizontalController( + hpaClient.Core(), + hpaClient.Extensions(), + hpaClient.Autoscaling(), + replicaCalc, + ctx.Options.HorizontalPodAutoscalerSyncPeriod.Duration, + ).Run(ctx.Stop) + return true, nil +} diff --git a/cmd/kube-controller-manager/app/batch.go b/cmd/kube-controller-manager/app/batch.go new file mode 100644 index 00000000000..d41fc6debf6 --- /dev/null +++ b/cmd/kube-controller-manager/app/batch.go @@ -0,0 +1,41 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package app implements a server that runs a set of active +// components. This includes replication controllers, service endpoints and +// nodes. +// +package app + +import ( + "k8s.io/kubernetes/pkg/apis/batch" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5" + "k8s.io/kubernetes/pkg/controller/cronjob" + "k8s.io/kubernetes/pkg/runtime/schema" +) + +func startCronJobController(ctx ControllerContext) (bool, error) { + if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v2alpha1", Resource: "cronjobs"}] { + return false, nil + } + // TODO: this is a temp fix for allowing kubeClient list v2alpha1 sj, should switch to using clientset + cronjobConfig := ctx.ClientBuilder.ConfigOrDie("cronjob-controller") + cronjobConfig.ContentConfig.GroupVersion = &schema.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"} + go cronjob.NewCronJobController( + clientset.NewForConfigOrDie(cronjobConfig), + ).Run(ctx.Stop) + return true, nil +} diff --git a/cmd/kube-controller-manager/app/certificates.go b/cmd/kube-controller-manager/app/certificates.go new file mode 100644 index 00000000000..c0588b872cc --- /dev/null +++ b/cmd/kube-controller-manager/app/certificates.go @@ -0,0 +1,51 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package app implements a server that runs a set of active +// components. This includes replication controllers, service endpoints and +// nodes. +// +package app + +import ( + "github.com/golang/glog" + + certcontroller "k8s.io/kubernetes/pkg/controller/certificates" + "k8s.io/kubernetes/pkg/runtime/schema" +) + +func startCSRController(ctx ControllerContext) (bool, error) { + if !ctx.AvailableResources[schema.GroupVersionResource{Group: "certificates.k8s.io", Version: "v1alpha1", Resource: "certificatesigningrequests"}] { + return false, nil + } + resyncPeriod := ResyncPeriod(&ctx.Options)() + c := ctx.ClientBuilder.ClientOrDie("certificate-controller") + certController, err := certcontroller.NewCertificateController( + c, + resyncPeriod, + ctx.Options.ClusterSigningCertFile, + ctx.Options.ClusterSigningKeyFile, + certcontroller.NewGroupApprover(c.Certificates().CertificateSigningRequests(), ctx.Options.ApproveAllKubeletCSRsForGroup), + ) + if err != nil { + // TODO this is failing consistently in test-cmd and local-up-cluster.sh. Fix them and make it consistent with all others which + // cause a crash loop + glog.Errorf("Failed to start certificate controller: %v", err) + return false, nil + } + go certController.Run(1, ctx.Stop) + return true, nil +} diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index d8e1fe3e9fa..7a8a5acc39c 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -32,12 +32,7 @@ import ( "time" "k8s.io/kubernetes/cmd/kube-controller-manager/app/options" - "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" - "k8s.io/kubernetes/pkg/apimachinery/registered" - "k8s.io/kubernetes/pkg/apis/batch" - "k8s.io/kubernetes/pkg/apis/extensions" - metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5" v1core "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/typed/core/v1" "k8s.io/kubernetes/pkg/client/leaderelection" @@ -45,38 +40,18 @@ import ( "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/typed/discovery" - "k8s.io/kubernetes/pkg/client/typed/dynamic" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/controller" - certcontroller "k8s.io/kubernetes/pkg/controller/certificates" - "k8s.io/kubernetes/pkg/controller/cronjob" - "k8s.io/kubernetes/pkg/controller/daemon" - "k8s.io/kubernetes/pkg/controller/deployment" - "k8s.io/kubernetes/pkg/controller/disruption" - endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint" - "k8s.io/kubernetes/pkg/controller/garbagecollector" - "k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly" "k8s.io/kubernetes/pkg/controller/informers" - "k8s.io/kubernetes/pkg/controller/job" - namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace" nodecontroller "k8s.io/kubernetes/pkg/controller/node" - petset "k8s.io/kubernetes/pkg/controller/petset" - "k8s.io/kubernetes/pkg/controller/podautoscaler" - "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics" - "k8s.io/kubernetes/pkg/controller/podgc" - replicaset "k8s.io/kubernetes/pkg/controller/replicaset" - replicationcontroller "k8s.io/kubernetes/pkg/controller/replication" - resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota" routecontroller "k8s.io/kubernetes/pkg/controller/route" servicecontroller "k8s.io/kubernetes/pkg/controller/service" serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount" "k8s.io/kubernetes/pkg/controller/volume/attachdetach" persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume" "k8s.io/kubernetes/pkg/healthz" - quotainstall "k8s.io/kubernetes/pkg/quota/install" "k8s.io/kubernetes/pkg/runtime/schema" - "k8s.io/kubernetes/pkg/runtime/serializer" "k8s.io/kubernetes/pkg/serviceaccount" certutil "k8s.io/kubernetes/pkg/util/cert" "k8s.io/kubernetes/pkg/util/configz" @@ -243,108 +218,31 @@ type ControllerContext struct { // InitFunc is used to launch a particular controller. It may run additional "should I activate checks". // Any error returned will cause the controller process to `Fatal` +// The bool indicates whether the controller was enabled. type InitFunc func(ctx ControllerContext) (bool, error) func newControllerInitializers() map[string]InitFunc { controllers := map[string]InitFunc{} controllers["endpoint"] = startEndpointController controllers["replicationcontroller"] = startReplicationController - controllers["podgc"] = startEndpointController + controllers["podgc"] = startPodGCController controllers["resourcequota"] = startResourceQuotaController controllers["namespace"] = startNamespaceController + controllers["serviceaccount"] = startServiceAccountController + controllers["garbagecollector"] = startGarbageCollectorController + controllers["daemonset"] = startDaemonSetController + controllers["job"] = startJobController + controllers["deployment"] = startDeploymentController + controllers["replicaset"] = startReplicaSetController + controllers["horizontalpodautoscaling"] = startHPAController + controllers["disruption"] = startDisruptionController + controllers["statefuleset"] = startStatefulSetController + controllers["cronjob"] = startCronJobController + controllers["certificatesigningrequests"] = startCSRController return controllers } -func startEndpointController(ctx ControllerContext) (bool, error) { - go endpointcontroller.NewEndpointController( - ctx.InformerFactory.Pods().Informer(), - ctx.ClientBuilder.ClientOrDie("endpoint-controller"), - ).Run(int(ctx.Options.ConcurrentEndpointSyncs), ctx.Stop) - return true, nil -} - -func startReplicationController(ctx ControllerContext) (bool, error) { - go replicationcontroller.NewReplicationManager( - ctx.InformerFactory.Pods().Informer(), - ctx.ClientBuilder.ClientOrDie("replication-controller"), - ResyncPeriod(&ctx.Options), - replicationcontroller.BurstReplicas, - int(ctx.Options.LookupCacheSizeForRC), - ctx.Options.EnableGarbageCollector, - ).Run(int(ctx.Options.ConcurrentRCSyncs), ctx.Stop) - return true, nil -} - -func startPodGCController(ctx ControllerContext) (bool, error) { - go podgc.NewPodGC( - ctx.ClientBuilder.ClientOrDie("pod-garbage-collector"), - ctx.InformerFactory.Pods().Informer(), - int(ctx.Options.TerminatedPodGCThreshold), - ).Run(ctx.Stop) - return true, nil -} - -func startResourceQuotaController(ctx ControllerContext) (bool, error) { - resourceQuotaControllerClient := ctx.ClientBuilder.ClientOrDie("resourcequota-controller") - resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, ctx.InformerFactory) - groupKindsToReplenish := []schema.GroupKind{ - api.Kind("Pod"), - api.Kind("Service"), - api.Kind("ReplicationController"), - api.Kind("PersistentVolumeClaim"), - api.Kind("Secret"), - api.Kind("ConfigMap"), - } - resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{ - KubeClient: resourceQuotaControllerClient, - ResyncPeriod: controller.StaticResyncPeriodFunc(ctx.Options.ResourceQuotaSyncPeriod.Duration), - Registry: resourceQuotaRegistry, - ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(ctx.InformerFactory, resourceQuotaControllerClient), - ReplenishmentResyncPeriod: ResyncPeriod(&ctx.Options), - GroupKindsToReplenish: groupKindsToReplenish, - } - go resourcequotacontroller.NewResourceQuotaController( - resourceQuotaControllerOptions, - ).Run(int(ctx.Options.ConcurrentResourceQuotaSyncs), ctx.Stop) - - return true, nil -} - -func startNamespaceController(ctx ControllerContext) (bool, error) { - // TODO: should use a dynamic RESTMapper built from the discovery results. - restMapper := registered.RESTMapper() - - // Find the list of namespaced resources via discovery that the namespace controller must manage - namespaceKubeClient := ctx.ClientBuilder.ClientOrDie("namespace-controller") - namespaceClientPool := dynamic.NewClientPool(ctx.ClientBuilder.ConfigOrDie("namespace-controller"), restMapper, dynamic.LegacyAPIPathResolverFunc) - // TODO: consider using a list-watch + cache here rather than polling - resources, err := namespaceKubeClient.Discovery().ServerResources() - if err != nil { - return true, fmt.Errorf("failed to get preferred server resources: %v", err) - } - gvrs, err := discovery.GroupVersionResources(resources) - if err != nil { - return true, fmt.Errorf("failed to parse preferred server resources: %v", err) - } - discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources - if _, found := gvrs[extensions.SchemeGroupVersion.WithResource("thirdpartyresource")]; found { - // make discovery static - snapshot, err := discoverResourcesFn() - if err != nil { - return true, fmt.Errorf("failed to get server resources: %v", err) - } - discoverResourcesFn = func() ([]*metav1.APIResourceList, error) { - return snapshot, nil - } - } - namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, discoverResourcesFn, ctx.Options.NamespaceSyncPeriod.Duration, v1.FinalizerKubernetes) - go namespaceController.Run(int(ctx.Options.ConcurrentNamespaceSyncs), ctx.Stop) - - return true, nil - -} - // TODO: In general, any controller checking this needs to be dynamic so // users don't have to restart their controller manager if they change the apiserver. func getAvailableResources(clientBuilder controller.ControllerClientBuilder) (map[schema.GroupVersionResource]bool, error) { @@ -493,75 +391,6 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root glog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", s.AllocateNodeCIDRs, s.ConfigureCloudRoutes) } - if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "daemonsets"}] { - go daemon.NewDaemonSetsController(sharedInformers.DaemonSets(), sharedInformers.Pods(), sharedInformers.Nodes(), clientBuilder.ClientOrDie("daemon-set-controller"), int(s.LookupCacheSizeForDaemonSet)). - Run(int(s.ConcurrentDaemonSetSyncs), stop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - } - - if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "jobs"}] { - glog.Infof("Starting job controller") - go job.NewJobController(sharedInformers.Pods().Informer(), sharedInformers.Jobs(), clientBuilder.ClientOrDie("job-controller")). - Run(int(s.ConcurrentJobSyncs), stop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - } - - if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "deployments"}] { - glog.Infof("Starting deployment controller") - go deployment.NewDeploymentController(sharedInformers.Deployments(), sharedInformers.ReplicaSets(), sharedInformers.Pods(), clientBuilder.ClientOrDie("deployment-controller")). - Run(int(s.ConcurrentDeploymentSyncs), stop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - } - - if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "replicasets"}] { - glog.Infof("Starting ReplicaSet controller") - go replicaset.NewReplicaSetController(sharedInformers.ReplicaSets(), sharedInformers.Pods(), clientBuilder.ClientOrDie("replicaset-controller"), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector). - Run(int(s.ConcurrentRSSyncs), stop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - } - - if availableResources[schema.GroupVersionResource{Group: "autoscaling", Version: "v1", Resource: "horizontalpodautoscalers"}] { - glog.Infof("Starting horizontal pod autoscaler controller.") - hpaClient := clientBuilder.ClientOrDie("horizontal-pod-autoscaler") - metricsClient := metrics.NewHeapsterMetricsClient( - hpaClient, - metrics.DefaultHeapsterNamespace, - metrics.DefaultHeapsterScheme, - metrics.DefaultHeapsterService, - metrics.DefaultHeapsterPort, - ) - replicaCalc := podautoscaler.NewReplicaCalculator(metricsClient, hpaClient.Core()) - go podautoscaler.NewHorizontalController(hpaClient.Core(), hpaClient.Extensions(), hpaClient.Autoscaling(), replicaCalc, s.HorizontalPodAutoscalerSyncPeriod.Duration). - Run(stop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - } - - if availableResources[schema.GroupVersionResource{Group: "policy", Version: "v1beta1", Resource: "poddisruptionbudgets"}] { - glog.Infof("Starting disruption controller") - go disruption.NewDisruptionController(sharedInformers.Pods().Informer(), clientBuilder.ClientOrDie("disruption-controller")).Run(stop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - } - - if availableResources[schema.GroupVersionResource{Group: "apps", Version: "v1beta1", Resource: "statefulsets"}] { - glog.Infof("Starting StatefulSet controller") - resyncPeriod := ResyncPeriod(s)() - go petset.NewStatefulSetController( - sharedInformers.Pods().Informer(), - clientBuilder.ClientOrDie("statefulset-controller"), - resyncPeriod, - ).Run(1, stop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - } - - if availableResources[schema.GroupVersionResource{Group: "batch", Version: "v2alpha1", Resource: "cronjobs"}] { - glog.Infof("Starting cronjob controller") - // TODO: this is a temp fix for allowing kubeClient list v2alpha1 sj, should switch to using clientset - cronjobConfig := rootClientBuilder.ConfigOrDie("cronjob-controller") - cronjobConfig.ContentConfig.GroupVersion = &schema.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"} - go cronjob.NewCronJobController(clientset.NewForConfigOrDie(cronjobConfig)).Run(stop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - } - alphaProvisioner, err := NewAlphaVolumeProvisioner(cloud, s.VolumeConfiguration) if err != nil { return fmt.Errorf("an backward-compatible provisioner could not be created: %v, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.", err) @@ -594,61 +423,6 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root go attachDetachController.Run(stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - if availableResources[schema.GroupVersionResource{Group: "certificates.k8s.io", Version: "v1alpha1", Resource: "certificatesigningrequests"}] { - glog.Infof("Starting certificate request controller") - resyncPeriod := ResyncPeriod(s)() - c := clientBuilder.ClientOrDie("certificate-controller") - certController, err := certcontroller.NewCertificateController( - c, - resyncPeriod, - s.ClusterSigningCertFile, - s.ClusterSigningKeyFile, - certcontroller.NewGroupApprover(c.Certificates().CertificateSigningRequests(), s.ApproveAllKubeletCSRsForGroup), - ) - if err != nil { - glog.Errorf("Failed to start certificate controller: %v", err) - } else { - go certController.Run(1, stop) - } - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - } - - go serviceaccountcontroller.NewServiceAccountsController( - sharedInformers.ServiceAccounts(), sharedInformers.Namespaces(), - clientBuilder.ClientOrDie("service-account-controller"), - serviceaccountcontroller.DefaultServiceAccountsControllerOptions(), - ).Run(1, stop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - - if s.EnableGarbageCollector { - // TODO: should use a dynamic RESTMapper built from the discovery results. - restMapper := registered.RESTMapper() - - gcClientset := clientBuilder.ClientOrDie("generic-garbage-collector") - preferredResources, err := gcClientset.Discovery().ServerPreferredResources() - if err != nil { - return fmt.Errorf("failed to get supported resources from server: %v", err) - } - deletableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, preferredResources) - deletableGroupVersionResources, err := discovery.GroupVersionResources(deletableResources) - if err != nil { - glog.Errorf("Failed to parse resources from server: %v", err) - } - - config := rootClientBuilder.ConfigOrDie("generic-garbage-collector") - config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()} - metaOnlyClientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc) - config.ContentConfig = dynamic.ContentConfig() - clientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc) - garbageCollector, err := garbagecollector.NewGarbageCollector(metaOnlyClientPool, clientPool, restMapper, deletableGroupVersionResources) - if err != nil { - glog.Errorf("Failed to start the generic garbage collector: %v", err) - } else { - workers := int(s.ConcurrentGCSyncs) - go garbageCollector.Run(workers, stop) - } - } - sharedInformers.Start(stop) select {} diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go new file mode 100644 index 00000000000..5257a126675 --- /dev/null +++ b/cmd/kube-controller-manager/app/core.go @@ -0,0 +1,177 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package app implements a server that runs a set of active +// components. This includes replication controllers, service endpoints and +// nodes. +// +package app + +import ( + "fmt" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/apimachinery/registered" + "k8s.io/kubernetes/pkg/apis/extensions" + metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/client/typed/discovery" + "k8s.io/kubernetes/pkg/client/typed/dynamic" + "k8s.io/kubernetes/pkg/controller" + endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint" + "k8s.io/kubernetes/pkg/controller/garbagecollector" + "k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly" + namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace" + "k8s.io/kubernetes/pkg/controller/podgc" + replicationcontroller "k8s.io/kubernetes/pkg/controller/replication" + resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota" + serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount" + quotainstall "k8s.io/kubernetes/pkg/quota/install" + "k8s.io/kubernetes/pkg/runtime/schema" + "k8s.io/kubernetes/pkg/runtime/serializer" +) + +func startEndpointController(ctx ControllerContext) (bool, error) { + go endpointcontroller.NewEndpointController( + ctx.InformerFactory.Pods().Informer(), + ctx.ClientBuilder.ClientOrDie("endpoint-controller"), + ).Run(int(ctx.Options.ConcurrentEndpointSyncs), ctx.Stop) + return true, nil +} + +func startReplicationController(ctx ControllerContext) (bool, error) { + go replicationcontroller.NewReplicationManager( + ctx.InformerFactory.Pods().Informer(), + ctx.ClientBuilder.ClientOrDie("replication-controller"), + ResyncPeriod(&ctx.Options), + replicationcontroller.BurstReplicas, + int(ctx.Options.LookupCacheSizeForRC), + ctx.Options.EnableGarbageCollector, + ).Run(int(ctx.Options.ConcurrentRCSyncs), ctx.Stop) + return true, nil +} + +func startPodGCController(ctx ControllerContext) (bool, error) { + go podgc.NewPodGC( + ctx.ClientBuilder.ClientOrDie("pod-garbage-collector"), + ctx.InformerFactory.Pods().Informer(), + int(ctx.Options.TerminatedPodGCThreshold), + ).Run(ctx.Stop) + return true, nil +} + +func startResourceQuotaController(ctx ControllerContext) (bool, error) { + resourceQuotaControllerClient := ctx.ClientBuilder.ClientOrDie("resourcequota-controller") + resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, ctx.InformerFactory) + groupKindsToReplenish := []schema.GroupKind{ + api.Kind("Pod"), + api.Kind("Service"), + api.Kind("ReplicationController"), + api.Kind("PersistentVolumeClaim"), + api.Kind("Secret"), + api.Kind("ConfigMap"), + } + resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{ + KubeClient: resourceQuotaControllerClient, + ResyncPeriod: controller.StaticResyncPeriodFunc(ctx.Options.ResourceQuotaSyncPeriod.Duration), + Registry: resourceQuotaRegistry, + ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(ctx.InformerFactory, resourceQuotaControllerClient), + ReplenishmentResyncPeriod: ResyncPeriod(&ctx.Options), + GroupKindsToReplenish: groupKindsToReplenish, + } + go resourcequotacontroller.NewResourceQuotaController( + resourceQuotaControllerOptions, + ).Run(int(ctx.Options.ConcurrentResourceQuotaSyncs), ctx.Stop) + return true, nil +} + +func startNamespaceController(ctx ControllerContext) (bool, error) { + // TODO: should use a dynamic RESTMapper built from the discovery results. + restMapper := registered.RESTMapper() + + // Find the list of namespaced resources via discovery that the namespace controller must manage + namespaceKubeClient := ctx.ClientBuilder.ClientOrDie("namespace-controller") + namespaceClientPool := dynamic.NewClientPool(ctx.ClientBuilder.ConfigOrDie("namespace-controller"), restMapper, dynamic.LegacyAPIPathResolverFunc) + // TODO: consider using a list-watch + cache here rather than polling + resources, err := namespaceKubeClient.Discovery().ServerResources() + if err != nil { + return true, fmt.Errorf("failed to get preferred server resources: %v", err) + } + gvrs, err := discovery.GroupVersionResources(resources) + if err != nil { + return true, fmt.Errorf("failed to parse preferred server resources: %v", err) + } + discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources + if _, found := gvrs[extensions.SchemeGroupVersion.WithResource("thirdpartyresource")]; found { + // make discovery static + snapshot, err := discoverResourcesFn() + if err != nil { + return true, fmt.Errorf("failed to get server resources: %v", err) + } + discoverResourcesFn = func() ([]*metav1.APIResourceList, error) { + return snapshot, nil + } + } + namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, discoverResourcesFn, ctx.Options.NamespaceSyncPeriod.Duration, v1.FinalizerKubernetes) + go namespaceController.Run(int(ctx.Options.ConcurrentNamespaceSyncs), ctx.Stop) + + return true, nil + +} + +func startServiceAccountController(ctx ControllerContext) (bool, error) { + go serviceaccountcontroller.NewServiceAccountsController( + ctx.InformerFactory.ServiceAccounts(), + ctx.InformerFactory.Namespaces(), + ctx.ClientBuilder.ClientOrDie("service-account-controller"), + serviceaccountcontroller.DefaultServiceAccountsControllerOptions(), + ).Run(1, ctx.Stop) + return true, nil +} + +func startGarbageCollectorController(ctx ControllerContext) (bool, error) { + if !ctx.Options.EnableGarbageCollector { + return false, nil + } + + // TODO: should use a dynamic RESTMapper built from the discovery results. + restMapper := registered.RESTMapper() + + gcClientset := ctx.ClientBuilder.ClientOrDie("generic-garbage-collector") + preferredResources, err := gcClientset.Discovery().ServerPreferredResources() + if err != nil { + return true, fmt.Errorf("failed to get supported resources from server: %v", err) + } + deletableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, preferredResources) + deletableGroupVersionResources, err := discovery.GroupVersionResources(deletableResources) + if err != nil { + return true, fmt.Errorf("Failed to parse resources from server: %v", err) + } + + config := ctx.ClientBuilder.ConfigOrDie("generic-garbage-collector") + config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()} + metaOnlyClientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc) + config.ContentConfig = dynamic.ContentConfig() + clientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc) + garbageCollector, err := garbagecollector.NewGarbageCollector(metaOnlyClientPool, clientPool, restMapper, deletableGroupVersionResources) + if err != nil { + return true, fmt.Errorf("Failed to start the generic garbage collector: %v", err) + } + workers := int(ctx.Options.ConcurrentGCSyncs) + go garbageCollector.Run(workers, ctx.Stop) + + return true, nil +} diff --git a/cmd/kube-controller-manager/app/extensions.go b/cmd/kube-controller-manager/app/extensions.go new file mode 100644 index 00000000000..5cfa26540c4 --- /dev/null +++ b/cmd/kube-controller-manager/app/extensions.go @@ -0,0 +1,83 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package app implements a server that runs a set of active +// components. This includes replication controllers, service endpoints and +// nodes. +// +package app + +import ( + "k8s.io/kubernetes/pkg/controller/daemon" + "k8s.io/kubernetes/pkg/controller/deployment" + "k8s.io/kubernetes/pkg/controller/job" + replicaset "k8s.io/kubernetes/pkg/controller/replicaset" + "k8s.io/kubernetes/pkg/runtime/schema" +) + +func startDaemonSetController(ctx ControllerContext) (bool, error) { + if !ctx.AvailableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "daemonsets"}] { + return false, nil + } + go daemon.NewDaemonSetsController( + ctx.InformerFactory.DaemonSets(), + ctx.InformerFactory.Pods(), + ctx.InformerFactory.Nodes(), + ctx.ClientBuilder.ClientOrDie("daemon-set-controller"), + int(ctx.Options.LookupCacheSizeForDaemonSet), + ).Run(int(ctx.Options.ConcurrentDaemonSetSyncs), ctx.Stop) + return true, nil +} + +func startJobController(ctx ControllerContext) (bool, error) { + if !ctx.AvailableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "jobs"}] { + return false, nil + } + go job.NewJobController( + ctx.InformerFactory.Pods().Informer(), + ctx.InformerFactory.Jobs(), + ctx.ClientBuilder.ClientOrDie("job-controller"), + ).Run(int(ctx.Options.ConcurrentJobSyncs), ctx.Stop) + return true, nil +} + +func startDeploymentController(ctx ControllerContext) (bool, error) { + if !ctx.AvailableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "deployments"}] { + return false, nil + } + go deployment.NewDeploymentController( + ctx.InformerFactory.Deployments(), + ctx.InformerFactory.ReplicaSets(), + ctx.InformerFactory.Pods(), + ctx.ClientBuilder.ClientOrDie("deployment-controller"), + ).Run(int(ctx.Options.ConcurrentDeploymentSyncs), ctx.Stop) + return true, nil +} + +func startReplicaSetController(ctx ControllerContext) (bool, error) { + if !ctx.AvailableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "replicasets"}] { + return false, nil + } + go replicaset.NewReplicaSetController( + ctx.InformerFactory.ReplicaSets(), + ctx.InformerFactory.Pods(), + ctx.ClientBuilder.ClientOrDie("replicaset-controller"), + replicaset.BurstReplicas, + int(ctx.Options.LookupCacheSizeForRS), + ctx.Options.EnableGarbageCollector, + ).Run(int(ctx.Options.ConcurrentRSSyncs), ctx.Stop) + return true, nil +} diff --git a/cmd/kube-controller-manager/app/policy.go b/cmd/kube-controller-manager/app/policy.go new file mode 100644 index 00000000000..119106f643f --- /dev/null +++ b/cmd/kube-controller-manager/app/policy.go @@ -0,0 +1,37 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package app implements a server that runs a set of active +// components. This includes replication controllers, service endpoints and +// nodes. +// +package app + +import ( + "k8s.io/kubernetes/pkg/controller/disruption" + "k8s.io/kubernetes/pkg/runtime/schema" +) + +func startDisruptionController(ctx ControllerContext) (bool, error) { + if !ctx.AvailableResources[schema.GroupVersionResource{Group: "policy", Version: "v1beta1", Resource: "poddisruptionbudgets"}] { + return false, nil + } + go disruption.NewDisruptionController( + ctx.InformerFactory.Pods().Informer(), + ctx.ClientBuilder.ClientOrDie("disruption-controller"), + ).Run(ctx.Stop) + return true, nil +}