[cloud-controller manager]move more setup code from startControllers into the config

This commit is contained in:
stewart-yu 2018-06-20 17:29:06 +08:00
parent 1a75395da8
commit e9296b50f3
3 changed files with 84 additions and 76 deletions

View File

@ -18,11 +18,13 @@ package app
import (
apiserver "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
genericcontrollermanager "k8s.io/kubernetes/cmd/controller-manager/app"
"k8s.io/kubernetes/pkg/apis/componentconfig"
"k8s.io/kubernetes/pkg/controller"
)
// Config is the main context object for the cloud controller manager.
@ -46,6 +48,15 @@ type Config struct {
// the event sink
EventRecorder record.EventRecorder
// ClientBuilder will provide a client for this controller to use
ClientBuilder controller.ControllerClientBuilder
// VersionedClient will provide a client for informers
VersionedClient clientset.Interface
// SharedInformers gives access to informers for the controller.
SharedInformers informers.SharedInformerFactory
}
type completedConfig struct {

View File

@ -19,7 +19,6 @@ package app
import (
"context"
"fmt"
"math/rand"
"net"
"os"
"strings"
@ -30,16 +29,13 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
cloudcontrollerconfig "k8s.io/kubernetes/cmd/cloud-controller-manager/app/config"
"k8s.io/kubernetes/cmd/cloud-controller-manager/app/options"
genericcontrollermanager "k8s.io/kubernetes/cmd/controller-manager/app"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
cloudcontrollers "k8s.io/kubernetes/pkg/controller/cloud"
routecontroller "k8s.io/kubernetes/pkg/controller/route"
servicecontroller "k8s.io/kubernetes/pkg/controller/service"
@ -86,14 +82,6 @@ the cloud specific control loops shipped with Kubernetes.`,
return cmd
}
// resyncPeriod computes the time interval a shared informer waits before resyncing with the api server
func resyncPeriod(c *cloudcontrollerconfig.CompletedConfig) func() time.Duration {
return func() time.Duration {
factor := rand.Float64() + 1
return time.Duration(float64(c.ComponentConfig.GenericComponent.MinResyncPeriod.Nanoseconds()) * factor)
}
}
// Run runs the ExternalCMServer. This should never exit.
func Run(c *cloudcontrollerconfig.CompletedConfig) error {
cloud, err := cloudprovider.InitCloudProvider(c.ComponentConfig.CloudProvider.Name, c.ComponentConfig.CloudProvider.CloudConfigFile)
@ -137,22 +125,7 @@ func Run(c *cloudcontrollerconfig.CompletedConfig) error {
}
run := func(ctx context.Context) {
rootClientBuilder := controller.SimpleControllerClientBuilder{
ClientConfig: c.Kubeconfig,
}
var clientBuilder controller.ControllerClientBuilder
if c.ComponentConfig.KubeCloudShared.UseServiceAccountCredentials {
clientBuilder = controller.SAControllerClientBuilder{
ClientConfig: restclient.AnonymousClientConfig(c.Kubeconfig),
CoreClient: c.Client.CoreV1(),
AuthenticationClient: c.Client.AuthenticationV1(),
Namespace: "kube-system",
}
} else {
clientBuilder = rootClientBuilder
}
if err := startControllers(c, rootClientBuilder, clientBuilder, ctx.Done(), cloud); err != nil {
if err := startControllers(c, ctx.Done(), cloud); err != nil {
glog.Fatalf("error running controllers: %v", err)
}
}
@ -200,23 +173,18 @@ func Run(c *cloudcontrollerconfig.CompletedConfig) error {
}
// startControllers starts the cloud specific controller loops.
func startControllers(c *cloudcontrollerconfig.CompletedConfig, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}, cloud cloudprovider.Interface) error {
func startControllers(c *cloudcontrollerconfig.CompletedConfig, stop <-chan struct{}, cloud cloudprovider.Interface) error {
// Function to build the kube client object
client := func(serviceAccountName string) kubernetes.Interface {
return clientBuilder.ClientOrDie(serviceAccountName)
return c.ClientBuilder.ClientOrDie(serviceAccountName)
}
if cloud != nil {
// Initialize the cloud provider with a reference to the clientBuilder
cloud.Initialize(clientBuilder)
cloud.Initialize(c.ClientBuilder)
}
// TODO: move this setup into Config
versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
sharedInformers := informers.NewSharedInformerFactory(versionedClient, resyncPeriod(c)())
// Start the CloudNodeController
nodeController := cloudcontrollers.NewCloudNodeController(
sharedInformers.Core().V1().Nodes(),
c.SharedInformers.Core().V1().Nodes(),
client("cloud-node-controller"), cloud,
c.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration,
c.ComponentConfig.NodeStatusUpdateFrequency.Duration)
@ -233,8 +201,8 @@ func startControllers(c *cloudcontrollerconfig.CompletedConfig, rootClientBuilde
serviceController, err := servicecontroller.New(
cloud,
client("service-controller"),
sharedInformers.Core().V1().Services(),
sharedInformers.Core().V1().Nodes(),
c.SharedInformers.Core().V1().Services(),
c.SharedInformers.Core().V1().Nodes(),
c.ComponentConfig.KubeCloudShared.ClusterName,
)
if err != nil {
@ -249,7 +217,7 @@ func startControllers(c *cloudcontrollerconfig.CompletedConfig, rootClientBuilde
if routes, ok := cloud.Routes(); !ok {
glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
} else {
var clusterCIDR *net.IPNet
var clusterCIDR *net.IPNet = nil
if len(strings.TrimSpace(c.ComponentConfig.KubeCloudShared.ClusterCIDR)) != 0 {
_, clusterCIDR, err = net.ParseCIDR(c.ComponentConfig.KubeCloudShared.ClusterCIDR)
if err != nil {
@ -257,7 +225,7 @@ func startControllers(c *cloudcontrollerconfig.CompletedConfig, rootClientBuilde
}
}
routeController := routecontroller.New(routes, client("route-controller"), sharedInformers.Core().V1().Nodes(), c.ComponentConfig.KubeCloudShared.ClusterName, clusterCIDR)
routeController := routecontroller.New(routes, client("route-controller"), c.SharedInformers.Core().V1().Nodes(), c.ComponentConfig.KubeCloudShared.ClusterName, clusterCIDR)
go routeController.Run(stop, c.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration)
time.Sleep(wait.Jitter(c.ComponentConfig.GenericComponent.ControllerStartInterval.Duration, ControllerStartJitter))
}
@ -267,12 +235,12 @@ func startControllers(c *cloudcontrollerconfig.CompletedConfig, rootClientBuilde
// If apiserver is not running we should wait for some time and fail only then. This is particularly
// important when we start apiserver and controller manager at the same time.
err = genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second)
err = genericcontrollermanager.WaitForAPIServer(c.VersionedClient, 10*time.Second)
if err != nil {
glog.Fatalf("Failed to wait for apiserver being healthy: %v", err)
}
sharedInformers.Start(stop)
c.SharedInformers.Start(stop)
select {}
}

View File

@ -18,7 +18,9 @@ package options
import (
"fmt"
"math/rand"
"net"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -26,6 +28,7 @@ import (
utilerrors "k8s.io/apimachinery/pkg/util/errors"
apiserveroptions "k8s.io/apiserver/pkg/server/options"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
@ -37,6 +40,7 @@ import (
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/apis/componentconfig"
componentconfigv1alpha1 "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/master/ports"
// add the kubernetes feature gates
_ "k8s.io/kubernetes/pkg/features"
@ -146,40 +150,35 @@ func (o *CloudControllerManagerOptions) AddFlags(fs *pflag.FlagSet) {
// ApplyTo fills up cloud controller manager config with options.
func (o *CloudControllerManagerOptions) ApplyTo(c *cloudcontrollerconfig.Config, userAgent string) error {
if err := o.CloudProvider.ApplyTo(&c.ComponentConfig.CloudProvider); err != nil {
return err
}
if err := o.Debugging.ApplyTo(&c.ComponentConfig.Debugging); err != nil {
return err
}
if err := o.GenericComponent.ApplyTo(&c.ComponentConfig.GenericComponent); err != nil {
return err
}
if err := o.KubeCloudShared.ApplyTo(&c.ComponentConfig.KubeCloudShared); err != nil {
return err
}
if err := o.ServiceController.ApplyTo(&c.ComponentConfig.ServiceController); err != nil {
return err
}
if err := o.SecureServing.ApplyTo(&c.SecureServing); err != nil {
return err
}
if err := o.InsecureServing.ApplyTo(&c.InsecureServing); err != nil {
return err
}
if err := o.Authentication.ApplyTo(&c.Authentication, c.SecureServing, nil); err != nil {
return err
}
if err := o.Authorization.ApplyTo(&c.Authorization); err != nil {
return err
}
// sync back to component config
// TODO: find more elegant way than syncing back the values.
c.ComponentConfig.KubeCloudShared.Port = int32(o.InsecureServing.BindPort)
c.ComponentConfig.KubeCloudShared.Address = o.InsecureServing.BindAddress.String()
var err error
if err = o.CloudProvider.ApplyTo(&c.ComponentConfig.CloudProvider); err != nil {
return err
}
if err = o.Debugging.ApplyTo(&c.ComponentConfig.Debugging); err != nil {
return err
}
if err = o.GenericComponent.ApplyTo(&c.ComponentConfig.GenericComponent); err != nil {
return err
}
if err = o.KubeCloudShared.ApplyTo(&c.ComponentConfig.KubeCloudShared); err != nil {
return err
}
if err = o.ServiceController.ApplyTo(&c.ComponentConfig.ServiceController); err != nil {
return err
}
if err = o.SecureServing.ApplyTo(&c.SecureServing); err != nil {
return err
}
if err = o.InsecureServing.ApplyTo(&c.InsecureServing); err != nil {
return err
}
if err = o.Authentication.ApplyTo(&c.Authentication, c.SecureServing, nil); err != nil {
return err
}
if err = o.Authorization.ApplyTo(&c.Authorization); err != nil {
return err
}
c.Kubeconfig, err = clientcmd.BuildConfigFromFlags(o.Master, o.Kubeconfig)
if err != nil {
return err
@ -196,6 +195,28 @@ func (o *CloudControllerManagerOptions) ApplyTo(c *cloudcontrollerconfig.Config,
c.LeaderElectionClient = clientset.NewForConfigOrDie(restclient.AddUserAgent(c.Kubeconfig, "leader-election"))
c.EventRecorder = createRecorder(c.Client, userAgent)
rootClientBuilder := controller.SimpleControllerClientBuilder{
ClientConfig: c.Kubeconfig,
}
if c.ComponentConfig.KubeCloudShared.UseServiceAccountCredentials {
c.ClientBuilder = controller.SAControllerClientBuilder{
ClientConfig: restclient.AnonymousClientConfig(c.Kubeconfig),
CoreClient: c.Client.CoreV1(),
AuthenticationClient: c.Client.AuthenticationV1(),
Namespace: "kube-system",
}
} else {
c.ClientBuilder = rootClientBuilder
}
c.VersionedClient = rootClientBuilder.ClientOrDie("shared-informers")
c.SharedInformers = informers.NewSharedInformerFactory(c.VersionedClient, resyncPeriod(c)())
// sync back to component config
// TODO: find more elegant way than syncing back the values.
c.ComponentConfig.KubeCloudShared.Port = int32(o.InsecureServing.BindPort)
c.ComponentConfig.KubeCloudShared.Address = o.InsecureServing.BindAddress.String()
c.ComponentConfig.NodeStatusUpdateFrequency = o.NodeStatusUpdateFrequency
return nil
@ -222,6 +243,14 @@ func (o *CloudControllerManagerOptions) Validate() error {
return utilerrors.NewAggregate(errors)
}
// resyncPeriod computes the time interval a shared informer waits before resyncing with the api server
func resyncPeriod(c *cloudcontrollerconfig.Config) func() time.Duration {
return func() time.Duration {
factor := rand.Float64() + 1
return time.Duration(float64(c.ComponentConfig.GenericComponent.MinResyncPeriod.Nanoseconds()) * factor)
}
}
// Config return a cloud controller manager config objective
func (o *CloudControllerManagerOptions) Config() (*cloudcontrollerconfig.Config, error) {
if err := o.Validate(); err != nil {