separate controller initialization for easy controllers

deads2k 2016-12-05 09:04:32 -05:00
parent 6c4033eea7
commit f36a5ae9a1
9 changed files with 498 additions and 239 deletions
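The change is easiest to read with the new pattern in mind: every controller gets a small start function with the InitFunc signature introduced in controllermanager.go, is registered by name in a map, and checks API availability before launching anything. A self-contained sketch of that shape, using simplified stand-in types rather than the real kube-controller-manager ones:

package main

import "fmt"

// Simplified stand-ins for the real types in cmd/kube-controller-manager/app.
type GroupVersionResource struct{ Group, Version, Resource string }

type ControllerContext struct {
	AvailableResources map[GroupVersionResource]bool // filled from API discovery
	Stop               <-chan struct{}               // closed to shut all controllers down
}

// InitFunc mirrors the signature added below: the bool reports whether the
// controller was started, and an error is fatal to the process.
type InitFunc func(ctx ControllerContext) (bool, error)

func startStatefulSetController(ctx ControllerContext) (bool, error) {
	if !ctx.AvailableResources[GroupVersionResource{"apps", "v1beta1", "statefulsets"}] {
		return false, nil // API not served: skip quietly instead of crashing
	}
	// The real function builds the controller from ctx and runs it:
	// go petset.NewStatefulSetController(...).Run(1, ctx.Stop)
	return true, nil
}

func main() {
	controllers := map[string]InitFunc{
		"statefulset": startStatefulSetController,
	}
	ctx := ControllerContext{
		AvailableResources: map[GroupVersionResource]bool{
			{"apps", "v1beta1", "statefulsets"}: true,
		},
		Stop: make(chan struct{}),
	}
	for name, start := range controllers {
		started, err := start(ctx)
		if err != nil {
			fmt.Printf("starting %q failed: %v\n", name, err)
			continue
		}
		fmt.Printf("controller %q started: %v\n", name, started)
	}
}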

cmd/kube-controller-manager/app/BUILD

@ -13,8 +13,15 @@ load(
go_library(
name = "go_default_library",
srcs = [
"apps.go",
"autoscaling.go",
"batch.go",
"certificates.go",
"controllermanager.go",
"core.go",
"extensions.go",
"plugins.go",
"policy.go",
],
tags = ["automanaged"],
deps = [

cmd/kube-controller-manager/app/apps.go

@ -0,0 +1,39 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package app implements a server that runs a set of active
// components. This includes replication controllers, service endpoints and
// nodes.
//
package app
import (
petset "k8s.io/kubernetes/pkg/controller/petset"
"k8s.io/kubernetes/pkg/runtime/schema"
)
func startStatefulSetController(ctx ControllerContext) (bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1beta1", Resource: "statefulsets"}] {
return false, nil
}
resyncPeriod := ResyncPeriod(&ctx.Options)()
go petset.NewStatefulSetController(
ctx.InformerFactory.Pods().Informer(),
ctx.ClientBuilder.ClientOrDie("statefulset-controller"),
resyncPeriod,
).Run(1, ctx.Stop)
return true, nil
}

cmd/kube-controller-manager/app/autoscaling.go

@ -0,0 +1,50 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package app implements a server that runs a set of active
// components. This includes replication controllers, service endpoints and
// nodes.
//
package app
import (
"k8s.io/kubernetes/pkg/controller/podautoscaler"
"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
"k8s.io/kubernetes/pkg/runtime/schema"
)
func startHPAController(ctx ControllerContext) (bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "autoscaling", Version: "v1", Resource: "horizontalpodautoscalers"}] {
return false, nil
}
hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
metricsClient := metrics.NewHeapsterMetricsClient(
hpaClient,
metrics.DefaultHeapsterNamespace,
metrics.DefaultHeapsterScheme,
metrics.DefaultHeapsterService,
metrics.DefaultHeapsterPort,
)
replicaCalc := podautoscaler.NewReplicaCalculator(metricsClient, hpaClient.Core())
go podautoscaler.NewHorizontalController(
hpaClient.Core(),
hpaClient.Extensions(),
hpaClient.Autoscaling(),
replicaCalc,
ctx.Options.HorizontalPodAutoscalerSyncPeriod.Duration,
).Run(ctx.Stop)
return true, nil
}

cmd/kube-controller-manager/app/batch.go

@ -0,0 +1,41 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package app implements a server that runs a set of active
// components. This includes replication controllers, service endpoints and
// nodes.
//
package app
import (
"k8s.io/kubernetes/pkg/apis/batch"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
"k8s.io/kubernetes/pkg/controller/cronjob"
"k8s.io/kubernetes/pkg/runtime/schema"
)
func startCronJobController(ctx ControllerContext) (bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v2alpha1", Resource: "cronjobs"}] {
return false, nil
}
// TODO: this is a temp fix for allowing kubeClient list v2alpha1 sj, should switch to using clientset
cronjobConfig := ctx.ClientBuilder.ConfigOrDie("cronjob-controller")
cronjobConfig.ContentConfig.GroupVersion = &schema.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"}
go cronjob.NewCronJobController(
clientset.NewForConfigOrDie(cronjobConfig),
).Run(ctx.Stop)
return true, nil
}
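The override above exists because the generated release_1_5 clientset apparently cannot list batch/v2alpha1 objects with its default content negotiation, so the start function takes its own rest config from the client builder and points it at the alpha group version before building a client. The same idiom should work for any non-default group/version; a minimal sketch, with an illustrative client name and group/version:

cfg := ctx.ClientBuilder.ConfigOrDie("example-controller") // illustrative client name
cfg.ContentConfig.GroupVersion = &schema.GroupVersion{Group: "example.k8s.io", Version: "v1alpha1"} // illustrative group/version
client := clientset.NewForConfigOrDie(cfg)
_ = client // hand the client to whatever controller needs the alpha API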

cmd/kube-controller-manager/app/certificates.go

@ -0,0 +1,51 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package app implements a server that runs a set of active
// components. This includes replication controllers, service endpoints and
// nodes.
//
package app
import (
"github.com/golang/glog"
certcontroller "k8s.io/kubernetes/pkg/controller/certificates"
"k8s.io/kubernetes/pkg/runtime/schema"
)
func startCSRController(ctx ControllerContext) (bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "certificates.k8s.io", Version: "v1alpha1", Resource: "certificatesigningrequests"}] {
return false, nil
}
resyncPeriod := ResyncPeriod(&ctx.Options)()
c := ctx.ClientBuilder.ClientOrDie("certificate-controller")
certController, err := certcontroller.NewCertificateController(
c,
resyncPeriod,
ctx.Options.ClusterSigningCertFile,
ctx.Options.ClusterSigningKeyFile,
certcontroller.NewGroupApprover(c.Certificates().CertificateSigningRequests(), ctx.Options.ApproveAllKubeletCSRsForGroup),
)
if err != nil {
// TODO this is failing consistently in test-cmd and local-up-cluster.sh. Fix them and make it consistent with all others which
// cause a crash loop
glog.Errorf("Failed to start certificate controller: %v", err)
return false, nil
}
go certController.Run(1, ctx.Stop)
return true, nil
}

cmd/kube-controller-manager/app/controllermanager.go

@ -32,12 +32,7 @@ import (
"time"
"k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/apis/extensions"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
v1core "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/typed/core/v1"
"k8s.io/kubernetes/pkg/client/leaderelection"
@ -45,38 +40,18 @@ import (
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/typed/discovery"
"k8s.io/kubernetes/pkg/client/typed/dynamic"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
certcontroller "k8s.io/kubernetes/pkg/controller/certificates"
"k8s.io/kubernetes/pkg/controller/cronjob"
"k8s.io/kubernetes/pkg/controller/daemon"
"k8s.io/kubernetes/pkg/controller/deployment"
"k8s.io/kubernetes/pkg/controller/disruption"
endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/pkg/controller/garbagecollector"
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
"k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/controller/job"
namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
nodecontroller "k8s.io/kubernetes/pkg/controller/node"
petset "k8s.io/kubernetes/pkg/controller/petset"
"k8s.io/kubernetes/pkg/controller/podautoscaler"
"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
"k8s.io/kubernetes/pkg/controller/podgc"
replicaset "k8s.io/kubernetes/pkg/controller/replicaset"
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
routecontroller "k8s.io/kubernetes/pkg/controller/route"
servicecontroller "k8s.io/kubernetes/pkg/controller/service"
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach"
persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
"k8s.io/kubernetes/pkg/healthz"
quotainstall "k8s.io/kubernetes/pkg/quota/install"
"k8s.io/kubernetes/pkg/runtime/schema"
"k8s.io/kubernetes/pkg/runtime/serializer"
"k8s.io/kubernetes/pkg/serviceaccount"
certutil "k8s.io/kubernetes/pkg/util/cert"
"k8s.io/kubernetes/pkg/util/configz"
@ -243,108 +218,31 @@ type ControllerContext struct {
// InitFunc is used to launch a particular controller. It may run additional "should I activate" checks.
// Any error returned will cause the controller process to `Fatal`.
// The bool indicates whether the controller was enabled.
type InitFunc func(ctx ControllerContext) (bool, error)
func newControllerInitializers() map[string]InitFunc {
controllers := map[string]InitFunc{}
controllers["endpoint"] = startEndpointController
controllers["replicationcontroller"] = startReplicationController
controllers["podgc"] = startEndpointController
controllers["podgc"] = startPodGCController
controllers["resourcequota"] = startResourceQuotaController
controllers["namespace"] = startNamespaceController
controllers["serviceaccount"] = startServiceAccountController
controllers["garbagecollector"] = startGarbageCollectorController
controllers["daemonset"] = startDaemonSetController
controllers["job"] = startJobController
controllers["deployment"] = startDeploymentController
controllers["replicaset"] = startReplicaSetController
controllers["horizontalpodautoscaling"] = startHPAController
controllers["disruption"] = startDisruptionController
controllers["statefuleset"] = startStatefulSetController
controllers["cronjob"] = startCronJobController
controllers["certificatesigningrequests"] = startCSRController
return controllers
}
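This hunk does not show the consuming side; presumably StartControllers (whose new signature takes this map, per the hunk header further down) walks it roughly as sketched below, Fatal-ing on error as the InitFunc comment promises, with the same jittered pause between starts as the removed inline code:

for controllerName, initFn := range controllers {
	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
	started, err := initFn(ctx)
	if err != nil {
		glog.Fatalf("Error starting %q: %v", controllerName, err)
	}
	if !started {
		glog.Warningf("Skipping %q", controllerName)
		continue
	}
	glog.Infof("Started %q", controllerName)
}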
func startEndpointController(ctx ControllerContext) (bool, error) {
go endpointcontroller.NewEndpointController(
ctx.InformerFactory.Pods().Informer(),
ctx.ClientBuilder.ClientOrDie("endpoint-controller"),
).Run(int(ctx.Options.ConcurrentEndpointSyncs), ctx.Stop)
return true, nil
}
func startReplicationController(ctx ControllerContext) (bool, error) {
go replicationcontroller.NewReplicationManager(
ctx.InformerFactory.Pods().Informer(),
ctx.ClientBuilder.ClientOrDie("replication-controller"),
ResyncPeriod(&ctx.Options),
replicationcontroller.BurstReplicas,
int(ctx.Options.LookupCacheSizeForRC),
ctx.Options.EnableGarbageCollector,
).Run(int(ctx.Options.ConcurrentRCSyncs), ctx.Stop)
return true, nil
}
func startPodGCController(ctx ControllerContext) (bool, error) {
go podgc.NewPodGC(
ctx.ClientBuilder.ClientOrDie("pod-garbage-collector"),
ctx.InformerFactory.Pods().Informer(),
int(ctx.Options.TerminatedPodGCThreshold),
).Run(ctx.Stop)
return true, nil
}
func startResourceQuotaController(ctx ControllerContext) (bool, error) {
resourceQuotaControllerClient := ctx.ClientBuilder.ClientOrDie("resourcequota-controller")
resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, ctx.InformerFactory)
groupKindsToReplenish := []schema.GroupKind{
api.Kind("Pod"),
api.Kind("Service"),
api.Kind("ReplicationController"),
api.Kind("PersistentVolumeClaim"),
api.Kind("Secret"),
api.Kind("ConfigMap"),
}
resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{
KubeClient: resourceQuotaControllerClient,
ResyncPeriod: controller.StaticResyncPeriodFunc(ctx.Options.ResourceQuotaSyncPeriod.Duration),
Registry: resourceQuotaRegistry,
ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(ctx.InformerFactory, resourceQuotaControllerClient),
ReplenishmentResyncPeriod: ResyncPeriod(&ctx.Options),
GroupKindsToReplenish: groupKindsToReplenish,
}
go resourcequotacontroller.NewResourceQuotaController(
resourceQuotaControllerOptions,
).Run(int(ctx.Options.ConcurrentResourceQuotaSyncs), ctx.Stop)
return true, nil
}
func startNamespaceController(ctx ControllerContext) (bool, error) {
// TODO: should use a dynamic RESTMapper built from the discovery results.
restMapper := registered.RESTMapper()
// Find the list of namespaced resources via discovery that the namespace controller must manage
namespaceKubeClient := ctx.ClientBuilder.ClientOrDie("namespace-controller")
namespaceClientPool := dynamic.NewClientPool(ctx.ClientBuilder.ConfigOrDie("namespace-controller"), restMapper, dynamic.LegacyAPIPathResolverFunc)
// TODO: consider using a list-watch + cache here rather than polling
resources, err := namespaceKubeClient.Discovery().ServerResources()
if err != nil {
return true, fmt.Errorf("failed to get preferred server resources: %v", err)
}
gvrs, err := discovery.GroupVersionResources(resources)
if err != nil {
return true, fmt.Errorf("failed to parse preferred server resources: %v", err)
}
discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources
if _, found := gvrs[extensions.SchemeGroupVersion.WithResource("thirdpartyresource")]; found {
// make discovery static
snapshot, err := discoverResourcesFn()
if err != nil {
return true, fmt.Errorf("failed to get server resources: %v", err)
}
discoverResourcesFn = func() ([]*metav1.APIResourceList, error) {
return snapshot, nil
}
}
namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, discoverResourcesFn, ctx.Options.NamespaceSyncPeriod.Duration, v1.FinalizerKubernetes)
go namespaceController.Run(int(ctx.Options.ConcurrentNamespaceSyncs), ctx.Stop)
return true, nil
}
// TODO: In general, any controller checking this needs to be dynamic so
// users don't have to restart their controller manager if they change the apiserver.
func getAvailableResources(clientBuilder controller.ControllerClientBuilder) (map[schema.GroupVersionResource]bool, error) {
@ -493,75 +391,6 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
glog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", s.AllocateNodeCIDRs, s.ConfigureCloudRoutes)
}
if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "daemonsets"}] {
go daemon.NewDaemonSetsController(sharedInformers.DaemonSets(), sharedInformers.Pods(), sharedInformers.Nodes(), clientBuilder.ClientOrDie("daemon-set-controller"), int(s.LookupCacheSizeForDaemonSet)).
Run(int(s.ConcurrentDaemonSetSyncs), stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "jobs"}] {
glog.Infof("Starting job controller")
go job.NewJobController(sharedInformers.Pods().Informer(), sharedInformers.Jobs(), clientBuilder.ClientOrDie("job-controller")).
Run(int(s.ConcurrentJobSyncs), stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "deployments"}] {
glog.Infof("Starting deployment controller")
go deployment.NewDeploymentController(sharedInformers.Deployments(), sharedInformers.ReplicaSets(), sharedInformers.Pods(), clientBuilder.ClientOrDie("deployment-controller")).
Run(int(s.ConcurrentDeploymentSyncs), stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "replicasets"}] {
glog.Infof("Starting ReplicaSet controller")
go replicaset.NewReplicaSetController(sharedInformers.ReplicaSets(), sharedInformers.Pods(), clientBuilder.ClientOrDie("replicaset-controller"), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector).
Run(int(s.ConcurrentRSSyncs), stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
if availableResources[schema.GroupVersionResource{Group: "autoscaling", Version: "v1", Resource: "horizontalpodautoscalers"}] {
glog.Infof("Starting horizontal pod autoscaler controller.")
hpaClient := clientBuilder.ClientOrDie("horizontal-pod-autoscaler")
metricsClient := metrics.NewHeapsterMetricsClient(
hpaClient,
metrics.DefaultHeapsterNamespace,
metrics.DefaultHeapsterScheme,
metrics.DefaultHeapsterService,
metrics.DefaultHeapsterPort,
)
replicaCalc := podautoscaler.NewReplicaCalculator(metricsClient, hpaClient.Core())
go podautoscaler.NewHorizontalController(hpaClient.Core(), hpaClient.Extensions(), hpaClient.Autoscaling(), replicaCalc, s.HorizontalPodAutoscalerSyncPeriod.Duration).
Run(stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
if availableResources[schema.GroupVersionResource{Group: "policy", Version: "v1beta1", Resource: "poddisruptionbudgets"}] {
glog.Infof("Starting disruption controller")
go disruption.NewDisruptionController(sharedInformers.Pods().Informer(), clientBuilder.ClientOrDie("disruption-controller")).Run(stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
if availableResources[schema.GroupVersionResource{Group: "apps", Version: "v1beta1", Resource: "statefulsets"}] {
glog.Infof("Starting StatefulSet controller")
resyncPeriod := ResyncPeriod(s)()
go petset.NewStatefulSetController(
sharedInformers.Pods().Informer(),
clientBuilder.ClientOrDie("statefulset-controller"),
resyncPeriod,
).Run(1, stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
if availableResources[schema.GroupVersionResource{Group: "batch", Version: "v2alpha1", Resource: "cronjobs"}] {
glog.Infof("Starting cronjob controller")
// TODO: this is a temp fix for allowing kubeClient list v2alpha1 sj, should switch to using clientset
cronjobConfig := rootClientBuilder.ConfigOrDie("cronjob-controller")
cronjobConfig.ContentConfig.GroupVersion = &schema.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"}
go cronjob.NewCronJobController(clientset.NewForConfigOrDie(cronjobConfig)).Run(stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
alphaProvisioner, err := NewAlphaVolumeProvisioner(cloud, s.VolumeConfiguration)
if err != nil {
return fmt.Errorf("an backward-compatible provisioner could not be created: %v, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.", err)
@ -594,61 +423,6 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
go attachDetachController.Run(stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
if availableResources[schema.GroupVersionResource{Group: "certificates.k8s.io", Version: "v1alpha1", Resource: "certificatesigningrequests"}] {
glog.Infof("Starting certificate request controller")
resyncPeriod := ResyncPeriod(s)()
c := clientBuilder.ClientOrDie("certificate-controller")
certController, err := certcontroller.NewCertificateController(
c,
resyncPeriod,
s.ClusterSigningCertFile,
s.ClusterSigningKeyFile,
certcontroller.NewGroupApprover(c.Certificates().CertificateSigningRequests(), s.ApproveAllKubeletCSRsForGroup),
)
if err != nil {
glog.Errorf("Failed to start certificate controller: %v", err)
} else {
go certController.Run(1, stop)
}
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
go serviceaccountcontroller.NewServiceAccountsController(
sharedInformers.ServiceAccounts(), sharedInformers.Namespaces(),
clientBuilder.ClientOrDie("service-account-controller"),
serviceaccountcontroller.DefaultServiceAccountsControllerOptions(),
).Run(1, stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
if s.EnableGarbageCollector {
// TODO: should use a dynamic RESTMapper built from the discovery results.
restMapper := registered.RESTMapper()
gcClientset := clientBuilder.ClientOrDie("generic-garbage-collector")
preferredResources, err := gcClientset.Discovery().ServerPreferredResources()
if err != nil {
return fmt.Errorf("failed to get supported resources from server: %v", err)
}
deletableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, preferredResources)
deletableGroupVersionResources, err := discovery.GroupVersionResources(deletableResources)
if err != nil {
glog.Errorf("Failed to parse resources from server: %v", err)
}
config := rootClientBuilder.ConfigOrDie("generic-garbage-collector")
config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
metaOnlyClientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc)
config.ContentConfig = dynamic.ContentConfig()
clientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc)
garbageCollector, err := garbagecollector.NewGarbageCollector(metaOnlyClientPool, clientPool, restMapper, deletableGroupVersionResources)
if err != nil {
glog.Errorf("Failed to start the generic garbage collector: %v", err)
} else {
workers := int(s.ConcurrentGCSyncs)
go garbageCollector.Run(workers, stop)
}
}
sharedInformers.Start(stop)
select {}
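The body of getAvailableResources falls outside the hunks shown here; given its signature above and the discovery helpers used elsewhere in this commit, it presumably builds the AvailableResources map along these lines (a sketch, not the literal implementation; the client name is illustrative):

client := clientBuilder.ClientOrDie("controller-discovery")
resources, err := client.Discovery().ServerResources()
if err != nil {
	return nil, fmt.Errorf("failed to get supported resources from server: %v", err)
}
gvrs, err := discovery.GroupVersionResources(resources)
if err != nil {
	return nil, fmt.Errorf("failed to parse resources from server: %v", err)
}
allResources := map[schema.GroupVersionResource]bool{}
for gvr := range gvrs {
	allResources[gvr] = true
}
return allResources, nil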

cmd/kube-controller-manager/app/core.go

@ -0,0 +1,177 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package app implements a server that runs a set of active
// components. This includes replication controllers, service endpoints and
// nodes.
//
package app
import (
"fmt"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/apis/extensions"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/client/typed/discovery"
"k8s.io/kubernetes/pkg/client/typed/dynamic"
"k8s.io/kubernetes/pkg/controller"
endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/pkg/controller/garbagecollector"
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
"k8s.io/kubernetes/pkg/controller/podgc"
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
quotainstall "k8s.io/kubernetes/pkg/quota/install"
"k8s.io/kubernetes/pkg/runtime/schema"
"k8s.io/kubernetes/pkg/runtime/serializer"
)
func startEndpointController(ctx ControllerContext) (bool, error) {
go endpointcontroller.NewEndpointController(
ctx.InformerFactory.Pods().Informer(),
ctx.ClientBuilder.ClientOrDie("endpoint-controller"),
).Run(int(ctx.Options.ConcurrentEndpointSyncs), ctx.Stop)
return true, nil
}
func startReplicationController(ctx ControllerContext) (bool, error) {
go replicationcontroller.NewReplicationManager(
ctx.InformerFactory.Pods().Informer(),
ctx.ClientBuilder.ClientOrDie("replication-controller"),
ResyncPeriod(&ctx.Options),
replicationcontroller.BurstReplicas,
int(ctx.Options.LookupCacheSizeForRC),
ctx.Options.EnableGarbageCollector,
).Run(int(ctx.Options.ConcurrentRCSyncs), ctx.Stop)
return true, nil
}
func startPodGCController(ctx ControllerContext) (bool, error) {
go podgc.NewPodGC(
ctx.ClientBuilder.ClientOrDie("pod-garbage-collector"),
ctx.InformerFactory.Pods().Informer(),
int(ctx.Options.TerminatedPodGCThreshold),
).Run(ctx.Stop)
return true, nil
}
func startResourceQuotaController(ctx ControllerContext) (bool, error) {
resourceQuotaControllerClient := ctx.ClientBuilder.ClientOrDie("resourcequota-controller")
resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, ctx.InformerFactory)
groupKindsToReplenish := []schema.GroupKind{
api.Kind("Pod"),
api.Kind("Service"),
api.Kind("ReplicationController"),
api.Kind("PersistentVolumeClaim"),
api.Kind("Secret"),
api.Kind("ConfigMap"),
}
resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{
KubeClient: resourceQuotaControllerClient,
ResyncPeriod: controller.StaticResyncPeriodFunc(ctx.Options.ResourceQuotaSyncPeriod.Duration),
Registry: resourceQuotaRegistry,
ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(ctx.InformerFactory, resourceQuotaControllerClient),
ReplenishmentResyncPeriod: ResyncPeriod(&ctx.Options),
GroupKindsToReplenish: groupKindsToReplenish,
}
go resourcequotacontroller.NewResourceQuotaController(
resourceQuotaControllerOptions,
).Run(int(ctx.Options.ConcurrentResourceQuotaSyncs), ctx.Stop)
return true, nil
}
func startNamespaceController(ctx ControllerContext) (bool, error) {
// TODO: should use a dynamic RESTMapper built from the discovery results.
restMapper := registered.RESTMapper()
// Find the list of namespaced resources via discovery that the namespace controller must manage
namespaceKubeClient := ctx.ClientBuilder.ClientOrDie("namespace-controller")
namespaceClientPool := dynamic.NewClientPool(ctx.ClientBuilder.ConfigOrDie("namespace-controller"), restMapper, dynamic.LegacyAPIPathResolverFunc)
// TODO: consider using a list-watch + cache here rather than polling
resources, err := namespaceKubeClient.Discovery().ServerResources()
if err != nil {
return true, fmt.Errorf("failed to get preferred server resources: %v", err)
}
gvrs, err := discovery.GroupVersionResources(resources)
if err != nil {
return true, fmt.Errorf("failed to parse preferred server resources: %v", err)
}
discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources
if _, found := gvrs[extensions.SchemeGroupVersion.WithResource("thirdpartyresource")]; found {
// make discovery static
snapshot, err := discoverResourcesFn()
if err != nil {
return true, fmt.Errorf("failed to get server resources: %v", err)
}
discoverResourcesFn = func() ([]*metav1.APIResourceList, error) {
return snapshot, nil
}
}
namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, discoverResourcesFn, ctx.Options.NamespaceSyncPeriod.Duration, v1.FinalizerKubernetes)
go namespaceController.Run(int(ctx.Options.ConcurrentNamespaceSyncs), ctx.Stop)
return true, nil
}
func startServiceAccountController(ctx ControllerContext) (bool, error) {
go serviceaccountcontroller.NewServiceAccountsController(
ctx.InformerFactory.ServiceAccounts(),
ctx.InformerFactory.Namespaces(),
ctx.ClientBuilder.ClientOrDie("service-account-controller"),
serviceaccountcontroller.DefaultServiceAccountsControllerOptions(),
).Run(1, ctx.Stop)
return true, nil
}
func startGarbageCollectorController(ctx ControllerContext) (bool, error) {
if !ctx.Options.EnableGarbageCollector {
return false, nil
}
// TODO: should use a dynamic RESTMapper built from the discovery results.
restMapper := registered.RESTMapper()
gcClientset := ctx.ClientBuilder.ClientOrDie("generic-garbage-collector")
preferredResources, err := gcClientset.Discovery().ServerPreferredResources()
if err != nil {
return true, fmt.Errorf("failed to get supported resources from server: %v", err)
}
deletableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, preferredResources)
deletableGroupVersionResources, err := discovery.GroupVersionResources(deletableResources)
if err != nil {
return true, fmt.Errorf("Failed to parse resources from server: %v", err)
}
config := ctx.ClientBuilder.ConfigOrDie("generic-garbage-collector")
config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
metaOnlyClientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc)
config.ContentConfig = dynamic.ContentConfig()
clientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc)
garbageCollector, err := garbagecollector.NewGarbageCollector(metaOnlyClientPool, clientPool, restMapper, deletableGroupVersionResources)
if err != nil {
return true, fmt.Errorf("Failed to start the generic garbage collector: %v", err)
}
workers := int(ctx.Options.ConcurrentGCSyncs)
go garbageCollector.Run(workers, ctx.Stop)
return true, nil
}

cmd/kube-controller-manager/app/extensions.go

@ -0,0 +1,83 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package app implements a server that runs a set of active
// components. This includes replication controllers, service endpoints and
// nodes.
//
package app
import (
"k8s.io/kubernetes/pkg/controller/daemon"
"k8s.io/kubernetes/pkg/controller/deployment"
"k8s.io/kubernetes/pkg/controller/job"
replicaset "k8s.io/kubernetes/pkg/controller/replicaset"
"k8s.io/kubernetes/pkg/runtime/schema"
)
func startDaemonSetController(ctx ControllerContext) (bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "daemonsets"}] {
return false, nil
}
go daemon.NewDaemonSetsController(
ctx.InformerFactory.DaemonSets(),
ctx.InformerFactory.Pods(),
ctx.InformerFactory.Nodes(),
ctx.ClientBuilder.ClientOrDie("daemon-set-controller"),
int(ctx.Options.LookupCacheSizeForDaemonSet),
).Run(int(ctx.Options.ConcurrentDaemonSetSyncs), ctx.Stop)
return true, nil
}
func startJobController(ctx ControllerContext) (bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "jobs"}] {
return false, nil
}
go job.NewJobController(
ctx.InformerFactory.Pods().Informer(),
ctx.InformerFactory.Jobs(),
ctx.ClientBuilder.ClientOrDie("job-controller"),
).Run(int(ctx.Options.ConcurrentJobSyncs), ctx.Stop)
return true, nil
}
func startDeploymentController(ctx ControllerContext) (bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "deployments"}] {
return false, nil
}
go deployment.NewDeploymentController(
ctx.InformerFactory.Deployments(),
ctx.InformerFactory.ReplicaSets(),
ctx.InformerFactory.Pods(),
ctx.ClientBuilder.ClientOrDie("deployment-controller"),
).Run(int(ctx.Options.ConcurrentDeploymentSyncs), ctx.Stop)
return true, nil
}
func startReplicaSetController(ctx ControllerContext) (bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "replicasets"}] {
return false, nil
}
go replicaset.NewReplicaSetController(
ctx.InformerFactory.ReplicaSets(),
ctx.InformerFactory.Pods(),
ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
replicaset.BurstReplicas,
int(ctx.Options.LookupCacheSizeForRS),
ctx.Options.EnableGarbageCollector,
).Run(int(ctx.Options.ConcurrentRSSyncs), ctx.Stop)
return true, nil
}

cmd/kube-controller-manager/app/policy.go

@ -0,0 +1,37 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package app implements a server that runs a set of active
// components. This includes replication controllers, service endpoints and
// nodes.
//
package app
import (
"k8s.io/kubernetes/pkg/controller/disruption"
"k8s.io/kubernetes/pkg/runtime/schema"
)
func startDisruptionController(ctx ControllerContext) (bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "policy", Version: "v1beta1", Resource: "poddisruptionbudgets"}] {
return false, nil
}
go disruption.NewDisruptionController(
ctx.InformerFactory.Pods().Informer(),
ctx.ClientBuilder.ClientOrDie("disruption-controller"),
).Run(ctx.Stop)
return true, nil
}