mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-01-05 23:47:50 +00:00
Update controller initializer funcs to take Context
This commit is contained in:
@@ -21,6 +21,7 @@ limitations under the License.
|
||||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
@@ -77,13 +78,13 @@ const (
|
||||
defaultNodeMaskCIDRIPv6 = 64
|
||||
)
|
||||
|
||||
func startServiceController(ctx ControllerContext) (controller.Interface, bool, error) {
|
||||
func startServiceController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
serviceController, err := servicecontroller.New(
|
||||
ctx.Cloud,
|
||||
ctx.ClientBuilder.ClientOrDie("service-controller"),
|
||||
ctx.InformerFactory.Core().V1().Services(),
|
||||
ctx.InformerFactory.Core().V1().Nodes(),
|
||||
ctx.ComponentConfig.KubeCloudShared.ClusterName,
|
||||
controllerContext.Cloud,
|
||||
controllerContext.ClientBuilder.ClientOrDie("service-controller"),
|
||||
controllerContext.InformerFactory.Core().V1().Services(),
|
||||
controllerContext.InformerFactory.Core().V1().Nodes(),
|
||||
controllerContext.ComponentConfig.KubeCloudShared.ClusterName,
|
||||
utilfeature.DefaultFeatureGate,
|
||||
)
|
||||
if err != nil {
|
||||
@@ -91,21 +92,21 @@ func startServiceController(ctx ControllerContext) (controller.Interface, bool,
|
||||
klog.Errorf("Failed to start service controller: %v", err)
|
||||
return nil, false, nil
|
||||
}
|
||||
go serviceController.Run(ctx.Stop, int(ctx.ComponentConfig.ServiceController.ConcurrentServiceSyncs))
|
||||
go serviceController.Run(controllerContext.Stop, int(controllerContext.ComponentConfig.ServiceController.ConcurrentServiceSyncs))
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startNodeIpamController(ctx ControllerContext) (controller.Interface, bool, error) {
|
||||
func startNodeIpamController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
var serviceCIDR *net.IPNet
|
||||
var secondaryServiceCIDR *net.IPNet
|
||||
|
||||
// should we start nodeIPAM
|
||||
if !ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs {
|
||||
if !controllerContext.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs {
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
// failure: bad cidrs in config
|
||||
clusterCIDRs, dualStack, err := processCIDRs(ctx.ComponentConfig.KubeCloudShared.ClusterCIDR)
|
||||
clusterCIDRs, dualStack, err := processCIDRs(controllerContext.ComponentConfig.KubeCloudShared.ClusterCIDR)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
@@ -121,17 +122,17 @@ func startNodeIpamController(ctx ControllerContext) (controller.Interface, bool,
|
||||
}
|
||||
|
||||
// service cidr processing
|
||||
if len(strings.TrimSpace(ctx.ComponentConfig.NodeIPAMController.ServiceCIDR)) != 0 {
|
||||
_, serviceCIDR, err = netutils.ParseCIDRSloppy(ctx.ComponentConfig.NodeIPAMController.ServiceCIDR)
|
||||
if len(strings.TrimSpace(controllerContext.ComponentConfig.NodeIPAMController.ServiceCIDR)) != 0 {
|
||||
_, serviceCIDR, err = netutils.ParseCIDRSloppy(controllerContext.ComponentConfig.NodeIPAMController.ServiceCIDR)
|
||||
if err != nil {
|
||||
klog.Warningf("Unsuccessful parsing of service CIDR %v: %v", ctx.ComponentConfig.NodeIPAMController.ServiceCIDR, err)
|
||||
klog.Warningf("Unsuccessful parsing of service CIDR %v: %v", controllerContext.ComponentConfig.NodeIPAMController.ServiceCIDR, err)
|
||||
}
|
||||
}
|
||||
|
||||
if len(strings.TrimSpace(ctx.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR)) != 0 {
|
||||
_, secondaryServiceCIDR, err = netutils.ParseCIDRSloppy(ctx.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR)
|
||||
if len(strings.TrimSpace(controllerContext.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR)) != 0 {
|
||||
_, secondaryServiceCIDR, err = netutils.ParseCIDRSloppy(controllerContext.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR)
|
||||
if err != nil {
|
||||
klog.Warningf("Unsuccessful parsing of service CIDR %v: %v", ctx.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR, err)
|
||||
klog.Warningf("Unsuccessful parsing of service CIDR %v: %v", controllerContext.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -149,60 +150,60 @@ func startNodeIpamController(ctx ControllerContext) (controller.Interface, bool,
|
||||
|
||||
// only --node-cidr-mask-size-ipv4 and --node-cidr-mask-size-ipv6 supported with dual stack clusters.
|
||||
// --node-cidr-mask-size flag is incompatible with dual stack clusters.
|
||||
nodeCIDRMaskSizes, err := setNodeCIDRMaskSizes(ctx.ComponentConfig.NodeIPAMController, clusterCIDRs)
|
||||
nodeCIDRMaskSizes, err := setNodeCIDRMaskSizes(controllerContext.ComponentConfig.NodeIPAMController, clusterCIDRs)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
nodeIpamController, err := nodeipamcontroller.NewNodeIpamController(
|
||||
ctx.InformerFactory.Core().V1().Nodes(),
|
||||
ctx.Cloud,
|
||||
ctx.ClientBuilder.ClientOrDie("node-controller"),
|
||||
controllerContext.InformerFactory.Core().V1().Nodes(),
|
||||
controllerContext.Cloud,
|
||||
controllerContext.ClientBuilder.ClientOrDie("node-controller"),
|
||||
clusterCIDRs,
|
||||
serviceCIDR,
|
||||
secondaryServiceCIDR,
|
||||
nodeCIDRMaskSizes,
|
||||
ipam.CIDRAllocatorType(ctx.ComponentConfig.KubeCloudShared.CIDRAllocatorType),
|
||||
ipam.CIDRAllocatorType(controllerContext.ComponentConfig.KubeCloudShared.CIDRAllocatorType),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, true, err
|
||||
}
|
||||
go nodeIpamController.Run(ctx.Stop)
|
||||
go nodeIpamController.Run(controllerContext.Stop)
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startNodeLifecycleController(ctx ControllerContext) (controller.Interface, bool, error) {
|
||||
func startNodeLifecycleController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
lifecycleController, err := lifecyclecontroller.NewNodeLifecycleController(
|
||||
ctx.InformerFactory.Coordination().V1().Leases(),
|
||||
ctx.InformerFactory.Core().V1().Pods(),
|
||||
ctx.InformerFactory.Core().V1().Nodes(),
|
||||
ctx.InformerFactory.Apps().V1().DaemonSets(),
|
||||
controllerContext.InformerFactory.Coordination().V1().Leases(),
|
||||
controllerContext.InformerFactory.Core().V1().Pods(),
|
||||
controllerContext.InformerFactory.Core().V1().Nodes(),
|
||||
controllerContext.InformerFactory.Apps().V1().DaemonSets(),
|
||||
// node lifecycle controller uses existing cluster role from node-controller
|
||||
ctx.ClientBuilder.ClientOrDie("node-controller"),
|
||||
ctx.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration,
|
||||
ctx.ComponentConfig.NodeLifecycleController.NodeStartupGracePeriod.Duration,
|
||||
ctx.ComponentConfig.NodeLifecycleController.NodeMonitorGracePeriod.Duration,
|
||||
ctx.ComponentConfig.NodeLifecycleController.PodEvictionTimeout.Duration,
|
||||
ctx.ComponentConfig.NodeLifecycleController.NodeEvictionRate,
|
||||
ctx.ComponentConfig.NodeLifecycleController.SecondaryNodeEvictionRate,
|
||||
ctx.ComponentConfig.NodeLifecycleController.LargeClusterSizeThreshold,
|
||||
ctx.ComponentConfig.NodeLifecycleController.UnhealthyZoneThreshold,
|
||||
ctx.ComponentConfig.NodeLifecycleController.EnableTaintManager,
|
||||
controllerContext.ClientBuilder.ClientOrDie("node-controller"),
|
||||
controllerContext.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration,
|
||||
controllerContext.ComponentConfig.NodeLifecycleController.NodeStartupGracePeriod.Duration,
|
||||
controllerContext.ComponentConfig.NodeLifecycleController.NodeMonitorGracePeriod.Duration,
|
||||
controllerContext.ComponentConfig.NodeLifecycleController.PodEvictionTimeout.Duration,
|
||||
controllerContext.ComponentConfig.NodeLifecycleController.NodeEvictionRate,
|
||||
controllerContext.ComponentConfig.NodeLifecycleController.SecondaryNodeEvictionRate,
|
||||
controllerContext.ComponentConfig.NodeLifecycleController.LargeClusterSizeThreshold,
|
||||
controllerContext.ComponentConfig.NodeLifecycleController.UnhealthyZoneThreshold,
|
||||
controllerContext.ComponentConfig.NodeLifecycleController.EnableTaintManager,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, true, err
|
||||
}
|
||||
go lifecycleController.Run(ctx.Stop)
|
||||
go lifecycleController.Run(controllerContext.Stop)
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startCloudNodeLifecycleController(ctx ControllerContext) (controller.Interface, bool, error) {
|
||||
func startCloudNodeLifecycleController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
cloudNodeLifecycleController, err := cloudnodelifecyclecontroller.NewCloudNodeLifecycleController(
|
||||
ctx.InformerFactory.Core().V1().Nodes(),
|
||||
controllerContext.InformerFactory.Core().V1().Nodes(),
|
||||
// cloud node lifecycle controller uses existing cluster role from node-controller
|
||||
ctx.ClientBuilder.ClientOrDie("node-controller"),
|
||||
ctx.Cloud,
|
||||
ctx.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration,
|
||||
controllerContext.ClientBuilder.ClientOrDie("node-controller"),
|
||||
controllerContext.Cloud,
|
||||
controllerContext.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration,
|
||||
)
|
||||
if err != nil {
|
||||
// the controller manager should continue to run if the "Instances" interface is not
|
||||
@@ -211,27 +212,27 @@ func startCloudNodeLifecycleController(ctx ControllerContext) (controller.Interf
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
go cloudNodeLifecycleController.Run(ctx.Stop)
|
||||
go cloudNodeLifecycleController.Run(controllerContext.Stop)
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startRouteController(ctx ControllerContext) (controller.Interface, bool, error) {
|
||||
if !ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs || !ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes {
|
||||
klog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs, ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes)
|
||||
func startRouteController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
if !controllerContext.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs || !controllerContext.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes {
|
||||
klog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", controllerContext.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs, controllerContext.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes)
|
||||
return nil, false, nil
|
||||
}
|
||||
if ctx.Cloud == nil {
|
||||
if controllerContext.Cloud == nil {
|
||||
klog.Warning("configure-cloud-routes is set, but no cloud provider specified. Will not configure cloud provider routes.")
|
||||
return nil, false, nil
|
||||
}
|
||||
routes, ok := ctx.Cloud.Routes()
|
||||
routes, ok := controllerContext.Cloud.Routes()
|
||||
if !ok {
|
||||
klog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
// failure: bad cidrs in config
|
||||
clusterCIDRs, dualStack, err := processCIDRs(ctx.ComponentConfig.KubeCloudShared.ClusterCIDR)
|
||||
clusterCIDRs, dualStack, err := processCIDRs(controllerContext.ComponentConfig.KubeCloudShared.ClusterCIDR)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
@@ -247,48 +248,48 @@ func startRouteController(ctx ControllerContext) (controller.Interface, bool, er
|
||||
}
|
||||
|
||||
routeController := routecontroller.New(routes,
|
||||
ctx.ClientBuilder.ClientOrDie("route-controller"),
|
||||
ctx.InformerFactory.Core().V1().Nodes(),
|
||||
ctx.ComponentConfig.KubeCloudShared.ClusterName,
|
||||
controllerContext.ClientBuilder.ClientOrDie("route-controller"),
|
||||
controllerContext.InformerFactory.Core().V1().Nodes(),
|
||||
controllerContext.ComponentConfig.KubeCloudShared.ClusterName,
|
||||
clusterCIDRs)
|
||||
go routeController.Run(ctx.Stop, ctx.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration)
|
||||
go routeController.Run(controllerContext.Stop, controllerContext.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration)
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startPersistentVolumeBinderController(ctx ControllerContext) (controller.Interface, bool, error) {
|
||||
plugins, err := ProbeControllerVolumePlugins(ctx.Cloud, ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration)
|
||||
func startPersistentVolumeBinderController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
plugins, err := ProbeControllerVolumePlugins(controllerContext.Cloud, controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration)
|
||||
if err != nil {
|
||||
return nil, true, fmt.Errorf("failed to probe volume plugins when starting persistentvolume controller: %v", err)
|
||||
}
|
||||
filteredDialOptions, err := options.ParseVolumeHostFilters(
|
||||
ctx.ComponentConfig.PersistentVolumeBinderController.VolumeHostCIDRDenylist,
|
||||
ctx.ComponentConfig.PersistentVolumeBinderController.VolumeHostAllowLocalLoopback)
|
||||
controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeHostCIDRDenylist,
|
||||
controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeHostAllowLocalLoopback)
|
||||
if err != nil {
|
||||
return nil, true, err
|
||||
}
|
||||
params := persistentvolumecontroller.ControllerParameters{
|
||||
KubeClient: ctx.ClientBuilder.ClientOrDie("persistent-volume-binder"),
|
||||
SyncPeriod: ctx.ComponentConfig.PersistentVolumeBinderController.PVClaimBinderSyncPeriod.Duration,
|
||||
KubeClient: controllerContext.ClientBuilder.ClientOrDie("persistent-volume-binder"),
|
||||
SyncPeriod: controllerContext.ComponentConfig.PersistentVolumeBinderController.PVClaimBinderSyncPeriod.Duration,
|
||||
VolumePlugins: plugins,
|
||||
Cloud: ctx.Cloud,
|
||||
ClusterName: ctx.ComponentConfig.KubeCloudShared.ClusterName,
|
||||
VolumeInformer: ctx.InformerFactory.Core().V1().PersistentVolumes(),
|
||||
ClaimInformer: ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
ClassInformer: ctx.InformerFactory.Storage().V1().StorageClasses(),
|
||||
PodInformer: ctx.InformerFactory.Core().V1().Pods(),
|
||||
NodeInformer: ctx.InformerFactory.Core().V1().Nodes(),
|
||||
EnableDynamicProvisioning: ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration.EnableDynamicProvisioning,
|
||||
Cloud: controllerContext.Cloud,
|
||||
ClusterName: controllerContext.ComponentConfig.KubeCloudShared.ClusterName,
|
||||
VolumeInformer: controllerContext.InformerFactory.Core().V1().PersistentVolumes(),
|
||||
ClaimInformer: controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
ClassInformer: controllerContext.InformerFactory.Storage().V1().StorageClasses(),
|
||||
PodInformer: controllerContext.InformerFactory.Core().V1().Pods(),
|
||||
NodeInformer: controllerContext.InformerFactory.Core().V1().Nodes(),
|
||||
EnableDynamicProvisioning: controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration.EnableDynamicProvisioning,
|
||||
FilteredDialOptions: filteredDialOptions,
|
||||
}
|
||||
volumeController, volumeControllerErr := persistentvolumecontroller.NewController(params)
|
||||
if volumeControllerErr != nil {
|
||||
return nil, true, fmt.Errorf("failed to construct persistentvolume controller: %v", volumeControllerErr)
|
||||
}
|
||||
go volumeController.Run(ctx.Stop)
|
||||
go volumeController.Run(controllerContext.Stop)
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startAttachDetachController(ctx ControllerContext) (controller.Interface, bool, error) {
|
||||
func startAttachDetachController(_ context.Context, ctx ControllerContext) (controller.Interface, bool, error) {
|
||||
if ctx.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration < time.Second {
|
||||
return nil, true, fmt.Errorf("duration time must be greater than one second as set via command line option reconcile-sync-loop-period")
|
||||
}
|
||||
@@ -333,24 +334,24 @@ func startAttachDetachController(ctx ControllerContext) (controller.Interface, b
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startVolumeExpandController(ctx ControllerContext) (controller.Interface, bool, error) {
|
||||
func startVolumeExpandController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumes) {
|
||||
plugins, err := ProbeExpandableVolumePlugins(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration)
|
||||
plugins, err := ProbeExpandableVolumePlugins(controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration)
|
||||
if err != nil {
|
||||
return nil, true, fmt.Errorf("failed to probe volume plugins when starting volume expand controller: %v", err)
|
||||
}
|
||||
csiTranslator := csitrans.New()
|
||||
filteredDialOptions, err := options.ParseVolumeHostFilters(
|
||||
ctx.ComponentConfig.PersistentVolumeBinderController.VolumeHostCIDRDenylist,
|
||||
ctx.ComponentConfig.PersistentVolumeBinderController.VolumeHostAllowLocalLoopback)
|
||||
controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeHostCIDRDenylist,
|
||||
controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeHostAllowLocalLoopback)
|
||||
if err != nil {
|
||||
return nil, true, err
|
||||
}
|
||||
expandController, expandControllerErr := expand.NewExpandController(
|
||||
ctx.ClientBuilder.ClientOrDie("expand-controller"),
|
||||
ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
ctx.InformerFactory.Core().V1().PersistentVolumes(),
|
||||
ctx.Cloud,
|
||||
controllerContext.ClientBuilder.ClientOrDie("expand-controller"),
|
||||
controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
controllerContext.InformerFactory.Core().V1().PersistentVolumes(),
|
||||
controllerContext.Cloud,
|
||||
plugins,
|
||||
csiTranslator,
|
||||
csimigration.NewPluginManager(csiTranslator, utilfeature.DefaultFeatureGate),
|
||||
@@ -360,74 +361,74 @@ func startVolumeExpandController(ctx ControllerContext) (controller.Interface, b
|
||||
if expandControllerErr != nil {
|
||||
return nil, true, fmt.Errorf("failed to start volume expand controller: %v", expandControllerErr)
|
||||
}
|
||||
go expandController.Run(ctx.Stop)
|
||||
go expandController.Run(controllerContext.Stop)
|
||||
return nil, true, nil
|
||||
}
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
func startEphemeralVolumeController(ctx ControllerContext) (controller.Interface, bool, error) {
|
||||
func startEphemeralVolumeController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume) {
|
||||
ephemeralController, err := ephemeral.NewController(
|
||||
ctx.ClientBuilder.ClientOrDie("ephemeral-volume-controller"),
|
||||
ctx.InformerFactory.Core().V1().Pods(),
|
||||
ctx.InformerFactory.Core().V1().PersistentVolumeClaims())
|
||||
controllerContext.ClientBuilder.ClientOrDie("ephemeral-volume-controller"),
|
||||
controllerContext.InformerFactory.Core().V1().Pods(),
|
||||
controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims())
|
||||
if err != nil {
|
||||
return nil, true, fmt.Errorf("failed to start ephemeral volume controller: %v", err)
|
||||
}
|
||||
go ephemeralController.Run(int(ctx.ComponentConfig.EphemeralVolumeController.ConcurrentEphemeralVolumeSyncs), ctx.Stop)
|
||||
go ephemeralController.Run(int(controllerContext.ComponentConfig.EphemeralVolumeController.ConcurrentEphemeralVolumeSyncs), controllerContext.Stop)
|
||||
return nil, true, nil
|
||||
}
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
func startEndpointController(ctx ControllerContext) (controller.Interface, bool, error) {
|
||||
func startEndpointController(ctx context.Context, controllerCtx ControllerContext) (controller.Interface, bool, error) {
|
||||
go endpointcontroller.NewEndpointController(
|
||||
ctx.InformerFactory.Core().V1().Pods(),
|
||||
ctx.InformerFactory.Core().V1().Services(),
|
||||
ctx.InformerFactory.Core().V1().Endpoints(),
|
||||
ctx.ClientBuilder.ClientOrDie("endpoint-controller"),
|
||||
ctx.ComponentConfig.EndpointController.EndpointUpdatesBatchPeriod.Duration,
|
||||
).Run(int(ctx.ComponentConfig.EndpointController.ConcurrentEndpointSyncs), ctx.Stop)
|
||||
controllerCtx.InformerFactory.Core().V1().Pods(),
|
||||
controllerCtx.InformerFactory.Core().V1().Services(),
|
||||
controllerCtx.InformerFactory.Core().V1().Endpoints(),
|
||||
controllerCtx.ClientBuilder.ClientOrDie("endpoint-controller"),
|
||||
controllerCtx.ComponentConfig.EndpointController.EndpointUpdatesBatchPeriod.Duration,
|
||||
).Run(int(controllerCtx.ComponentConfig.EndpointController.ConcurrentEndpointSyncs), controllerCtx.Stop)
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startReplicationController(ctx ControllerContext) (controller.Interface, bool, error) {
|
||||
func startReplicationController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
go replicationcontroller.NewReplicationManager(
|
||||
ctx.InformerFactory.Core().V1().Pods(),
|
||||
ctx.InformerFactory.Core().V1().ReplicationControllers(),
|
||||
ctx.ClientBuilder.ClientOrDie("replication-controller"),
|
||||
controllerContext.InformerFactory.Core().V1().Pods(),
|
||||
controllerContext.InformerFactory.Core().V1().ReplicationControllers(),
|
||||
controllerContext.ClientBuilder.ClientOrDie("replication-controller"),
|
||||
replicationcontroller.BurstReplicas,
|
||||
).Run(int(ctx.ComponentConfig.ReplicationController.ConcurrentRCSyncs), ctx.Stop)
|
||||
).Run(int(controllerContext.ComponentConfig.ReplicationController.ConcurrentRCSyncs), controllerContext.Stop)
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startPodGCController(ctx ControllerContext) (controller.Interface, bool, error) {
|
||||
func startPodGCController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
go podgc.NewPodGC(
|
||||
ctx.ClientBuilder.ClientOrDie("pod-garbage-collector"),
|
||||
ctx.InformerFactory.Core().V1().Pods(),
|
||||
ctx.InformerFactory.Core().V1().Nodes(),
|
||||
int(ctx.ComponentConfig.PodGCController.TerminatedPodGCThreshold),
|
||||
).Run(ctx.Stop)
|
||||
controllerContext.ClientBuilder.ClientOrDie("pod-garbage-collector"),
|
||||
controllerContext.InformerFactory.Core().V1().Pods(),
|
||||
controllerContext.InformerFactory.Core().V1().Nodes(),
|
||||
int(controllerContext.ComponentConfig.PodGCController.TerminatedPodGCThreshold),
|
||||
).Run(controllerContext.Stop)
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startResourceQuotaController(ctx ControllerContext) (controller.Interface, bool, error) {
|
||||
resourceQuotaControllerClient := ctx.ClientBuilder.ClientOrDie("resourcequota-controller")
|
||||
resourceQuotaControllerDiscoveryClient := ctx.ClientBuilder.DiscoveryClientOrDie("resourcequota-controller")
|
||||
func startResourceQuotaController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
resourceQuotaControllerClient := controllerContext.ClientBuilder.ClientOrDie("resourcequota-controller")
|
||||
resourceQuotaControllerDiscoveryClient := controllerContext.ClientBuilder.DiscoveryClientOrDie("resourcequota-controller")
|
||||
discoveryFunc := resourceQuotaControllerDiscoveryClient.ServerPreferredNamespacedResources
|
||||
listerFuncForResource := generic.ListerFuncForResourceFunc(ctx.InformerFactory.ForResource)
|
||||
listerFuncForResource := generic.ListerFuncForResourceFunc(controllerContext.InformerFactory.ForResource)
|
||||
quotaConfiguration := quotainstall.NewQuotaConfigurationForControllers(listerFuncForResource)
|
||||
|
||||
resourceQuotaControllerOptions := &resourcequotacontroller.ControllerOptions{
|
||||
QuotaClient: resourceQuotaControllerClient.CoreV1(),
|
||||
ResourceQuotaInformer: ctx.InformerFactory.Core().V1().ResourceQuotas(),
|
||||
ResyncPeriod: pkgcontroller.StaticResyncPeriodFunc(ctx.ComponentConfig.ResourceQuotaController.ResourceQuotaSyncPeriod.Duration),
|
||||
InformerFactory: ctx.ObjectOrMetadataInformerFactory,
|
||||
ReplenishmentResyncPeriod: ctx.ResyncPeriod,
|
||||
ResourceQuotaInformer: controllerContext.InformerFactory.Core().V1().ResourceQuotas(),
|
||||
ResyncPeriod: pkgcontroller.StaticResyncPeriodFunc(controllerContext.ComponentConfig.ResourceQuotaController.ResourceQuotaSyncPeriod.Duration),
|
||||
InformerFactory: controllerContext.ObjectOrMetadataInformerFactory,
|
||||
ReplenishmentResyncPeriod: controllerContext.ResyncPeriod,
|
||||
DiscoveryFunc: discoveryFunc,
|
||||
IgnoredResourcesFunc: quotaConfiguration.IgnoredResources,
|
||||
InformersStarted: ctx.InformersStarted,
|
||||
InformersStarted: controllerContext.InformersStarted,
|
||||
Registry: generic.NewRegistry(quotaConfiguration.Evaluators()),
|
||||
}
|
||||
if resourceQuotaControllerClient.CoreV1().RESTClient().GetRateLimiter() != nil {
|
||||
@@ -440,23 +441,23 @@ func startResourceQuotaController(ctx ControllerContext) (controller.Interface,
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
go resourceQuotaController.Run(int(ctx.ComponentConfig.ResourceQuotaController.ConcurrentResourceQuotaSyncs), ctx.Stop)
|
||||
go resourceQuotaController.Run(int(controllerContext.ComponentConfig.ResourceQuotaController.ConcurrentResourceQuotaSyncs), controllerContext.Stop)
|
||||
|
||||
// Periodically the quota controller to detect new resource types
|
||||
go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Stop)
|
||||
go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, controllerContext.Stop)
|
||||
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startNamespaceController(ctx ControllerContext) (controller.Interface, bool, error) {
|
||||
func startNamespaceController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
// the namespace cleanup controller is very chatty. It makes lots of discovery calls and then it makes lots of delete calls
|
||||
// the ratelimiter negatively affects its speed. Deleting 100 total items in a namespace (that's only a few of each resource
|
||||
// including events), takes ~10 seconds by default.
|
||||
nsKubeconfig := ctx.ClientBuilder.ConfigOrDie("namespace-controller")
|
||||
nsKubeconfig := controllerContext.ClientBuilder.ConfigOrDie("namespace-controller")
|
||||
nsKubeconfig.QPS *= 20
|
||||
nsKubeconfig.Burst *= 100
|
||||
namespaceKubeClient := clientset.NewForConfigOrDie(nsKubeconfig)
|
||||
return startModifiedNamespaceController(ctx, namespaceKubeClient, nsKubeconfig)
|
||||
return startModifiedNamespaceController(controllerContext, namespaceKubeClient, nsKubeconfig)
|
||||
}
|
||||
|
||||
func startModifiedNamespaceController(ctx ControllerContext, namespaceKubeClient clientset.Interface, nsKubeconfig *restclient.Config) (controller.Interface, bool, error) {
|
||||
@@ -481,37 +482,37 @@ func startModifiedNamespaceController(ctx ControllerContext, namespaceKubeClient
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startServiceAccountController(ctx ControllerContext) (controller.Interface, bool, error) {
|
||||
func startServiceAccountController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
sac, err := serviceaccountcontroller.NewServiceAccountsController(
|
||||
ctx.InformerFactory.Core().V1().ServiceAccounts(),
|
||||
ctx.InformerFactory.Core().V1().Namespaces(),
|
||||
ctx.ClientBuilder.ClientOrDie("service-account-controller"),
|
||||
controllerContext.InformerFactory.Core().V1().ServiceAccounts(),
|
||||
controllerContext.InformerFactory.Core().V1().Namespaces(),
|
||||
controllerContext.ClientBuilder.ClientOrDie("service-account-controller"),
|
||||
serviceaccountcontroller.DefaultServiceAccountsControllerOptions(),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, true, fmt.Errorf("error creating ServiceAccount controller: %v", err)
|
||||
}
|
||||
go sac.Run(1, ctx.Stop)
|
||||
go sac.Run(1, controllerContext.Stop)
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startTTLController(ctx ControllerContext) (controller.Interface, bool, error) {
|
||||
func startTTLController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
go ttlcontroller.NewTTLController(
|
||||
ctx.InformerFactory.Core().V1().Nodes(),
|
||||
ctx.ClientBuilder.ClientOrDie("ttl-controller"),
|
||||
).Run(5, ctx.Stop)
|
||||
controllerContext.InformerFactory.Core().V1().Nodes(),
|
||||
controllerContext.ClientBuilder.ClientOrDie("ttl-controller"),
|
||||
).Run(5, controllerContext.Stop)
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startGarbageCollectorController(ctx ControllerContext) (controller.Interface, bool, error) {
|
||||
if !ctx.ComponentConfig.GarbageCollectorController.EnableGarbageCollector {
|
||||
func startGarbageCollectorController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
if !controllerContext.ComponentConfig.GarbageCollectorController.EnableGarbageCollector {
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
gcClientset := ctx.ClientBuilder.ClientOrDie("generic-garbage-collector")
|
||||
discoveryClient := ctx.ClientBuilder.DiscoveryClientOrDie("generic-garbage-collector")
|
||||
gcClientset := controllerContext.ClientBuilder.ClientOrDie("generic-garbage-collector")
|
||||
discoveryClient := controllerContext.ClientBuilder.DiscoveryClientOrDie("generic-garbage-collector")
|
||||
|
||||
config := ctx.ClientBuilder.ConfigOrDie("generic-garbage-collector")
|
||||
config := controllerContext.ClientBuilder.ConfigOrDie("generic-garbage-collector")
|
||||
// Increase garbage collector controller's throughput: each object deletion takes two API calls,
|
||||
// so to get |config.QPS| deletion rate we need to allow 2x more requests for this controller.
|
||||
config.QPS *= 2
|
||||
@@ -521,64 +522,64 @@ func startGarbageCollectorController(ctx ControllerContext) (controller.Interfac
|
||||
}
|
||||
|
||||
ignoredResources := make(map[schema.GroupResource]struct{})
|
||||
for _, r := range ctx.ComponentConfig.GarbageCollectorController.GCIgnoredResources {
|
||||
for _, r := range controllerContext.ComponentConfig.GarbageCollectorController.GCIgnoredResources {
|
||||
ignoredResources[schema.GroupResource{Group: r.Group, Resource: r.Resource}] = struct{}{}
|
||||
}
|
||||
garbageCollector, err := garbagecollector.NewGarbageCollector(
|
||||
gcClientset,
|
||||
metadataClient,
|
||||
ctx.RESTMapper,
|
||||
controllerContext.RESTMapper,
|
||||
ignoredResources,
|
||||
ctx.ObjectOrMetadataInformerFactory,
|
||||
ctx.InformersStarted,
|
||||
controllerContext.ObjectOrMetadataInformerFactory,
|
||||
controllerContext.InformersStarted,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, true, fmt.Errorf("failed to start the generic garbage collector: %v", err)
|
||||
}
|
||||
|
||||
// Start the garbage collector.
|
||||
workers := int(ctx.ComponentConfig.GarbageCollectorController.ConcurrentGCSyncs)
|
||||
go garbageCollector.Run(workers, ctx.Stop)
|
||||
workers := int(controllerContext.ComponentConfig.GarbageCollectorController.ConcurrentGCSyncs)
|
||||
go garbageCollector.Run(workers, controllerContext.Stop)
|
||||
|
||||
// Periodically refresh the RESTMapper with new discovery information and sync
|
||||
// the garbage collector.
|
||||
go garbageCollector.Sync(discoveryClient, 30*time.Second, ctx.Stop)
|
||||
go garbageCollector.Sync(discoveryClient, 30*time.Second, controllerContext.Stop)
|
||||
|
||||
return garbageCollector, true, nil
|
||||
}
|
||||
|
||||
func startPVCProtectionController(ctx ControllerContext) (controller.Interface, bool, error) {
|
||||
func startPVCProtectionController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
pvcProtectionController, err := pvcprotection.NewPVCProtectionController(
|
||||
ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
ctx.InformerFactory.Core().V1().Pods(),
|
||||
ctx.ClientBuilder.ClientOrDie("pvc-protection-controller"),
|
||||
controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
controllerContext.InformerFactory.Core().V1().Pods(),
|
||||
controllerContext.ClientBuilder.ClientOrDie("pvc-protection-controller"),
|
||||
utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection),
|
||||
utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, true, fmt.Errorf("failed to start the pvc protection controller: %v", err)
|
||||
}
|
||||
go pvcProtectionController.Run(1, ctx.Stop)
|
||||
go pvcProtectionController.Run(1, controllerContext.Stop)
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startPVProtectionController(ctx ControllerContext) (controller.Interface, bool, error) {
|
||||
func startPVProtectionController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
go pvprotection.NewPVProtectionController(
|
||||
ctx.InformerFactory.Core().V1().PersistentVolumes(),
|
||||
ctx.ClientBuilder.ClientOrDie("pv-protection-controller"),
|
||||
controllerContext.InformerFactory.Core().V1().PersistentVolumes(),
|
||||
controllerContext.ClientBuilder.ClientOrDie("pv-protection-controller"),
|
||||
utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection),
|
||||
).Run(1, ctx.Stop)
|
||||
).Run(1, controllerContext.Stop)
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startTTLAfterFinishedController(ctx ControllerContext) (controller.Interface, bool, error) {
|
||||
func startTTLAfterFinishedController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.TTLAfterFinished) {
|
||||
return nil, false, nil
|
||||
}
|
||||
go ttlafterfinished.New(
|
||||
ctx.InformerFactory.Batch().V1().Jobs(),
|
||||
ctx.ClientBuilder.ClientOrDie("ttl-after-finished-controller"),
|
||||
).Run(int(ctx.ComponentConfig.TTLAfterFinishedController.ConcurrentTTLSyncs), ctx.Stop)
|
||||
controllerContext.InformerFactory.Batch().V1().Jobs(),
|
||||
controllerContext.ClientBuilder.ClientOrDie("ttl-after-finished-controller"),
|
||||
).Run(int(controllerContext.ComponentConfig.TTLAfterFinishedController.ConcurrentTTLSyncs), controllerContext.Stop)
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
@@ -674,11 +675,12 @@ func setNodeCIDRMaskSizes(cfg nodeipamconfig.NodeIPAMControllerConfiguration, cl
|
||||
}
|
||||
return sortedSizes(ipv4Mask, ipv6Mask), nil
|
||||
}
|
||||
func startStorageVersionGCController(ctx ControllerContext) (controller.Interface, bool, error) {
|
||||
|
||||
func startStorageVersionGCController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
go storageversiongc.NewStorageVersionGC(
|
||||
ctx.ClientBuilder.ClientOrDie("storage-version-garbage-collector"),
|
||||
ctx.InformerFactory.Coordination().V1().Leases(),
|
||||
ctx.InformerFactory.Internal().V1alpha1().StorageVersions(),
|
||||
).Run(ctx.Stop)
|
||||
controllerContext.ClientBuilder.ClientOrDie("storage-version-garbage-collector"),
|
||||
controllerContext.InformerFactory.Coordination().V1().Leases(),
|
||||
controllerContext.InformerFactory.Internal().V1alpha1().StorageVersions(),
|
||||
).Run(controllerContext.Stop)
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user