diff --git a/cmd/cloud-controller-manager/app/config/config.go b/cmd/cloud-controller-manager/app/config/config.go index 3d201e47546..9282cfd1066 100644 --- a/cmd/cloud-controller-manager/app/config/config.go +++ b/cmd/cloud-controller-manager/app/config/config.go @@ -18,11 +18,13 @@ package app import ( apiserver "k8s.io/apiserver/pkg/server" + "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" genericcontrollermanager "k8s.io/kubernetes/cmd/controller-manager/app" "k8s.io/kubernetes/pkg/apis/componentconfig" + "k8s.io/kubernetes/pkg/controller" ) // Config is the main context object for the cloud controller manager. @@ -46,6 +48,15 @@ type Config struct { // the event sink EventRecorder record.EventRecorder + + // ClientBuilder will provide a client for this controller to use + ClientBuilder controller.ControllerClientBuilder + + // VersionedClient will provide a client for informers + VersionedClient clientset.Interface + + // SharedInformers gives access to informers for the controller. + SharedInformers informers.SharedInformerFactory } type completedConfig struct { diff --git a/cmd/cloud-controller-manager/app/controllermanager.go b/cmd/cloud-controller-manager/app/controllermanager.go index 0b73740cbea..00a116572f2 100644 --- a/cmd/cloud-controller-manager/app/controllermanager.go +++ b/cmd/cloud-controller-manager/app/controllermanager.go @@ -19,7 +19,6 @@ package app import ( "context" "fmt" - "math/rand" "net" "os" "strings" @@ -30,16 +29,13 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" - restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" cloudcontrollerconfig "k8s.io/kubernetes/cmd/cloud-controller-manager/app/config" "k8s.io/kubernetes/cmd/cloud-controller-manager/app/options" genericcontrollermanager "k8s.io/kubernetes/cmd/controller-manager/app" "k8s.io/kubernetes/pkg/cloudprovider" - "k8s.io/kubernetes/pkg/controller" cloudcontrollers "k8s.io/kubernetes/pkg/controller/cloud" routecontroller "k8s.io/kubernetes/pkg/controller/route" servicecontroller "k8s.io/kubernetes/pkg/controller/service" @@ -86,14 +82,6 @@ the cloud specific control loops shipped with Kubernetes.`, return cmd } -// resyncPeriod computes the time interval a shared informer waits before resyncing with the api server -func resyncPeriod(c *cloudcontrollerconfig.CompletedConfig) func() time.Duration { - return func() time.Duration { - factor := rand.Float64() + 1 - return time.Duration(float64(c.ComponentConfig.GenericComponent.MinResyncPeriod.Nanoseconds()) * factor) - } -} - // Run runs the ExternalCMServer. This should never exit. func Run(c *cloudcontrollerconfig.CompletedConfig) error { cloud, err := cloudprovider.InitCloudProvider(c.ComponentConfig.CloudProvider.Name, c.ComponentConfig.CloudProvider.CloudConfigFile) @@ -137,22 +125,7 @@ func Run(c *cloudcontrollerconfig.CompletedConfig) error { } run := func(ctx context.Context) { - rootClientBuilder := controller.SimpleControllerClientBuilder{ - ClientConfig: c.Kubeconfig, - } - var clientBuilder controller.ControllerClientBuilder - if c.ComponentConfig.KubeCloudShared.UseServiceAccountCredentials { - clientBuilder = controller.SAControllerClientBuilder{ - ClientConfig: restclient.AnonymousClientConfig(c.Kubeconfig), - CoreClient: c.Client.CoreV1(), - AuthenticationClient: c.Client.AuthenticationV1(), - Namespace: "kube-system", - } - } else { - clientBuilder = rootClientBuilder - } - - if err := startControllers(c, rootClientBuilder, clientBuilder, ctx.Done(), cloud); err != nil { + if err := startControllers(c, ctx.Done(), cloud); err != nil { glog.Fatalf("error running controllers: %v", err) } } @@ -200,23 +173,18 @@ func Run(c *cloudcontrollerconfig.CompletedConfig) error { } // startControllers starts the cloud specific controller loops. -func startControllers(c *cloudcontrollerconfig.CompletedConfig, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}, cloud cloudprovider.Interface) error { +func startControllers(c *cloudcontrollerconfig.CompletedConfig, stop <-chan struct{}, cloud cloudprovider.Interface) error { // Function to build the kube client object client := func(serviceAccountName string) kubernetes.Interface { - return clientBuilder.ClientOrDie(serviceAccountName) + return c.ClientBuilder.ClientOrDie(serviceAccountName) } if cloud != nil { // Initialize the cloud provider with a reference to the clientBuilder - cloud.Initialize(clientBuilder) + cloud.Initialize(c.ClientBuilder) } - - // TODO: move this setup into Config - versionedClient := rootClientBuilder.ClientOrDie("shared-informers") - sharedInformers := informers.NewSharedInformerFactory(versionedClient, resyncPeriod(c)()) - // Start the CloudNodeController nodeController := cloudcontrollers.NewCloudNodeController( - sharedInformers.Core().V1().Nodes(), + c.SharedInformers.Core().V1().Nodes(), client("cloud-node-controller"), cloud, c.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration, c.ComponentConfig.NodeStatusUpdateFrequency.Duration) @@ -233,8 +201,8 @@ func startControllers(c *cloudcontrollerconfig.CompletedConfig, rootClientBuilde serviceController, err := servicecontroller.New( cloud, client("service-controller"), - sharedInformers.Core().V1().Services(), - sharedInformers.Core().V1().Nodes(), + c.SharedInformers.Core().V1().Services(), + c.SharedInformers.Core().V1().Nodes(), c.ComponentConfig.KubeCloudShared.ClusterName, ) if err != nil { @@ -249,7 +217,7 @@ func startControllers(c *cloudcontrollerconfig.CompletedConfig, rootClientBuilde if routes, ok := cloud.Routes(); !ok { glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.") } else { - var clusterCIDR *net.IPNet + var clusterCIDR *net.IPNet = nil if len(strings.TrimSpace(c.ComponentConfig.KubeCloudShared.ClusterCIDR)) != 0 { _, clusterCIDR, err = net.ParseCIDR(c.ComponentConfig.KubeCloudShared.ClusterCIDR) if err != nil { @@ -257,7 +225,7 @@ func startControllers(c *cloudcontrollerconfig.CompletedConfig, rootClientBuilde } } - routeController := routecontroller.New(routes, client("route-controller"), sharedInformers.Core().V1().Nodes(), c.ComponentConfig.KubeCloudShared.ClusterName, clusterCIDR) + routeController := routecontroller.New(routes, client("route-controller"), c.SharedInformers.Core().V1().Nodes(), c.ComponentConfig.KubeCloudShared.ClusterName, clusterCIDR) go routeController.Run(stop, c.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration) time.Sleep(wait.Jitter(c.ComponentConfig.GenericComponent.ControllerStartInterval.Duration, ControllerStartJitter)) } @@ -267,12 +235,12 @@ func startControllers(c *cloudcontrollerconfig.CompletedConfig, rootClientBuilde // If apiserver is not running we should wait for some time and fail only then. This is particularly // important when we start apiserver and controller manager at the same time. - err = genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second) + err = genericcontrollermanager.WaitForAPIServer(c.VersionedClient, 10*time.Second) if err != nil { glog.Fatalf("Failed to wait for apiserver being healthy: %v", err) } - sharedInformers.Start(stop) + c.SharedInformers.Start(stop) select {} } diff --git a/cmd/cloud-controller-manager/app/options/options.go b/cmd/cloud-controller-manager/app/options/options.go index 2dc7a0534fb..6f48c54954f 100644 --- a/cmd/cloud-controller-manager/app/options/options.go +++ b/cmd/cloud-controller-manager/app/options/options.go @@ -18,7 +18,9 @@ package options import ( "fmt" + "math/rand" "net" + "time" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -26,6 +28,7 @@ import ( utilerrors "k8s.io/apimachinery/pkg/util/errors" apiserveroptions "k8s.io/apiserver/pkg/server/options" utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes" v1core "k8s.io/client-go/kubernetes/typed/core/v1" @@ -37,6 +40,7 @@ import ( "k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/apis/componentconfig" componentconfigv1alpha1 "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/master/ports" // add the kubernetes feature gates _ "k8s.io/kubernetes/pkg/features" @@ -146,40 +150,35 @@ func (o *CloudControllerManagerOptions) AddFlags(fs *pflag.FlagSet) { // ApplyTo fills up cloud controller manager config with options. func (o *CloudControllerManagerOptions) ApplyTo(c *cloudcontrollerconfig.Config, userAgent string) error { - if err := o.CloudProvider.ApplyTo(&c.ComponentConfig.CloudProvider); err != nil { - return err - } - if err := o.Debugging.ApplyTo(&c.ComponentConfig.Debugging); err != nil { - return err - } - if err := o.GenericComponent.ApplyTo(&c.ComponentConfig.GenericComponent); err != nil { - return err - } - if err := o.KubeCloudShared.ApplyTo(&c.ComponentConfig.KubeCloudShared); err != nil { - return err - } - if err := o.ServiceController.ApplyTo(&c.ComponentConfig.ServiceController); err != nil { - return err - } - if err := o.SecureServing.ApplyTo(&c.SecureServing); err != nil { - return err - } - if err := o.InsecureServing.ApplyTo(&c.InsecureServing); err != nil { - return err - } - if err := o.Authentication.ApplyTo(&c.Authentication, c.SecureServing, nil); err != nil { - return err - } - if err := o.Authorization.ApplyTo(&c.Authorization); err != nil { - return err - } - - // sync back to component config - // TODO: find more elegant way than syncing back the values. - c.ComponentConfig.KubeCloudShared.Port = int32(o.InsecureServing.BindPort) - c.ComponentConfig.KubeCloudShared.Address = o.InsecureServing.BindAddress.String() - var err error + if err = o.CloudProvider.ApplyTo(&c.ComponentConfig.CloudProvider); err != nil { + return err + } + if err = o.Debugging.ApplyTo(&c.ComponentConfig.Debugging); err != nil { + return err + } + if err = o.GenericComponent.ApplyTo(&c.ComponentConfig.GenericComponent); err != nil { + return err + } + if err = o.KubeCloudShared.ApplyTo(&c.ComponentConfig.KubeCloudShared); err != nil { + return err + } + if err = o.ServiceController.ApplyTo(&c.ComponentConfig.ServiceController); err != nil { + return err + } + if err = o.SecureServing.ApplyTo(&c.SecureServing); err != nil { + return err + } + if err = o.InsecureServing.ApplyTo(&c.InsecureServing); err != nil { + return err + } + if err = o.Authentication.ApplyTo(&c.Authentication, c.SecureServing, nil); err != nil { + return err + } + if err = o.Authorization.ApplyTo(&c.Authorization); err != nil { + return err + } + c.Kubeconfig, err = clientcmd.BuildConfigFromFlags(o.Master, o.Kubeconfig) if err != nil { return err @@ -196,6 +195,28 @@ func (o *CloudControllerManagerOptions) ApplyTo(c *cloudcontrollerconfig.Config, c.LeaderElectionClient = clientset.NewForConfigOrDie(restclient.AddUserAgent(c.Kubeconfig, "leader-election")) c.EventRecorder = createRecorder(c.Client, userAgent) + + rootClientBuilder := controller.SimpleControllerClientBuilder{ + ClientConfig: c.Kubeconfig, + } + if c.ComponentConfig.KubeCloudShared.UseServiceAccountCredentials { + c.ClientBuilder = controller.SAControllerClientBuilder{ + ClientConfig: restclient.AnonymousClientConfig(c.Kubeconfig), + CoreClient: c.Client.CoreV1(), + AuthenticationClient: c.Client.AuthenticationV1(), + Namespace: "kube-system", + } + } else { + c.ClientBuilder = rootClientBuilder + } + c.VersionedClient = rootClientBuilder.ClientOrDie("shared-informers") + c.SharedInformers = informers.NewSharedInformerFactory(c.VersionedClient, resyncPeriod(c)()) + + // sync back to component config + // TODO: find more elegant way than syncing back the values. + c.ComponentConfig.KubeCloudShared.Port = int32(o.InsecureServing.BindPort) + c.ComponentConfig.KubeCloudShared.Address = o.InsecureServing.BindAddress.String() + c.ComponentConfig.NodeStatusUpdateFrequency = o.NodeStatusUpdateFrequency return nil @@ -222,6 +243,14 @@ func (o *CloudControllerManagerOptions) Validate() error { return utilerrors.NewAggregate(errors) } +// resyncPeriod computes the time interval a shared informer waits before resyncing with the api server +func resyncPeriod(c *cloudcontrollerconfig.Config) func() time.Duration { + return func() time.Duration { + factor := rand.Float64() + 1 + return time.Duration(float64(c.ComponentConfig.GenericComponent.MinResyncPeriod.Nanoseconds()) * factor) + } +} + // Config return a cloud controller manager config objective func (o *CloudControllerManagerOptions) Config() (*cloudcontrollerconfig.Config, error) { if err := o.Validate(); err != nil {