Merge pull request #37976 from deads2k/controller-01-sa

Automatic merge from submit-queue (batch tested with PRs 36352, 36538, 37976, 36374)

demonstrate separation of controller initializers

Currently, controllers are all initialized in one monster method, which makes it difficult to pick out whether an individual controller has side effects, difficult to group related controllers for selective enablement, and impossible to determine whether there are hidden dependencies.

This pull demonstrates how we can break apart the monolith and start the process of grouping and naming controllers for selective enablement.  In addition, the use of a map will help expose dependency ordering amongst these controllers, and the separate methods will make it a lot harder to introduce side effects.

This also moves us closer to being able to author reflective unit tests that help ensure that basic RBAC bootstrap roles are at least present, even if they aren't correct.
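
In miniature, the pattern looks like this (a self-contained toy sketch, not the literal code in the diff; the real types are ControllerContext and InitFunc below):

package main

import "fmt"

// Toy stand-ins: shared dependencies travel in one context value, and each
// controller contributes a small named initializer to a map instead of a
// slot in one monolithic start method.
type toyContext struct{}

type toyInitFunc func(ctx toyContext) (started bool, err error)

func newToyInitializers() map[string]toyInitFunc {
	return map[string]toyInitFunc{
		"endpoint":  func(toyContext) (bool, error) { return true, nil },
		"namespace": func(toyContext) (bool, error) { return false, nil }, // declines to start
	}
}

func main() {
	for name, initFn := range newToyInitializers() {
		started, err := initFn(toyContext{})
		switch {
		case err != nil:
			fmt.Printf("error starting %q: %v\n", name, err)
		case !started:
			fmt.Printf("skipping %q\n", name)
		default:
			fmt.Printf("started %q\n", name)
		}
	}
}

Keyed by name, the set of controllers becomes data rather than code, which is what makes selective enablement and reflective tests tractable.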

@nikhiljindal since you were looking at the federation controller manager
@sttts since we're looking at trying out RBAC on these.
Kubernetes Submit Queue 2016-12-05 11:08:47 -08:00 committed by GitHub
commit b1366bf55d
3 changed files with 152 additions and 68 deletions


@@ -181,7 +181,7 @@ func Run(s *options.CMServer) error {
clientBuilder = rootClientBuilder
}
- err := StartControllers(s, rootClientBuilder, clientBuilder, stop)
+ err := StartControllers(newControllerInitializers(), s, rootClientBuilder, clientBuilder, stop)
glog.Fatalf("error running controllers: %v", err)
panic("unreachable")
}
@@ -224,6 +224,127 @@ func Run(s *options.CMServer) error {
panic("unreachable")
}
type ControllerContext struct {
// ClientBuilder will provide a client for this controller to use
ClientBuilder controller.ControllerClientBuilder
// InformerFactory gives access to informers for the controller
InformerFactory informers.SharedInformerFactory
// Options provides access to init options for a given controller
Options options.CMServer
// AvailableResources is a map listing currently available resources
AvailableResources map[schema.GroupVersionResource]bool
// Stop is the stop channel
Stop <-chan struct{}
}
// InitFunc is used to launch a particular controller. It may run additional "should I activate" checks.
// Any error returned will cause the controller process to `Fatal`
type InitFunc func(ctx ControllerContext) (bool, error)
func newControllerInitializers() map[string]InitFunc {
controllers := map[string]InitFunc{}
controllers["endpoint"] = startEndpointController
controllers["replicationcontroller"] = startReplicationController
controllers["podgc"] = startEndpointController
controllers["resourcequota"] = startResourceQuotaController
controllers["namespace"] = startNamespaceController
return controllers
}
func startEndpointController(ctx ControllerContext) (bool, error) {
go endpointcontroller.NewEndpointController(
ctx.InformerFactory.Pods().Informer(),
ctx.ClientBuilder.ClientOrDie("endpoint-controller"),
).Run(int(ctx.Options.ConcurrentEndpointSyncs), ctx.Stop)
return true, nil
}
func startReplicationController(ctx ControllerContext) (bool, error) {
go replicationcontroller.NewReplicationManager(
ctx.InformerFactory.Pods().Informer(),
ctx.ClientBuilder.ClientOrDie("replication-controller"),
ResyncPeriod(&ctx.Options),
replicationcontroller.BurstReplicas,
int(ctx.Options.LookupCacheSizeForRC),
ctx.Options.EnableGarbageCollector,
).Run(int(ctx.Options.ConcurrentRCSyncs), ctx.Stop)
return true, nil
}
func startPodGCController(ctx ControllerContext) (bool, error) {
go podgc.NewPodGC(
ctx.ClientBuilder.ClientOrDie("pod-garbage-collector"),
ctx.InformerFactory.Pods().Informer(),
int(ctx.Options.TerminatedPodGCThreshold),
).Run(ctx.Stop)
return true, nil
}
func startResourceQuotaController(ctx ControllerContext) (bool, error) {
resourceQuotaControllerClient := ctx.ClientBuilder.ClientOrDie("resourcequota-controller")
resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, ctx.InformerFactory)
groupKindsToReplenish := []schema.GroupKind{
api.Kind("Pod"),
api.Kind("Service"),
api.Kind("ReplicationController"),
api.Kind("PersistentVolumeClaim"),
api.Kind("Secret"),
api.Kind("ConfigMap"),
}
resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{
KubeClient: resourceQuotaControllerClient,
ResyncPeriod: controller.StaticResyncPeriodFunc(ctx.Options.ResourceQuotaSyncPeriod.Duration),
Registry: resourceQuotaRegistry,
ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(ctx.InformerFactory, resourceQuotaControllerClient),
ReplenishmentResyncPeriod: ResyncPeriod(&ctx.Options),
GroupKindsToReplenish: groupKindsToReplenish,
}
go resourcequotacontroller.NewResourceQuotaController(
resourceQuotaControllerOptions,
).Run(int(ctx.Options.ConcurrentResourceQuotaSyncs), ctx.Stop)
return true, nil
}
func startNamespaceController(ctx ControllerContext) (bool, error) {
// TODO: should use a dynamic RESTMapper built from the discovery results.
restMapper := registered.RESTMapper()
// Find the list of namespaced resources via discovery that the namespace controller must manage
namespaceKubeClient := ctx.ClientBuilder.ClientOrDie("namespace-controller")
namespaceClientPool := dynamic.NewClientPool(ctx.ClientBuilder.ConfigOrDie("namespace-controller"), restMapper, dynamic.LegacyAPIPathResolverFunc)
// TODO: consider using a list-watch + cache here rather than polling
resources, err := namespaceKubeClient.Discovery().ServerResources()
if err != nil {
return true, fmt.Errorf("failed to get preferred server resources: %v", err)
}
gvrs, err := discovery.GroupVersionResources(resources)
if err != nil {
return true, fmt.Errorf("failed to parse preferred server resources: %v", err)
}
discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources
if _, found := gvrs[extensions.SchemeGroupVersion.WithResource("thirdpartyresource")]; found {
// make discovery static
snapshot, err := discoverResourcesFn()
if err != nil {
return true, fmt.Errorf("failed to get server resources: %v", err)
}
discoverResourcesFn = func() ([]*metav1.APIResourceList, error) {
return snapshot, nil
}
}
namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, discoverResourcesFn, ctx.Options.NamespaceSyncPeriod.Duration, v1.FinalizerKubernetes)
go namespaceController.Run(int(ctx.Options.ConcurrentNamespaceSyncs), ctx.Stop)
return true, nil
}
// TODO: In general, any controller checking this needs to be dynamic so
// users don't have to restart their controller manager if they change the apiserver.
func getAvailableResources(clientBuilder controller.ControllerClientBuilder) (map[schema.GroupVersionResource]bool, error) {
@@ -264,7 +385,7 @@ func getAvailableResources(clientBuilder controller.ControllerClientBuilder) (ma
return allResources, nil
}
- func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) error {
+ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) error {
sharedInformers := informers.NewSharedInformerFactory(rootClientBuilder.ClientOrDie("shared-informers"), nil, ResyncPeriod(s)())
// always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
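
Because StartControllers now receives the initializer map as an argument, a caller could filter it before handing it over. Hypothetically (no such allow-list exists in this PR; purely illustrative):

// Hypothetical selective enablement: start only an allow-list of controllers.
enabled := map[string]InitFunc{}
for _, name := range []string{"endpoint", "namespace"} {
	if initFn, ok := newControllerInitializers()[name]; ok {
		enabled[name] = initFn
	}
}
err := StartControllers(enabled, s, rootClientBuilder, clientBuilder, stop)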
@@ -302,23 +423,28 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
return err
}
- go endpointcontroller.NewEndpointController(sharedInformers.Pods().Informer(), clientBuilder.ClientOrDie("endpoint-controller")).
- Run(int(s.ConcurrentEndpointSyncs), stop)
- time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
+ ctx := ControllerContext{
+ ClientBuilder: clientBuilder,
+ InformerFactory: sharedInformers,
+ Options: *s,
+ AvailableResources: availableResources,
+ Stop: stop,
+ }
- go replicationcontroller.NewReplicationManager(
- sharedInformers.Pods().Informer(),
- clientBuilder.ClientOrDie("replication-controller"),
- ResyncPeriod(s),
- replicationcontroller.BurstReplicas,
- int(s.LookupCacheSizeForRC),
- s.EnableGarbageCollector,
- ).Run(int(s.ConcurrentRCSyncs), stop)
- time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
+ for controllerName, initFn := range controllers {
+ time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
- go podgc.NewPodGC(clientBuilder.ClientOrDie("pod-garbage-collector"), sharedInformers.Pods().Informer(),
- int(s.TerminatedPodGCThreshold)).Run(stop)
- time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
+ glog.V(1).Infof("Starting %q", controllerName)
+ started, err := initFn(ctx)
+ if err != nil {
+ glog.Errorf("Error starting %q", controllerName)
+ return err
+ }
+ if !started {
+ glog.Warningf("Skipping %q", controllerName)
+ continue
+ }
+ glog.Infof("Started %q", controllerName)
+ }
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
if err != nil {
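
The (bool, error) return is what this loop keys off. A hypothetical initializer that runs a "should I activate" check against ctx.AvailableResources could look like the sketch below (illustrative only; this PR does not convert the daemonset controller, whose existing start call appears further down):

// Hypothetical InitFunc: decline cleanly when the required API group/version
// is not served, so the loop logs "Skipping" instead of treating it as an error.
func startDaemonSetControllerExample(ctx ControllerContext) (bool, error) {
	gvr := schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "daemonsets"}
	if !ctx.AvailableResources[gvr] {
		return false, nil
	}
	go daemon.NewDaemonSetsController(
		ctx.InformerFactory.DaemonSets(),
		ctx.InformerFactory.Pods(),
		ctx.InformerFactory.Nodes(),
		ctx.ClientBuilder.ClientOrDie("daemon-set-controller"),
		int(ctx.Options.LookupCacheSizeForDaemonSet),
	).Run(int(ctx.Options.ConcurrentDaemonSetSyncs), ctx.Stop)
	return true, nil
}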
@@ -367,57 +493,6 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
glog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", s.AllocateNodeCIDRs, s.ConfigureCloudRoutes)
}
- resourceQuotaControllerClient := clientBuilder.ClientOrDie("resourcequota-controller")
- resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, sharedInformers)
- groupKindsToReplenish := []schema.GroupKind{
- api.Kind("Pod"),
- api.Kind("Service"),
- api.Kind("ReplicationController"),
- api.Kind("PersistentVolumeClaim"),
- api.Kind("Secret"),
- api.Kind("ConfigMap"),
- }
- resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{
- KubeClient: resourceQuotaControllerClient,
- ResyncPeriod: controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod.Duration),
- Registry: resourceQuotaRegistry,
- ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(sharedInformers, resourceQuotaControllerClient),
- ReplenishmentResyncPeriod: ResyncPeriod(s),
- GroupKindsToReplenish: groupKindsToReplenish,
- }
- go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(int(s.ConcurrentResourceQuotaSyncs), stop)
- time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
- // TODO: should use a dynamic RESTMapper built from the discovery results.
- restMapper := registered.RESTMapper()
- // Find the list of namespaced resources via discovery that the namespace controller must manage
- namespaceKubeClient := clientBuilder.ClientOrDie("namespace-controller")
- namespaceClientPool := dynamic.NewClientPool(rootClientBuilder.ConfigOrDie("namespace-controller"), restMapper, dynamic.LegacyAPIPathResolverFunc)
- // TODO: consider using a list-watch + cache here rather than polling
- resources, err := namespaceKubeClient.Discovery().ServerResources()
- if err != nil {
- return fmt.Errorf("failed to get preferred server resources: %v", err)
- }
- gvrs, err := discovery.GroupVersionResources(resources)
- if err != nil {
- return fmt.Errorf("failed to parse preferred server resources: %v", err)
- }
- discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources
- if _, found := gvrs[extensions.SchemeGroupVersion.WithResource("thirdpartyresource")]; found {
- // make discovery static
- snapshot, err := discoverResourcesFn()
- if err != nil {
- return fmt.Errorf("failed to get server resources: %v", err)
- }
- discoverResourcesFn = func() ([]*metav1.APIResourceList, error) {
- return snapshot, nil
- }
- }
- namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, discoverResourcesFn, s.NamespaceSyncPeriod.Duration, v1.FinalizerKubernetes)
- go namespaceController.Run(int(s.ConcurrentNamespaceSyncs), stop)
- time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "daemonsets"}] {
go daemon.NewDaemonSetsController(sharedInformers.DaemonSets(), sharedInformers.Pods(), sharedInformers.Nodes(), clientBuilder.ClientOrDie("daemon-set-controller"), int(s.LookupCacheSizeForDaemonSet)).
Run(int(s.ConcurrentDaemonSetSyncs), stop)
@@ -546,6 +621,9 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
if s.EnableGarbageCollector {
+ // TODO: should use a dynamic RESTMapper built from the discovery results.
+ restMapper := registered.RESTMapper()
gcClientset := clientBuilder.ClientOrDie("generic-garbage-collector")
preferredResources, err := gcClientset.Discovery().ServerPreferredResources()
if err != nil {
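
With the initializers exposed as plain data, the reflective tests mentioned in the description become straightforward. A minimal sketch (the expected-names list is illustrative, and the package clause assumes the test sits next to newControllerInitializers):

package app // assumption: same package as newControllerInitializers

import "testing"

// Enumerate controller names without starting any controller, so anything
// keyed by controller name (e.g. RBAC bootstrap roles) can be checked
// against the authoritative map.
func TestControllerInitializerNames(t *testing.T) {
	expected := []string{"endpoint", "replicationcontroller", "podgc", "resourcequota", "namespace"}
	initializers := newControllerInitializers()
	for _, name := range expected {
		if _, ok := initializers[name]; !ok {
			t.Errorf("missing initializer %q", name)
		}
	}
	if got := len(initializers); got != len(expected) {
		t.Errorf("expected %d initializers, got %d", len(expected), got)
	}
}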


@@ -39,5 +39,6 @@ go_library(
"//pkg/runtime:go_default_library",
"//pkg/runtime/schema:go_default_library",
"//pkg/watch:go_default_library",
"//vendor:github.com/golang/glog",
],
)


@@ -21,6 +21,8 @@ import (
"sync"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
@@ -90,8 +92,11 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()
+ glog.V(1).Infoln("Starting informer factory")
for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
+ glog.V(2).Infof("Starting informer for %v", informerType)
go informer.Run(stopCh)
f.startedInformers[informerType] = true
}
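
For reference, the contract these new log lines annotate: informers register with the factory lazily on first request, and Start launches whatever has not been started yet, recording it in startedInformers, so repeated calls are safe. A minimal usage sketch (client construction elided; the constructor signature matches its use earlier in this PR):

stop := make(chan struct{})
factory := informers.NewSharedInformerFactory(client, nil, 10*time.Minute)

// First request registers the pod informer with the factory.
podInformer := factory.Pods().Informer()

// Start runs every registered-but-unstarted informer; a second call finds
// nothing new to start (it logs "Starting informer factory" again at V(1)).
factory.Start(stop)
factory.Start(stop)
_ = podInformer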