diff --git a/cmd/cloud-controller-manager/app/controllermanager.go b/cmd/cloud-controller-manager/app/controllermanager.go index 4981e2d3f9d..06977e29d87 100644 --- a/cmd/cloud-controller-manager/app/controllermanager.go +++ b/cmd/cloud-controller-manager/app/controllermanager.go @@ -19,10 +19,8 @@ package app import ( "context" "fmt" - "net" "net/http" "os" - "strings" "time" "github.com/spf13/cobra" @@ -34,7 +32,6 @@ import ( "k8s.io/apiserver/pkg/server/healthz" apiserverflag "k8s.io/apiserver/pkg/util/flag" "k8s.io/apiserver/pkg/util/globalflag" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" cloudprovider "k8s.io/cloud-provider" @@ -43,9 +40,6 @@ import ( "k8s.io/kubernetes/cmd/cloud-controller-manager/app/options" genericcontrollermanager "k8s.io/kubernetes/cmd/controller-manager/app" cmoptions "k8s.io/kubernetes/cmd/controller-manager/app/options" - cloudcontrollers "k8s.io/kubernetes/pkg/controller/cloud" - routecontroller "k8s.io/kubernetes/pkg/controller/route" - servicecontroller "k8s.io/kubernetes/pkg/controller/service" "k8s.io/kubernetes/pkg/util/configz" utilflag "k8s.io/kubernetes/pkg/util/flag" "k8s.io/kubernetes/pkg/version" @@ -165,7 +159,7 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}) error } run := func(ctx context.Context) { - if err := startControllers(c, ctx.Done(), cloud); err != nil { + if err := startControllers(c, ctx.Done(), cloud, newControllerInitializers()); err != nil { klog.Fatalf("error running controllers: %v", err) } } @@ -215,87 +209,40 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}) error } // startControllers starts the cloud specific controller loops. -func startControllers(c *cloudcontrollerconfig.CompletedConfig, stop <-chan struct{}, cloud cloudprovider.Interface) error { - // Function to build the kube client object - client := func(serviceAccountName string) kubernetes.Interface { - return c.ClientBuilder.ClientOrDie(serviceAccountName) - } +func startControllers(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}, cloud cloudprovider.Interface, controllers map[string]initFunc) error { if cloud != nil { // Initialize the cloud provider with a reference to the clientBuilder - cloud.Initialize(c.ClientBuilder, stop) - } - // Start the CloudNodeController - nodeController := cloudcontrollers.NewCloudNodeController( - c.SharedInformers.Core().V1().Nodes(), - // cloud node controller uses existing cluster role from node-controller - client("node-controller"), cloud, - c.ComponentConfig.NodeStatusUpdateFrequency.Duration) - - go nodeController.Run(stop) - time.Sleep(wait.Jitter(c.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter)) - - cloudNodeLifecycleController, err := cloudcontrollers.NewCloudNodeLifecycleController( - c.SharedInformers.Core().V1().Nodes(), - // cloud node lifecycle controller uses existing cluster role from node-controller - client("node-controller"), cloud, - c.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration, - ) - if err != nil { - klog.Errorf("failed to start cloud node lifecycle controller: %s", err) - } else { - go cloudNodeLifecycleController.Run(stop) - time.Sleep(wait.Jitter(c.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter)) + cloud.Initialize(c.ClientBuilder, stopCh) } - // Start the PersistentVolumeLabelController - pvlController := cloudcontrollers.NewPersistentVolumeLabelController(client("pvl-controller"), cloud) - go pvlController.Run(5, stop) - time.Sleep(wait.Jitter(c.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter)) - - // Start the service controller - serviceController, err := servicecontroller.New( - cloud, - client("service-controller"), - c.SharedInformers.Core().V1().Services(), - c.SharedInformers.Core().V1().Nodes(), - c.ComponentConfig.KubeCloudShared.ClusterName, - ) - if err != nil { - klog.Errorf("Failed to start service controller: %v", err) - } else { - go serviceController.Run(stop, int(c.ComponentConfig.ServiceController.ConcurrentServiceSyncs)) - time.Sleep(wait.Jitter(c.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter)) - } - - // If CIDRs should be allocated for pods and set on the CloudProvider, then start the route controller - if c.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs && c.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes { - if routes, ok := cloud.Routes(); !ok { - klog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.") - } else { - var clusterCIDR *net.IPNet - if len(strings.TrimSpace(c.ComponentConfig.KubeCloudShared.ClusterCIDR)) != 0 { - _, clusterCIDR, err = net.ParseCIDR(c.ComponentConfig.KubeCloudShared.ClusterCIDR) - if err != nil { - klog.Warningf("Unsuccessful parsing of cluster CIDR %v: %v", c.ComponentConfig.KubeCloudShared.ClusterCIDR, err) - } - } - - routeController := routecontroller.New(routes, client("route-controller"), c.SharedInformers.Core().V1().Nodes(), c.ComponentConfig.KubeCloudShared.ClusterName, clusterCIDR) - go routeController.Run(stop, c.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration) - time.Sleep(wait.Jitter(c.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter)) + for controllerName, initFn := range controllers { + if !genericcontrollermanager.IsControllerEnabled(controllerName, ControllersDisabledByDefault, c.ComponentConfig.Generic.Controllers) { + klog.Warningf("%q is disabled", controllerName) + continue } - } else { - klog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", c.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs, c.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes) + + klog.V(1).Infof("Starting %q", controllerName) + _, started, err := initFn(c, cloud, stopCh) + if err != nil { + klog.Errorf("Error starting %q", controllerName) + return err + } + if !started { + klog.Warningf("Skipping %q", controllerName) + continue + } + klog.Infof("Started %q", controllerName) + + time.Sleep(wait.Jitter(c.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter)) } // If apiserver is not running we should wait for some time and fail only then. This is particularly // important when we start apiserver and controller manager at the same time. - err = genericcontrollermanager.WaitForAPIServer(c.VersionedClient, 10*time.Second) - if err != nil { + if err := genericcontrollermanager.WaitForAPIServer(c.VersionedClient, 10*time.Second); err != nil { klog.Fatalf("Failed to wait for apiserver being healthy: %v", err) } - c.SharedInformers.Start(stop) + c.SharedInformers.Start(stopCh) select {} } diff --git a/cmd/cloud-controller-manager/app/core.go b/cmd/cloud-controller-manager/app/core.go new file mode 100644 index 00000000000..628237bcb40 --- /dev/null +++ b/cmd/cloud-controller-manager/app/core.go @@ -0,0 +1,131 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package app implements a server that runs a set of active +// components. This includes node controllers, service and +// route controller, and so on. +// +package app + +import ( + "net" + "net/http" + "strings" + + cloudprovider "k8s.io/cloud-provider" + "k8s.io/klog" + cloudcontrollerconfig "k8s.io/kubernetes/cmd/cloud-controller-manager/app/config" + cloudcontrollers "k8s.io/kubernetes/pkg/controller/cloud" + routecontroller "k8s.io/kubernetes/pkg/controller/route" + servicecontroller "k8s.io/kubernetes/pkg/controller/service" +) + +func startCloudNodeController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) { + // Start the CloudNodeController + nodeController := cloudcontrollers.NewCloudNodeController( + ctx.SharedInformers.Core().V1().Nodes(), + // cloud node controller uses existing cluster role from node-controller + ctx.ClientBuilder.ClientOrDie("node-controller"), + cloud, + ctx.ComponentConfig.NodeStatusUpdateFrequency.Duration) + + go nodeController.Run(stopCh) + + return nil, true, nil +} + +func startCloudNodeLifecycleController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) { + // Start the cloudNodeLifecycleController + cloudNodeLifecycleController, err := cloudcontrollers.NewCloudNodeLifecycleController( + ctx.SharedInformers.Core().V1().Nodes(), + // cloud node lifecycle controller uses existing cluster role from node-controller + ctx.ClientBuilder.ClientOrDie("node-controller"), + cloud, + ctx.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration, + ) + if err != nil { + klog.Warningf("failed to start cloud node lifecycle controller: %s", err) + return nil, false, nil + } + + go cloudNodeLifecycleController.Run(stopCh) + + return nil, true, nil +} + +func startPersistentVolumeLabelController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) { + // Start the PersistentVolumeLabelController + pvlController := cloudcontrollers.NewPersistentVolumeLabelController( + ctx.ClientBuilder.ClientOrDie("pvl-controller"), + cloud, + ) + go pvlController.Run(5, stopCh) + + return nil, true, nil +} + +func startServiceController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) { + // Start the service controller + serviceController, err := servicecontroller.New( + cloud, + ctx.ClientBuilder.ClientOrDie("service-controller"), + ctx.SharedInformers.Core().V1().Services(), + ctx.SharedInformers.Core().V1().Nodes(), + ctx.ComponentConfig.KubeCloudShared.ClusterName, + ) + if err != nil { + // This error shouldn't fail. It lives like this as a legacy. + klog.Errorf("Failed to start service controller: %v", err) + return nil, false, nil + } + + go serviceController.Run(stopCh, int(ctx.ComponentConfig.ServiceController.ConcurrentServiceSyncs)) + + return nil, true, nil +} + +func startRouteController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) { + if !ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs || !ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes { + klog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs, ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes) + return nil, false, nil + } + + // If CIDRs should be allocated for pods and set on the CloudProvider, then start the route controller + routes, ok := cloud.Routes() + if !ok { + klog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.") + return nil, false, nil + } + var clusterCIDR *net.IPNet + var err error + if len(strings.TrimSpace(ctx.ComponentConfig.KubeCloudShared.ClusterCIDR)) != 0 { + _, clusterCIDR, err = net.ParseCIDR(ctx.ComponentConfig.KubeCloudShared.ClusterCIDR) + if err != nil { + klog.Warningf("Unsuccessful parsing of cluster CIDR %v: %v", ctx.ComponentConfig.KubeCloudShared.ClusterCIDR, err) + } + } + + routeController := routecontroller.New( + routes, + ctx.ClientBuilder.ClientOrDie("route-controller"), + ctx.SharedInformers.Core().V1().Nodes(), + ctx.ComponentConfig.KubeCloudShared.ClusterName, + clusterCIDR, + ) + go routeController.Run(stopCh, ctx.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration) + + return nil, true, nil +} diff --git a/cmd/controller-manager/app/helper.go b/cmd/controller-manager/app/helper.go index 8bf1edee52a..7a156567d6f 100644 --- a/cmd/controller-manager/app/helper.go +++ b/cmd/controller-manager/app/helper.go @@ -21,6 +21,7 @@ import ( "net/http" "time" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" "k8s.io/klog" @@ -53,3 +54,26 @@ func WaitForAPIServer(client clientset.Interface, timeout time.Duration) error { return nil } + +// IsControllerEnabled check if a specified controller enabled or not. +func IsControllerEnabled(name string, disabledByDefaultControllers sets.String, controllers []string) bool { + hasStar := false + for _, ctrl := range controllers { + if ctrl == name { + return true + } + if ctrl == "-"+name { + return false + } + if ctrl == "*" { + hasStar = true + } + } + // if we get here, there was no explicit choice + if !hasStar { + // nothing on by default + return false + } + + return !disabledByDefaultControllers.Has(name) +} diff --git a/cmd/kube-controller-manager/app/controller_manager_test.go b/cmd/controller-manager/app/helper_test.go similarity index 92% rename from cmd/kube-controller-manager/app/controller_manager_test.go rename to cmd/controller-manager/app/helper_test.go index 2c3e3f92596..9bc3eeaba29 100644 --- a/cmd/kube-controller-manager/app/controller_manager_test.go +++ b/cmd/controller-manager/app/helper_test.go @@ -14,10 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package app implements a server that runs a set of active -// components. This includes replication controllers, service endpoints and -// nodes. -// package app import ( @@ -74,7 +70,7 @@ func TestIsControllerEnabled(t *testing.T) { } for _, tc := range tcs { - actual := IsControllerEnabled(tc.controllerName, sets.NewString(tc.disabledByDefaultControllers...), tc.controllers...) + actual := IsControllerEnabled(tc.controllerName, sets.NewString(tc.disabledByDefaultControllers...), tc.controllers) assert.Equal(t, tc.expected, actual, "%v: expected %v, got %v", tc.name, tc.expected, actual) } diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index b840523db41..9354d02a370 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -301,32 +301,7 @@ type ControllerContext struct { } func (c ControllerContext) IsControllerEnabled(name string) bool { - return IsControllerEnabled(name, ControllersDisabledByDefault, c.ComponentConfig.Generic.Controllers...) -} - -func IsControllerEnabled(name string, disabledByDefaultControllers sets.String, controllers ...string) bool { - hasStar := false - for _, ctrl := range controllers { - if ctrl == name { - return true - } - if ctrl == "-"+name { - return false - } - if ctrl == "*" { - hasStar = true - } - } - // if we get here, there was no explicit choice - if !hasStar { - // nothing on by default - return false - } - if disabledByDefaultControllers.Has(name) { - return false - } - - return true + return genericcontrollermanager.IsControllerEnabled(name, ControllersDisabledByDefault, c.ComponentConfig.Generic.Controllers) } // InitFunc is used to launch a particular controller. It may run additional "should I activate checks".