mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-20 18:31:15 +00:00
Merge pull request #111462 from jprzychodzen/controllers
Enable 'running_managed_controllers' for KCM/CCM controllers: routes, services and cloud-node
This commit is contained in:
commit
1de16be28f
@ -90,7 +90,7 @@ func startServiceController(ctx context.Context, controllerContext ControllerCon
|
|||||||
klog.Errorf("Failed to start service controller: %v", err)
|
klog.Errorf("Failed to start service controller: %v", err)
|
||||||
return nil, false, nil
|
return nil, false, nil
|
||||||
}
|
}
|
||||||
go serviceController.Run(ctx, int(controllerContext.ComponentConfig.ServiceController.ConcurrentServiceSyncs))
|
go serviceController.Run(ctx, int(controllerContext.ComponentConfig.ServiceController.ConcurrentServiceSyncs), controllerContext.ControllerManagerMetrics)
|
||||||
return nil, true, nil
|
return nil, true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -251,7 +251,7 @@ func startRouteController(ctx context.Context, controllerContext ControllerConte
|
|||||||
controllerContext.InformerFactory.Core().V1().Nodes(),
|
controllerContext.InformerFactory.Core().V1().Nodes(),
|
||||||
controllerContext.ComponentConfig.KubeCloudShared.ClusterName,
|
controllerContext.ComponentConfig.KubeCloudShared.ClusterName,
|
||||||
clusterCIDRs)
|
clusterCIDRs)
|
||||||
go routeController.Run(ctx, controllerContext.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration)
|
go routeController.Run(ctx, controllerContext.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration, controllerContext.ControllerManagerMetrics)
|
||||||
return nil, true, nil
|
return nil, true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -364,7 +364,7 @@ type ControllerInitContext struct {
|
|||||||
// StartCloudNodeControllerWrapper is used to take cloud config as input and start cloud node controller
|
// StartCloudNodeControllerWrapper is used to take cloud config as input and start cloud node controller
|
||||||
func StartCloudNodeControllerWrapper(initContext ControllerInitContext, completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) InitFunc {
|
func StartCloudNodeControllerWrapper(initContext ControllerInitContext, completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) InitFunc {
|
||||||
return func(ctx context.Context, controllerContext genericcontrollermanager.ControllerContext) (controller.Interface, bool, error) {
|
return func(ctx context.Context, controllerContext genericcontrollermanager.ControllerContext) (controller.Interface, bool, error) {
|
||||||
return startCloudNodeController(ctx, initContext, completedConfig, cloud)
|
return startCloudNodeController(ctx, initContext, controllerContext, completedConfig, cloud)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -378,14 +378,14 @@ func StartCloudNodeLifecycleControllerWrapper(initContext ControllerInitContext,
|
|||||||
// StartServiceControllerWrapper is used to take cloud config as input and start service controller
|
// StartServiceControllerWrapper is used to take cloud config as input and start service controller
|
||||||
func StartServiceControllerWrapper(initContext ControllerInitContext, completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) InitFunc {
|
func StartServiceControllerWrapper(initContext ControllerInitContext, completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) InitFunc {
|
||||||
return func(ctx context.Context, controllerContext genericcontrollermanager.ControllerContext) (controller.Interface, bool, error) {
|
return func(ctx context.Context, controllerContext genericcontrollermanager.ControllerContext) (controller.Interface, bool, error) {
|
||||||
return startServiceController(ctx, initContext, completedConfig, cloud)
|
return startServiceController(ctx, initContext, controllerContext, completedConfig, cloud)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// StartRouteControllerWrapper is used to take cloud config as input and start route controller
|
// StartRouteControllerWrapper is used to take cloud config as input and start route controller
|
||||||
func StartRouteControllerWrapper(initContext ControllerInitContext, completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) InitFunc {
|
func StartRouteControllerWrapper(initContext ControllerInitContext, completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) InitFunc {
|
||||||
return func(ctx context.Context, controllerContext genericcontrollermanager.ControllerContext) (controller.Interface, bool, error) {
|
return func(ctx context.Context, controllerContext genericcontrollermanager.ControllerContext) (controller.Interface, bool, error) {
|
||||||
return startRouteController(ctx, initContext, completedConfig, cloud)
|
return startRouteController(ctx, initContext, controllerContext, completedConfig, cloud)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -40,7 +40,7 @@ import (
|
|||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
)
|
)
|
||||||
|
|
||||||
func startCloudNodeController(ctx context.Context, initContext ControllerInitContext, completedConfig *config.CompletedConfig, cloud cloudprovider.Interface) (controller.Interface, bool, error) {
|
func startCloudNodeController(ctx context.Context, initContext ControllerInitContext, controlexContext controllermanagerapp.ControllerContext, completedConfig *config.CompletedConfig, cloud cloudprovider.Interface) (controller.Interface, bool, error) {
|
||||||
// Start the CloudNodeController
|
// Start the CloudNodeController
|
||||||
nodeController, err := cloudnodecontroller.NewCloudNodeController(
|
nodeController, err := cloudnodecontroller.NewCloudNodeController(
|
||||||
completedConfig.SharedInformers.Core().V1().Nodes(),
|
completedConfig.SharedInformers.Core().V1().Nodes(),
|
||||||
@ -54,7 +54,7 @@ func startCloudNodeController(ctx context.Context, initContext ControllerInitCon
|
|||||||
return nil, false, nil
|
return nil, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
go nodeController.Run(ctx.Done())
|
go nodeController.Run(ctx.Done(), controlexContext.ControllerManagerMetrics)
|
||||||
|
|
||||||
return nil, true, nil
|
return nil, true, nil
|
||||||
}
|
}
|
||||||
@ -78,7 +78,7 @@ func startCloudNodeLifecycleController(ctx context.Context, initContext Controll
|
|||||||
return nil, true, nil
|
return nil, true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func startServiceController(ctx context.Context, initContext ControllerInitContext, completedConfig *config.CompletedConfig, cloud cloudprovider.Interface) (controller.Interface, bool, error) {
|
func startServiceController(ctx context.Context, initContext ControllerInitContext, controlexContext controllermanagerapp.ControllerContext, completedConfig *config.CompletedConfig, cloud cloudprovider.Interface) (controller.Interface, bool, error) {
|
||||||
// Start the service controller
|
// Start the service controller
|
||||||
serviceController, err := servicecontroller.New(
|
serviceController, err := servicecontroller.New(
|
||||||
cloud,
|
cloud,
|
||||||
@ -94,12 +94,12 @@ func startServiceController(ctx context.Context, initContext ControllerInitConte
|
|||||||
return nil, false, nil
|
return nil, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
go serviceController.Run(ctx, int(completedConfig.ComponentConfig.ServiceController.ConcurrentServiceSyncs))
|
go serviceController.Run(ctx, int(completedConfig.ComponentConfig.ServiceController.ConcurrentServiceSyncs), controlexContext.ControllerManagerMetrics)
|
||||||
|
|
||||||
return nil, true, nil
|
return nil, true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func startRouteController(ctx context.Context, initContext ControllerInitContext, completedConfig *config.CompletedConfig, cloud cloudprovider.Interface) (controller.Interface, bool, error) {
|
func startRouteController(ctx context.Context, initContext ControllerInitContext, controlexContext controllermanagerapp.ControllerContext, completedConfig *config.CompletedConfig, cloud cloudprovider.Interface) (controller.Interface, bool, error) {
|
||||||
if !completedConfig.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes {
|
if !completedConfig.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes {
|
||||||
klog.Infof("Will not configure cloud provider routes, --configure-cloud-routes: %v", completedConfig.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes)
|
klog.Infof("Will not configure cloud provider routes, --configure-cloud-routes: %v", completedConfig.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes)
|
||||||
return nil, false, nil
|
return nil, false, nil
|
||||||
@ -140,7 +140,7 @@ func startRouteController(ctx context.Context, initContext ControllerInitContext
|
|||||||
completedConfig.ComponentConfig.KubeCloudShared.ClusterName,
|
completedConfig.ComponentConfig.KubeCloudShared.ClusterName,
|
||||||
clusterCIDRs,
|
clusterCIDRs,
|
||||||
)
|
)
|
||||||
go routeController.Run(ctx, completedConfig.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration)
|
go routeController.Run(ctx, completedConfig.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration, controlexContext.ControllerManagerMetrics)
|
||||||
|
|
||||||
return nil, true, nil
|
return nil, true, nil
|
||||||
}
|
}
|
||||||
|
@ -41,6 +41,7 @@ import (
|
|||||||
cloudprovider "k8s.io/cloud-provider"
|
cloudprovider "k8s.io/cloud-provider"
|
||||||
cloudproviderapi "k8s.io/cloud-provider/api"
|
cloudproviderapi "k8s.io/cloud-provider/api"
|
||||||
cloudnodeutil "k8s.io/cloud-provider/node/helpers"
|
cloudnodeutil "k8s.io/cloud-provider/node/helpers"
|
||||||
|
controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers"
|
||||||
nodeutil "k8s.io/component-helpers/node/util"
|
nodeutil "k8s.io/component-helpers/node/util"
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
netutils "k8s.io/utils/net"
|
netutils "k8s.io/utils/net"
|
||||||
@ -148,12 +149,15 @@ func NewCloudNodeController(
|
|||||||
// This controller updates newly registered nodes with information
|
// This controller updates newly registered nodes with information
|
||||||
// from the cloud provider. This call is blocking so should be called
|
// from the cloud provider. This call is blocking so should be called
|
||||||
// via a goroutine
|
// via a goroutine
|
||||||
func (cnc *CloudNodeController) Run(stopCh <-chan struct{}) {
|
func (cnc *CloudNodeController) Run(stopCh <-chan struct{}, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
|
||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrash()
|
||||||
defer cnc.workqueue.ShutDown()
|
defer cnc.workqueue.ShutDown()
|
||||||
|
|
||||||
// Start event processing pipeline.
|
// Start event processing pipeline.
|
||||||
klog.Infof("Sending events to api server.")
|
klog.Infof("Sending events to api server.")
|
||||||
|
controllerManagerMetrics.ControllerStarted("cloud-node")
|
||||||
|
defer controllerManagerMetrics.ControllerStopped("cloud-node")
|
||||||
|
|
||||||
cnc.broadcaster.StartStructuredLogging(0)
|
cnc.broadcaster.StartStructuredLogging(0)
|
||||||
cnc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: cnc.kubeClient.CoreV1().Events("")})
|
cnc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: cnc.kubeClient.CoreV1().Events("")})
|
||||||
defer cnc.broadcaster.Shutdown()
|
defer cnc.broadcaster.Shutdown()
|
||||||
|
@ -41,6 +41,7 @@ import (
|
|||||||
"k8s.io/client-go/tools/record"
|
"k8s.io/client-go/tools/record"
|
||||||
clientretry "k8s.io/client-go/util/retry"
|
clientretry "k8s.io/client-go/util/retry"
|
||||||
cloudprovider "k8s.io/cloud-provider"
|
cloudprovider "k8s.io/cloud-provider"
|
||||||
|
controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers"
|
||||||
"k8s.io/component-base/metrics/prometheus/ratelimiter"
|
"k8s.io/component-base/metrics/prometheus/ratelimiter"
|
||||||
nodeutil "k8s.io/component-helpers/node/util"
|
nodeutil "k8s.io/component-helpers/node/util"
|
||||||
)
|
)
|
||||||
@ -94,7 +95,7 @@ func New(routes cloudprovider.Routes, kubeClient clientset.Interface, nodeInform
|
|||||||
return rc
|
return rc
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rc *RouteController) Run(ctx context.Context, syncPeriod time.Duration) {
|
func (rc *RouteController) Run(ctx context.Context, syncPeriod time.Duration, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
|
||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrash()
|
||||||
|
|
||||||
// Start event processing pipeline.
|
// Start event processing pipeline.
|
||||||
@ -106,6 +107,8 @@ func (rc *RouteController) Run(ctx context.Context, syncPeriod time.Duration) {
|
|||||||
|
|
||||||
klog.Info("Starting route controller")
|
klog.Info("Starting route controller")
|
||||||
defer klog.Info("Shutting down route controller")
|
defer klog.Info("Shutting down route controller")
|
||||||
|
controllerManagerMetrics.ControllerStarted("route")
|
||||||
|
defer controllerManagerMetrics.ControllerStopped("route")
|
||||||
|
|
||||||
if !cache.WaitForNamedCacheSync("route", ctx.Done(), rc.nodeListerSynced) {
|
if !cache.WaitForNamedCacheSync("route", ctx.Done(), rc.nodeListerSynced) {
|
||||||
return
|
return
|
||||||
|
@ -40,6 +40,7 @@ import (
|
|||||||
cloudprovider "k8s.io/cloud-provider"
|
cloudprovider "k8s.io/cloud-provider"
|
||||||
servicehelper "k8s.io/cloud-provider/service/helpers"
|
servicehelper "k8s.io/cloud-provider/service/helpers"
|
||||||
"k8s.io/component-base/featuregate"
|
"k8s.io/component-base/featuregate"
|
||||||
|
controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers"
|
||||||
"k8s.io/component-base/metrics/prometheus/ratelimiter"
|
"k8s.io/component-base/metrics/prometheus/ratelimiter"
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
)
|
)
|
||||||
@ -224,7 +225,7 @@ func (c *Controller) enqueueService(obj interface{}) {
|
|||||||
//
|
//
|
||||||
// It's an error to call Run() more than once for a given ServiceController
|
// It's an error to call Run() more than once for a given ServiceController
|
||||||
// object.
|
// object.
|
||||||
func (c *Controller) Run(ctx context.Context, workers int) {
|
func (c *Controller) Run(ctx context.Context, workers int, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
|
||||||
defer runtime.HandleCrash()
|
defer runtime.HandleCrash()
|
||||||
defer c.queue.ShutDown()
|
defer c.queue.ShutDown()
|
||||||
|
|
||||||
@ -235,6 +236,8 @@ func (c *Controller) Run(ctx context.Context, workers int) {
|
|||||||
|
|
||||||
klog.Info("Starting service controller")
|
klog.Info("Starting service controller")
|
||||||
defer klog.Info("Shutting down service controller")
|
defer klog.Info("Shutting down service controller")
|
||||||
|
controllerManagerMetrics.ControllerStarted("service")
|
||||||
|
defer controllerManagerMetrics.ControllerStopped("service")
|
||||||
|
|
||||||
if !cache.WaitForNamedCacheSync("service", ctx.Done(), c.serviceListerSynced, c.nodeListerSynced) {
|
if !cache.WaitForNamedCacheSync("service", ctx.Done(), c.serviceListerSynced, c.nodeListerSynced) {
|
||||||
return
|
return
|
||||||
|
@ -28,6 +28,7 @@ import (
|
|||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
servicecontroller "k8s.io/cloud-provider/controllers/service"
|
servicecontroller "k8s.io/cloud-provider/controllers/service"
|
||||||
fakecloud "k8s.io/cloud-provider/fake"
|
fakecloud "k8s.io/cloud-provider/fake"
|
||||||
|
controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers"
|
||||||
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
||||||
"k8s.io/kubernetes/test/integration/framework"
|
"k8s.io/kubernetes/test/integration/framework"
|
||||||
utilpointer "k8s.io/utils/pointer"
|
utilpointer "k8s.io/utils/pointer"
|
||||||
@ -252,7 +253,7 @@ func Test_ServiceLoadBalancerEnableLoadBalancerClass(t *testing.T) {
|
|||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
informer.Start(ctx.Done())
|
informer.Start(ctx.Done())
|
||||||
go controller.Run(ctx, 1)
|
go controller.Run(ctx, 1, controllersmetrics.NewControllerManagerMetrics("loadbalancer-test"))
|
||||||
|
|
||||||
service := &corev1.Service{
|
service := &corev1.Service{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
@ -298,7 +299,7 @@ func Test_SetLoadBalancerClassThenUpdateLoadBalancerClass(t *testing.T) {
|
|||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
informer.Start(ctx.Done())
|
informer.Start(ctx.Done())
|
||||||
go controller.Run(ctx, 1)
|
go controller.Run(ctx, 1, controllersmetrics.NewControllerManagerMetrics("loadbalancer-test"))
|
||||||
|
|
||||||
service := &corev1.Service{
|
service := &corev1.Service{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
@ -349,7 +350,7 @@ func Test_UpdateLoadBalancerWithLoadBalancerClass(t *testing.T) {
|
|||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
informer.Start(ctx.Done())
|
informer.Start(ctx.Done())
|
||||||
go controller.Run(ctx, 1)
|
go controller.Run(ctx, 1, controllersmetrics.NewControllerManagerMetrics("loadbalancer-test"))
|
||||||
|
|
||||||
service := &corev1.Service{
|
service := &corev1.Service{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Loading…
Reference in New Issue
Block a user