use the client builder to support using SAs

This commit is contained in:
deads2k 2016-11-21 15:13:22 -05:00
parent 21c304333a
commit 585daa2069
2 changed files with 22 additions and 25 deletions

View File

@ -263,10 +263,7 @@ func getAvailableResources(clientBuilder controller.ControllerClientBuilder) (ma
} }
func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) error { func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) error {
client := func(serviceAccountName string) clientset.Interface { sharedInformers := informers.NewSharedInformerFactory(rootClientBuilder.ClientOrDie("shared-informers"), nil, ResyncPeriod(s)())
return rootClientBuilder.ClientOrDie(serviceAccountName)
}
sharedInformers := informers.NewSharedInformerFactory(client("shared-informers"), nil, ResyncPeriod(s)())
// always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest // always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
if len(s.ServiceAccountKeyFile) > 0 { if len(s.ServiceAccountKeyFile) > 0 {
@ -303,7 +300,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
return err return err
} }
go endpointcontroller.NewEndpointController(sharedInformers.Pods().Informer(), client("endpoint-controller")). go endpointcontroller.NewEndpointController(sharedInformers.Pods().Informer(), clientBuilder.ClientOrDie("endpoint-controller")).
Run(int(s.ConcurrentEndpointSyncs), stop) Run(int(s.ConcurrentEndpointSyncs), stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
@ -317,7 +314,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
).Run(int(s.ConcurrentRCSyncs), stop) ).Run(int(s.ConcurrentRCSyncs), stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
go podgc.NewPodGC(client("pod-garbage-collector"), sharedInformers.Pods().Informer(), go podgc.NewPodGC(clientBuilder.ClientOrDie("pod-garbage-collector"), sharedInformers.Pods().Informer(),
int(s.TerminatedPodGCThreshold)).Run(stop) int(s.TerminatedPodGCThreshold)).Run(stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
@ -336,7 +333,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
} }
nodeController, err := nodecontroller.NewNodeController( nodeController, err := nodecontroller.NewNodeController(
sharedInformers.Pods(), sharedInformers.Nodes(), sharedInformers.DaemonSets(), sharedInformers.Pods(), sharedInformers.Nodes(), sharedInformers.DaemonSets(),
cloud, client("node-controller"), cloud, clientBuilder.ClientOrDie("node-controller"),
s.PodEvictionTimeout.Duration, s.NodeEvictionRate, s.SecondaryNodeEvictionRate, s.LargeClusterSizeThreshold, s.UnhealthyZoneThreshold, s.NodeMonitorGracePeriod.Duration, s.PodEvictionTimeout.Duration, s.NodeEvictionRate, s.SecondaryNodeEvictionRate, s.LargeClusterSizeThreshold, s.UnhealthyZoneThreshold, s.NodeMonitorGracePeriod.Duration,
s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR,
int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs) int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
@ -346,7 +343,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
nodeController.Run() nodeController.Run()
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
serviceController, err := servicecontroller.New(cloud, client("service-controller"), s.ClusterName) serviceController, err := servicecontroller.New(cloud, clientBuilder.ClientOrDie("service-controller"), s.ClusterName)
if err != nil { if err != nil {
glog.Errorf("Failed to start service controller: %v", err) glog.Errorf("Failed to start service controller: %v", err)
} else { } else {
@ -360,7 +357,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
} else if routes, ok := cloud.Routes(); !ok { } else if routes, ok := cloud.Routes(); !ok {
glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.") glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
} else { } else {
routeController := routecontroller.New(routes, client("route-controller"), s.ClusterName, clusterCIDR) routeController := routecontroller.New(routes, clientBuilder.ClientOrDie("route-controller"), s.ClusterName, clusterCIDR)
routeController.Run(s.RouteReconciliationPeriod.Duration) routeController.Run(s.RouteReconciliationPeriod.Duration)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
} }
@ -368,7 +365,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
glog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", s.AllocateNodeCIDRs, s.ConfigureCloudRoutes) glog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", s.AllocateNodeCIDRs, s.ConfigureCloudRoutes)
} }
resourceQuotaControllerClient := client("resourcequota-controller") resourceQuotaControllerClient := clientBuilder.ClientOrDie("resourcequota-controller")
resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, sharedInformers) resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, sharedInformers)
groupKindsToReplenish := []schema.GroupKind{ groupKindsToReplenish := []schema.GroupKind{
api.Kind("Pod"), api.Kind("Pod"),
@ -393,7 +390,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
restMapper := registered.RESTMapper() restMapper := registered.RESTMapper()
// Find the list of namespaced resources via discovery that the namespace controller must manage // Find the list of namespaced resources via discovery that the namespace controller must manage
namespaceKubeClient := client("namespace-controller") namespaceKubeClient := clientBuilder.ClientOrDie("namespace-controller")
namespaceClientPool := dynamic.NewClientPool(rootClientBuilder.ConfigOrDie("namespace-controller"), restMapper, dynamic.LegacyAPIPathResolverFunc) namespaceClientPool := dynamic.NewClientPool(rootClientBuilder.ConfigOrDie("namespace-controller"), restMapper, dynamic.LegacyAPIPathResolverFunc)
// TODO: consider using a list-watch + cache here rather than polling // TODO: consider using a list-watch + cache here rather than polling
var gvrFn func() ([]schema.GroupVersionResource, error) var gvrFn func() ([]schema.GroupVersionResource, error)
@ -423,35 +420,35 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "daemonsets"}] { if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "daemonsets"}] {
go daemon.NewDaemonSetsController(sharedInformers.DaemonSets(), sharedInformers.Pods(), sharedInformers.Nodes(), client("daemon-set-controller"), int(s.LookupCacheSizeForDaemonSet)). go daemon.NewDaemonSetsController(sharedInformers.DaemonSets(), sharedInformers.Pods(), sharedInformers.Nodes(), clientBuilder.ClientOrDie("daemon-set-controller"), int(s.LookupCacheSizeForDaemonSet)).
Run(int(s.ConcurrentDaemonSetSyncs), stop) Run(int(s.ConcurrentDaemonSetSyncs), stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
} }
if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "jobs"}] { if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "jobs"}] {
glog.Infof("Starting job controller") glog.Infof("Starting job controller")
go job.NewJobController(sharedInformers.Pods().Informer(), sharedInformers.Jobs(), client("job-controller")). go job.NewJobController(sharedInformers.Pods().Informer(), sharedInformers.Jobs(), clientBuilder.ClientOrDie("job-controller")).
Run(int(s.ConcurrentJobSyncs), stop) Run(int(s.ConcurrentJobSyncs), stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
} }
if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "deployments"}] { if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "deployments"}] {
glog.Infof("Starting deployment controller") glog.Infof("Starting deployment controller")
go deployment.NewDeploymentController(sharedInformers.Deployments(), sharedInformers.ReplicaSets(), sharedInformers.Pods(), client("deployment-controller")). go deployment.NewDeploymentController(sharedInformers.Deployments(), sharedInformers.ReplicaSets(), sharedInformers.Pods(), clientBuilder.ClientOrDie("deployment-controller")).
Run(int(s.ConcurrentDeploymentSyncs), stop) Run(int(s.ConcurrentDeploymentSyncs), stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
} }
if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "replicasets"}] { if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "replicasets"}] {
glog.Infof("Starting ReplicaSet controller") glog.Infof("Starting ReplicaSet controller")
go replicaset.NewReplicaSetController(sharedInformers.ReplicaSets(), sharedInformers.Pods(), client("replicaset-controller"), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector). go replicaset.NewReplicaSetController(sharedInformers.ReplicaSets(), sharedInformers.Pods(), clientBuilder.ClientOrDie("replicaset-controller"), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector).
Run(int(s.ConcurrentRSSyncs), stop) Run(int(s.ConcurrentRSSyncs), stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
} }
if availableResources[schema.GroupVersionResource{Group: "autoscaling", Version: "v1", Resource: "horizontalpodautoscalers"}] { if availableResources[schema.GroupVersionResource{Group: "autoscaling", Version: "v1", Resource: "horizontalpodautoscalers"}] {
glog.Infof("Starting horizontal pod autoscaler controller.") glog.Infof("Starting horizontal pod autoscaler controller.")
hpaClient := client("horizontal-pod-autoscaler") hpaClient := clientBuilder.ClientOrDie("horizontal-pod-autoscaler")
metricsClient := metrics.NewHeapsterMetricsClient( metricsClient := metrics.NewHeapsterMetricsClient(
hpaClient, hpaClient,
metrics.DefaultHeapsterNamespace, metrics.DefaultHeapsterNamespace,
@ -467,7 +464,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
if availableResources[schema.GroupVersionResource{Group: "policy", Version: "v1beta1", Resource: "poddisruptionbudgets"}] { if availableResources[schema.GroupVersionResource{Group: "policy", Version: "v1beta1", Resource: "poddisruptionbudgets"}] {
glog.Infof("Starting disruption controller") glog.Infof("Starting disruption controller")
go disruption.NewDisruptionController(sharedInformers.Pods().Informer(), client("disruption-controller")).Run(stop) go disruption.NewDisruptionController(sharedInformers.Pods().Informer(), clientBuilder.ClientOrDie("disruption-controller")).Run(stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
} }
@ -476,7 +473,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
resyncPeriod := ResyncPeriod(s)() resyncPeriod := ResyncPeriod(s)()
go petset.NewStatefulSetController( go petset.NewStatefulSetController(
sharedInformers.Pods().Informer(), sharedInformers.Pods().Informer(),
client("statefulset-controller"), clientBuilder.ClientOrDie("statefulset-controller"),
resyncPeriod, resyncPeriod,
).Run(1, stop) ).Run(1, stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
@ -496,7 +493,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
return fmt.Errorf("an backward-compatible provisioner could not be created: %v, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.", err) return fmt.Errorf("an backward-compatible provisioner could not be created: %v, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.", err)
} }
params := persistentvolumecontroller.ControllerParameters{ params := persistentvolumecontroller.ControllerParameters{
KubeClient: client("persistent-volume-binder"), KubeClient: clientBuilder.ClientOrDie("persistent-volume-binder"),
SyncPeriod: s.PVClaimBinderSyncPeriod.Duration, SyncPeriod: s.PVClaimBinderSyncPeriod.Duration,
AlphaProvisioner: alphaProvisioner, AlphaProvisioner: alphaProvisioner,
VolumePlugins: ProbeControllerVolumePlugins(cloud, s.VolumeConfiguration), VolumePlugins: ProbeControllerVolumePlugins(cloud, s.VolumeConfiguration),
@ -510,7 +507,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
attachDetachController, attachDetachControllerErr := attachDetachController, attachDetachControllerErr :=
attachdetach.NewAttachDetachController( attachdetach.NewAttachDetachController(
client("attachdetach-controller"), clientBuilder.ClientOrDie("attachdetach-controller"),
sharedInformers.Pods().Informer(), sharedInformers.Pods().Informer(),
sharedInformers.Nodes().Informer(), sharedInformers.Nodes().Informer(),
sharedInformers.PersistentVolumeClaims().Informer(), sharedInformers.PersistentVolumeClaims().Informer(),
@ -527,7 +524,7 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
glog.Infof("Starting certificate request controller") glog.Infof("Starting certificate request controller")
resyncPeriod := ResyncPeriod(s)() resyncPeriod := ResyncPeriod(s)()
certController, err := certcontroller.NewCertificateController( certController, err := certcontroller.NewCertificateController(
client("certificate-controller"), clientBuilder.ClientOrDie("certificate-controller"),
resyncPeriod, resyncPeriod,
s.ClusterSigningCertFile, s.ClusterSigningCertFile,
s.ClusterSigningKeyFile, s.ClusterSigningKeyFile,
@ -543,13 +540,13 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
go serviceaccountcontroller.NewServiceAccountsController( go serviceaccountcontroller.NewServiceAccountsController(
sharedInformers.ServiceAccounts(), sharedInformers.Namespaces(), sharedInformers.ServiceAccounts(), sharedInformers.Namespaces(),
client("service-account-controller"), clientBuilder.ClientOrDie("service-account-controller"),
serviceaccountcontroller.DefaultServiceAccountsControllerOptions(), serviceaccountcontroller.DefaultServiceAccountsControllerOptions(),
).Run(1, stop) ).Run(1, stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
if s.EnableGarbageCollector { if s.EnableGarbageCollector {
gcClientset := client("generic-garbage-collector") gcClientset := clientBuilder.ClientOrDie("generic-garbage-collector")
groupVersionResources, err := gcClientset.Discovery().ServerPreferredResources() groupVersionResources, err := gcClientset.Discovery().ServerPreferredResources()
if err != nil { if err != nil {
return fmt.Errorf("failed to get supported resources from server: %v", err) return fmt.Errorf("failed to get supported resources from server: %v", err)

View File

@ -51,7 +51,7 @@ type SimpleControllerClientBuilder struct {
func (b SimpleControllerClientBuilder) Config(name string) (*restclient.Config, error) { func (b SimpleControllerClientBuilder) Config(name string) (*restclient.Config, error) {
clientConfig := *b.ClientConfig clientConfig := *b.ClientConfig
return &clientConfig, nil return restclient.AddUserAgent(&clientConfig, name), nil
} }
func (b SimpleControllerClientBuilder) ConfigOrDie(name string) *restclient.Config { func (b SimpleControllerClientBuilder) ConfigOrDie(name string) *restclient.Config {
@ -67,7 +67,7 @@ func (b SimpleControllerClientBuilder) Client(name string) (clientset.Interface,
if err != nil { if err != nil {
return nil, err return nil, err
} }
return clientset.NewForConfig(restclient.AddUserAgent(clientConfig, name)) return clientset.NewForConfig(clientConfig)
} }
func (b SimpleControllerClientBuilder) ClientOrDie(name string) clientset.Interface { func (b SimpleControllerClientBuilder) ClientOrDie(name string) clientset.Interface {