Merge pull request #46783 from deads2k/controller-08-initfn

Automatic merge from submit-queue (batch tested with PRs 40760, 46706, 46783, 46742, 46751)

complete the controller context for init funcs

This completes the conversion to initFuncs for the controller initialization to make easier and more manageable to add them.
This commit is contained in:
Kubernetes Submit Queue 2017-06-03 18:30:42 -07:00 committed by GitHub
commit 638c7382ae
2 changed files with 234 additions and 206 deletions

View File

@ -37,7 +37,6 @@ import (
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/server/healthz"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/discovery" "k8s.io/client-go/discovery"
v1core "k8s.io/client-go/kubernetes/typed/core/v1" v1core "k8s.io/client-go/kubernetes/typed/core/v1"
@ -55,13 +54,7 @@ import (
"k8s.io/kubernetes/pkg/client/leaderelection/resourcelock" "k8s.io/kubernetes/pkg/client/leaderelection/resourcelock"
"k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
nodecontroller "k8s.io/kubernetes/pkg/controller/node"
routecontroller "k8s.io/kubernetes/pkg/controller/route"
servicecontroller "k8s.io/kubernetes/pkg/controller/service"
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount" serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach"
persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/serviceaccount" "k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/util/configz" "k8s.io/kubernetes/pkg/util/configz"
@ -175,10 +168,19 @@ func Run(s *options.CMServer) error {
} else { } else {
clientBuilder = rootClientBuilder clientBuilder = rootClientBuilder
} }
ctx, err := CreateControllerContext(s, rootClientBuilder, clientBuilder, stop)
if err != nil {
glog.Fatalf("error building controller context: %v", err)
}
saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
err := StartControllers(NewControllerInitializers(), s, rootClientBuilder, clientBuilder, stop) if err := StartControllers(ctx, saTokenControllerInitFunc, NewControllerInitializers()); err != nil {
glog.Fatalf("error running controllers: %v", err) glog.Fatalf("error starting controllers: %v", err)
panic("unreachable") }
ctx.InformerFactory.Start(ctx.Stop)
select {}
} }
if !s.LeaderElection.LeaderElect { if !s.LeaderElection.LeaderElect {
@ -231,6 +233,10 @@ type ControllerContext struct {
// AvailableResources is a map listing currently available resources // AvailableResources is a map listing currently available resources
AvailableResources map[schema.GroupVersionResource]bool AvailableResources map[schema.GroupVersionResource]bool
// Cloud is the cloud provider interface for the controllers to use.
// It must be initialized and ready to use.
Cloud cloudprovider.Interface
// Stop is the stop channel // Stop is the stop channel
Stop <-chan struct{} Stop <-chan struct{}
} }
@ -272,16 +278,14 @@ type InitFunc func(ctx ControllerContext) (bool, error)
func KnownControllers() []string { func KnownControllers() []string {
ret := sets.StringKeySet(NewControllerInitializers()) ret := sets.StringKeySet(NewControllerInitializers())
// add "special" controllers that aren't initialized normally. These controllers cannot be initialized
// using a normal function. The only known special case is the SA token controller which *must* be started
// first to ensure that the SA tokens for future controllers will exist. Think very carefully before adding
// to this list.
ret.Insert( ret.Insert(
saTokenControllerName, saTokenControllerName,
nodeControllerName,
serviceControllerName,
routeControllerName,
pvBinderControllerName,
attachDetachControllerName,
) )
// add "special" controllers that aren't initialized normally
return ret.List() return ret.List()
} }
@ -290,6 +294,10 @@ var ControllersDisabledByDefault = sets.NewString(
"tokencleaner", "tokencleaner",
) )
const (
saTokenControllerName = "serviceaccount-token"
)
// NewControllerInitializers is a public map of named controller groups (you can start more than one in an init func) // NewControllerInitializers is a public map of named controller groups (you can start more than one in an init func)
// paired to their InitFunc. This allows for structured downstream composition and subdivision. // paired to their InitFunc. This allows for structured downstream composition and subdivision.
func NewControllerInitializers() map[string]InitFunc { func NewControllerInitializers() map[string]InitFunc {
@ -314,6 +322,11 @@ func NewControllerInitializers() map[string]InitFunc {
controllers["ttl"] = startTTLController controllers["ttl"] = startTTLController
controllers["bootstrapsigner"] = startBootstrapSignerController controllers["bootstrapsigner"] = startBootstrapSignerController
controllers["tokencleaner"] = startTokenCleanerController controllers["tokencleaner"] = startTokenCleanerController
controllers["service"] = startServiceController
controllers["node"] = startNodeController
controllers["route"] = startRouteController
controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
controllers["attachdetach"] = startAttachDetachController
return controllers return controllers
} }
@ -366,61 +379,22 @@ func GetAvailableResources(clientBuilder controller.ControllerClientBuilder) (ma
return allResources, nil return allResources, nil
} }
const ( func CreateControllerContext(s *options.CMServer, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) (ControllerContext, error) {
saTokenControllerName = "serviceaccount-token"
nodeControllerName = "node"
serviceControllerName = "service"
routeControllerName = "route"
pvBinderControllerName = "persistentvolume-binder"
attachDetachControllerName = "attachdetach"
)
func StartControllers(controllers map[string]InitFunc, s *options.CMServer, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) error {
versionedClient := rootClientBuilder.ClientOrDie("shared-informers") versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)()) sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())
// always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest availableResources, err := GetAvailableResources(rootClientBuilder)
if len(s.ServiceAccountKeyFile) > 0 && IsControllerEnabled(saTokenControllerName, ControllersDisabledByDefault, s.Controllers...) {
privateKey, err := serviceaccount.ReadPrivateKey(s.ServiceAccountKeyFile)
if err != nil { if err != nil {
return fmt.Errorf("error reading key for service account token controller: %v", err) return ControllerContext{}, err
} else { }
var rootCA []byte
if s.RootCAFile != "" { cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
rootCA, err = ioutil.ReadFile(s.RootCAFile)
if err != nil { if err != nil {
return fmt.Errorf("error reading root-ca-file at %s: %v", s.RootCAFile, err) return ControllerContext{}, fmt.Errorf("cloud provider could not be initialized: %v", err)
} }
if _, err := certutil.ParseCertsPEM(rootCA); err != nil { if cloud != nil {
return fmt.Errorf("error parsing root-ca-file at %s: %v", s.RootCAFile, err) // Initialize the cloud provider with a reference to the clientBuilder
} cloud.Initialize(rootClientBuilder)
} else {
rootCA = rootClientBuilder.ConfigOrDie("tokens-controller").CAData
}
controller := serviceaccountcontroller.NewTokensController(
sharedInformers.Core().V1().ServiceAccounts(),
sharedInformers.Core().V1().Secrets(),
rootClientBuilder.ClientOrDie("tokens-controller"),
serviceaccountcontroller.TokensControllerOptions{
TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey),
RootCA: rootCA,
},
)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
go controller.Run(int(s.ConcurrentSATokenSyncs), stop)
// start the first set of informers now so that other controllers can start
sharedInformers.Start(stop)
}
} else {
glog.Warningf("%q is disabled", saTokenControllerName)
}
availableResources, err := GetAvailableResources(clientBuilder)
if err != nil {
return err
} }
ctx := ControllerContext{ ctx := ControllerContext{
@ -428,8 +402,18 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
InformerFactory: sharedInformers, InformerFactory: sharedInformers,
Options: *s, Options: *s,
AvailableResources: availableResources, AvailableResources: availableResources,
Cloud: cloud,
Stop: stop, Stop: stop,
} }
return ctx, nil
}
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc) error {
// Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
// If this fails, just return here and fail since other controllers won't be able to get credentials.
if _, err := startSATokenController(ctx); err != nil {
return err
}
for controllerName, initFn := range controllers { for controllerName, initFn := range controllers {
if !ctx.IsControllerEnabled(controllerName) { if !ctx.IsControllerEnabled(controllerName) {
@ -437,7 +421,7 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
continue continue
} }
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(ctx.Options.ControllerStartInterval.Duration, ControllerStartJitter))
glog.V(1).Infof("Starting %q", controllerName) glog.V(1).Infof("Starting %q", controllerName)
started, err := initFn(ctx) started, err := initFn(ctx)
@ -452,144 +436,57 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
glog.Infof("Started %q", controllerName) glog.Infof("Started %q", controllerName)
} }
// all the remaining plugins want this cloud variable return nil
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
if err != nil {
return fmt.Errorf("cloud provider could not be initialized: %v", err)
} }
if cloud != nil { // serviceAccountTokenControllerStarter is special because it must run first to set up permissions for other controllers.
// Initialize the cloud provider with a reference to the clientBuilder // It cannot use the "normal" client builder, so it tracks its own. It must also avoid being included in the "normal"
cloud.Initialize(clientBuilder) // init map so that it can always run first.
type serviceAccountTokenControllerStarter struct {
rootClientBuilder controller.ControllerClientBuilder
} }
if ctx.IsControllerEnabled(nodeControllerName) { func (c serviceAccountTokenControllerStarter) startServiceAccountTokenController(ctx ControllerContext) (bool, error) {
_, clusterCIDR, err := net.ParseCIDR(s.ClusterCIDR) if !ctx.IsControllerEnabled(saTokenControllerName) {
if err != nil { glog.Warningf("%q is disabled", saTokenControllerName)
glog.Warningf("Unsuccessful parsing of cluster CIDR %v: %v", s.ClusterCIDR, err) return false, nil
} }
_, serviceCIDR, err := net.ParseCIDR(s.ServiceCIDR)
if err != nil { if len(ctx.Options.ServiceAccountKeyFile) == 0 {
glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err) glog.Warningf("%q is disabled because there is no private key", saTokenControllerName)
return false, nil
} }
nodeController, err := nodecontroller.NewNodeController( privateKey, err := serviceaccount.ReadPrivateKey(ctx.Options.ServiceAccountKeyFile)
sharedInformers.Core().V1().Pods(), if err != nil {
sharedInformers.Core().V1().Nodes(), return true, fmt.Errorf("error reading key for service account token controller: %v", err)
sharedInformers.Extensions().V1beta1().DaemonSets(), }
cloud,
clientBuilder.ClientOrDie("node-controller"), var rootCA []byte
s.PodEvictionTimeout.Duration, if ctx.Options.RootCAFile != "" {
s.NodeEvictionRate, rootCA, err = ioutil.ReadFile(ctx.Options.RootCAFile)
s.SecondaryNodeEvictionRate, if err != nil {
s.LargeClusterSizeThreshold, return true, fmt.Errorf("error reading root-ca-file at %s: %v", ctx.Options.RootCAFile, err)
s.UnhealthyZoneThreshold, }
s.NodeMonitorGracePeriod.Duration, if _, err := certutil.ParseCertsPEM(rootCA); err != nil {
s.NodeStartupGracePeriod.Duration, return true, fmt.Errorf("error parsing root-ca-file at %s: %v", ctx.Options.RootCAFile, err)
s.NodeMonitorPeriod.Duration, }
clusterCIDR, } else {
serviceCIDR, rootCA = c.rootClientBuilder.ConfigOrDie("tokens-controller").CAData
int(s.NodeCIDRMaskSize), }
s.AllocateNodeCIDRs,
nodecontroller.CIDRAllocatorType(s.CIDRAllocatorType), controller := serviceaccountcontroller.NewTokensController(
s.EnableTaintManager, ctx.InformerFactory.Core().V1().ServiceAccounts(),
utilfeature.DefaultFeatureGate.Enabled(features.TaintBasedEvictions), ctx.InformerFactory.Core().V1().Secrets(),
c.rootClientBuilder.ClientOrDie("tokens-controller"),
serviceaccountcontroller.TokensControllerOptions{
TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey),
RootCA: rootCA,
},
) )
if err != nil { go controller.Run(int(ctx.Options.ConcurrentSATokenSyncs), ctx.Stop)
return fmt.Errorf("failed to initialize nodecontroller: %v", err)
}
go nodeController.Run(stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
} else {
glog.Warningf("%q is disabled", nodeControllerName)
}
if ctx.IsControllerEnabled(serviceControllerName) { // start the first set of informers now so that other controllers can start
serviceController, err := servicecontroller.New( ctx.InformerFactory.Start(ctx.Stop)
cloud,
clientBuilder.ClientOrDie("service-controller"),
sharedInformers.Core().V1().Services(),
sharedInformers.Core().V1().Nodes(),
s.ClusterName,
)
if err != nil {
glog.Errorf("Failed to start service controller: %v", err)
} else {
go serviceController.Run(stop, int(s.ConcurrentServiceSyncs))
}
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
} else {
glog.Warningf("%q is disabled", serviceControllerName)
}
if ctx.IsControllerEnabled(routeControllerName) { return true, nil
_, clusterCIDR, err := net.ParseCIDR(s.ClusterCIDR)
if err != nil {
glog.Warningf("Unsuccessful parsing of cluster CIDR %v: %v", s.ClusterCIDR, err)
}
if s.AllocateNodeCIDRs && s.ConfigureCloudRoutes {
if cloud == nil {
glog.Warning("configure-cloud-routes is set, but no cloud provider specified. Will not configure cloud provider routes.")
} else if routes, ok := cloud.Routes(); !ok {
glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
} else {
routeController := routecontroller.New(routes, clientBuilder.ClientOrDie("route-controller"), sharedInformers.Core().V1().Nodes(), s.ClusterName, clusterCIDR)
go routeController.Run(stop, s.RouteReconciliationPeriod.Duration)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
} else {
glog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", s.AllocateNodeCIDRs, s.ConfigureCloudRoutes)
}
} else {
glog.Warningf("%q is disabled", routeControllerName)
}
if ctx.IsControllerEnabled(pvBinderControllerName) {
params := persistentvolumecontroller.ControllerParameters{
KubeClient: clientBuilder.ClientOrDie("persistent-volume-binder"),
SyncPeriod: s.PVClaimBinderSyncPeriod.Duration,
VolumePlugins: ProbeControllerVolumePlugins(cloud, s.VolumeConfiguration),
Cloud: cloud,
ClusterName: s.ClusterName,
VolumeInformer: sharedInformers.Core().V1().PersistentVolumes(),
ClaimInformer: sharedInformers.Core().V1().PersistentVolumeClaims(),
ClassInformer: sharedInformers.Storage().V1().StorageClasses(),
EnableDynamicProvisioning: s.VolumeConfiguration.EnableDynamicProvisioning,
}
volumeController, volumeControllerErr := persistentvolumecontroller.NewController(params)
if volumeControllerErr != nil {
return fmt.Errorf("failed to construct persistentvolume controller: %v", volumeControllerErr)
}
go volumeController.Run(stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
} else {
glog.Warningf("%q is disabled", pvBinderControllerName)
}
if ctx.IsControllerEnabled(attachDetachControllerName) {
if s.ReconcilerSyncLoopPeriod.Duration < time.Second {
return fmt.Errorf("Duration time must be greater than one second as set via command line option reconcile-sync-loop-period.")
}
attachDetachController, attachDetachControllerErr :=
attachdetach.NewAttachDetachController(
clientBuilder.ClientOrDie("attachdetach-controller"),
sharedInformers.Core().V1().Pods(),
sharedInformers.Core().V1().Nodes(),
sharedInformers.Core().V1().PersistentVolumeClaims(),
sharedInformers.Core().V1().PersistentVolumes(),
cloud,
ProbeAttachableVolumePlugins(s.VolumeConfiguration),
s.DisableAttachDetachReconcilerSync,
s.ReconcilerSyncLoopPeriod.Duration)
if attachDetachControllerErr != nil {
return fmt.Errorf("failed to start attach/detach controller: %v", attachDetachControllerErr)
}
go attachDetachController.Run(stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
} else {
glog.Warningf("%q is disabled", attachDetachControllerName)
}
sharedInformers.Start(stop)
select {}
} }

View File

@ -22,10 +22,15 @@ package app
import ( import (
"fmt" "fmt"
"net"
"time"
"github.com/golang/glog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/runtime/serializer"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/discovery" "k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
@ -37,14 +42,140 @@ import (
"k8s.io/kubernetes/pkg/controller/garbagecollector" "k8s.io/kubernetes/pkg/controller/garbagecollector"
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly" "k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace" namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
nodecontroller "k8s.io/kubernetes/pkg/controller/node"
"k8s.io/kubernetes/pkg/controller/podgc" "k8s.io/kubernetes/pkg/controller/podgc"
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication" replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota" resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
routecontroller "k8s.io/kubernetes/pkg/controller/route"
servicecontroller "k8s.io/kubernetes/pkg/controller/service"
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount" serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
ttlcontroller "k8s.io/kubernetes/pkg/controller/ttl" ttlcontroller "k8s.io/kubernetes/pkg/controller/ttl"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach"
persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
"k8s.io/kubernetes/pkg/features"
quotainstall "k8s.io/kubernetes/pkg/quota/install" quotainstall "k8s.io/kubernetes/pkg/quota/install"
) )
func startServiceController(ctx ControllerContext) (bool, error) {
serviceController, err := servicecontroller.New(
ctx.Cloud,
ctx.ClientBuilder.ClientOrDie("service-controller"),
ctx.InformerFactory.Core().V1().Services(),
ctx.InformerFactory.Core().V1().Nodes(),
ctx.Options.ClusterName,
)
if err != nil {
glog.Errorf("Failed to start service controller: %v", err)
return false, nil
}
go serviceController.Run(ctx.Stop, int(ctx.Options.ConcurrentServiceSyncs))
return true, nil
}
func startNodeController(ctx ControllerContext) (bool, error) {
_, clusterCIDR, err := net.ParseCIDR(ctx.Options.ClusterCIDR)
if err != nil {
glog.Warningf("Unsuccessful parsing of cluster CIDR %v: %v", ctx.Options.ClusterCIDR, err)
}
_, serviceCIDR, err := net.ParseCIDR(ctx.Options.ServiceCIDR)
if err != nil {
glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", ctx.Options.ServiceCIDR, err)
}
nodeController, err := nodecontroller.NewNodeController(
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Nodes(),
ctx.InformerFactory.Extensions().V1beta1().DaemonSets(),
ctx.Cloud,
ctx.ClientBuilder.ClientOrDie("node-controller"),
ctx.Options.PodEvictionTimeout.Duration,
ctx.Options.NodeEvictionRate,
ctx.Options.SecondaryNodeEvictionRate,
ctx.Options.LargeClusterSizeThreshold,
ctx.Options.UnhealthyZoneThreshold,
ctx.Options.NodeMonitorGracePeriod.Duration,
ctx.Options.NodeStartupGracePeriod.Duration,
ctx.Options.NodeMonitorPeriod.Duration,
clusterCIDR,
serviceCIDR,
int(ctx.Options.NodeCIDRMaskSize),
ctx.Options.AllocateNodeCIDRs,
nodecontroller.CIDRAllocatorType(ctx.Options.CIDRAllocatorType),
ctx.Options.EnableTaintManager,
utilfeature.DefaultFeatureGate.Enabled(features.TaintBasedEvictions),
)
if err != nil {
return true, err
}
go nodeController.Run(ctx.Stop)
return true, nil
}
func startRouteController(ctx ControllerContext) (bool, error) {
_, clusterCIDR, err := net.ParseCIDR(ctx.Options.ClusterCIDR)
if err != nil {
glog.Warningf("Unsuccessful parsing of cluster CIDR %v: %v", ctx.Options.ClusterCIDR, err)
}
// TODO demorgans
if ctx.Options.AllocateNodeCIDRs && ctx.Options.ConfigureCloudRoutes {
if ctx.Cloud == nil {
glog.Warning("configure-cloud-routes is set, but no cloud provider specified. Will not configure cloud provider routes.")
return false, nil
} else if routes, ok := ctx.Cloud.Routes(); !ok {
glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
return false, nil
} else {
routeController := routecontroller.New(routes, ctx.ClientBuilder.ClientOrDie("route-controller"), ctx.InformerFactory.Core().V1().Nodes(), ctx.Options.ClusterName, clusterCIDR)
go routeController.Run(ctx.Stop, ctx.Options.RouteReconciliationPeriod.Duration)
return true, nil
}
} else {
glog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", ctx.Options.AllocateNodeCIDRs, ctx.Options.ConfigureCloudRoutes)
return false, nil
}
}
func startPersistentVolumeBinderController(ctx ControllerContext) (bool, error) {
params := persistentvolumecontroller.ControllerParameters{
KubeClient: ctx.ClientBuilder.ClientOrDie("persistent-volume-binder"),
SyncPeriod: ctx.Options.PVClaimBinderSyncPeriod.Duration,
VolumePlugins: ProbeControllerVolumePlugins(ctx.Cloud, ctx.Options.VolumeConfiguration),
Cloud: ctx.Cloud,
ClusterName: ctx.Options.ClusterName,
VolumeInformer: ctx.InformerFactory.Core().V1().PersistentVolumes(),
ClaimInformer: ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
ClassInformer: ctx.InformerFactory.Storage().V1().StorageClasses(),
EnableDynamicProvisioning: ctx.Options.VolumeConfiguration.EnableDynamicProvisioning,
}
volumeController, volumeControllerErr := persistentvolumecontroller.NewController(params)
if volumeControllerErr != nil {
return true, fmt.Errorf("failed to construct persistentvolume controller: %v", volumeControllerErr)
}
go volumeController.Run(ctx.Stop)
return true, nil
}
func startAttachDetachController(ctx ControllerContext) (bool, error) {
if ctx.Options.ReconcilerSyncLoopPeriod.Duration < time.Second {
return true, fmt.Errorf("Duration time must be greater than one second as set via command line option reconcile-sync-loop-period.")
}
attachDetachController, attachDetachControllerErr :=
attachdetach.NewAttachDetachController(
ctx.ClientBuilder.ClientOrDie("attachdetach-controller"),
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Nodes(),
ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
ctx.InformerFactory.Core().V1().PersistentVolumes(),
ctx.Cloud,
ProbeAttachableVolumePlugins(ctx.Options.VolumeConfiguration),
ctx.Options.DisableAttachDetachReconcilerSync,
ctx.Options.ReconcilerSyncLoopPeriod.Duration)
if attachDetachControllerErr != nil {
return true, fmt.Errorf("failed to start attach/detach controller: %v", attachDetachControllerErr)
}
go attachDetachController.Run(ctx.Stop)
return true, nil
}
func startEndpointController(ctx ControllerContext) (bool, error) { func startEndpointController(ctx ControllerContext) (bool, error) {
go endpointcontroller.NewEndpointController( go endpointcontroller.NewEndpointController(
ctx.InformerFactory.Core().V1().Pods(), ctx.InformerFactory.Core().V1().Pods(),