Merge pull request #95885 from jiahuif/refactor/controller-manager

refactor: controller manager: InitFunc and base controller interface.
This commit is contained in:
Kubernetes Prow Robot 2021-08-27 15:40:52 -07:00 committed by GitHub
commit cd63952f13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 107 additions and 84 deletions

View File

@ -23,7 +23,6 @@ import (
"errors"
"fmt"
"net"
"net/http"
"strings"
utilfeature "k8s.io/apiserver/pkg/util/feature"
@ -31,6 +30,7 @@ import (
"k8s.io/cloud-provider/app"
cloudcontrollerconfig "k8s.io/cloud-provider/app/config"
genericcontrollermanager "k8s.io/controller-manager/app"
"k8s.io/controller-manager/controller"
"k8s.io/controller-manager/pkg/features"
"k8s.io/klog/v2"
nodeipamcontrolleroptions "k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
@ -59,12 +59,12 @@ func (nodeIpamController *nodeIPAMController) StartNodeIpamControllerWrapper(ini
}
nodeIpamController.nodeIPAMControllerOptions.ApplyTo(&nodeIpamController.nodeIPAMControllerConfiguration)
return func(ctx genericcontrollermanager.ControllerContext) (http.Handler, bool, error) {
return func(ctx genericcontrollermanager.ControllerContext) (controller.Interface, bool, error) {
return startNodeIpamController(initContext, completedConfig, nodeIpamController.nodeIPAMControllerConfiguration, ctx, cloud)
}
}
func startNodeIpamController(initContext app.ControllerInitContext, ccmConfig *cloudcontrollerconfig.CompletedConfig, nodeIPAMConfig nodeipamconfig.NodeIPAMControllerConfiguration, ctx genericcontrollermanager.ControllerContext, cloud cloudprovider.Interface) (http.Handler, bool, error) {
func startNodeIpamController(initContext app.ControllerInitContext, ccmConfig *cloudcontrollerconfig.CompletedConfig, nodeIPAMConfig nodeipamconfig.NodeIPAMControllerConfiguration, ctx genericcontrollermanager.ControllerContext, cloud cloudprovider.Interface) (controller.Interface, bool, error) {
var serviceCIDR *net.IPNet
var secondaryServiceCIDR *net.IPNet

View File

@ -22,17 +22,17 @@ package app
import (
"fmt"
"net/http"
"time"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/controller-manager/controller"
"k8s.io/kubernetes/pkg/controller/daemon"
"k8s.io/kubernetes/pkg/controller/deployment"
"k8s.io/kubernetes/pkg/controller/replicaset"
"k8s.io/kubernetes/pkg/controller/statefulset"
)
func startDaemonSetController(ctx ControllerContext) (http.Handler, bool, error) {
func startDaemonSetController(ctx ControllerContext) (controller.Interface, bool, error) {
dsc, err := daemon.NewDaemonSetsController(
ctx.InformerFactory.Apps().V1().DaemonSets(),
ctx.InformerFactory.Apps().V1().ControllerRevisions(),
@ -48,7 +48,7 @@ func startDaemonSetController(ctx ControllerContext) (http.Handler, bool, error)
return nil, true, nil
}
func startStatefulSetController(ctx ControllerContext) (http.Handler, bool, error) {
func startStatefulSetController(ctx ControllerContext) (controller.Interface, bool, error) {
go statefulset.NewStatefulSetController(
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Apps().V1().StatefulSets(),
@ -59,7 +59,7 @@ func startStatefulSetController(ctx ControllerContext) (http.Handler, bool, erro
return nil, true, nil
}
func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
func startReplicaSetController(ctx ControllerContext) (controller.Interface, bool, error) {
go replicaset.NewReplicaSetController(
ctx.InformerFactory.Apps().V1().ReplicaSets(),
ctx.InformerFactory.Core().V1().Pods(),
@ -69,7 +69,7 @@ func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error
return nil, true, nil
}
func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
func startDeploymentController(ctx ControllerContext) (controller.Interface, bool, error) {
dc, err := deployment.NewDeploymentController(
ctx.InformerFactory.Apps().V1().Deployments(),
ctx.InformerFactory.Apps().V1().ReplicaSets(),

View File

@ -21,11 +21,10 @@ limitations under the License.
package app
import (
"net/http"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/scale"
"k8s.io/controller-manager/controller"
"k8s.io/kubernetes/pkg/controller/podautoscaler"
"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
@ -34,7 +33,7 @@ import (
"k8s.io/metrics/pkg/client/external_metrics"
)
func startHPAController(ctx ControllerContext) (http.Handler, bool, error) {
func startHPAController(ctx ControllerContext) (controller.Interface, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "autoscaling", Version: "v1", Resource: "horizontalpodautoscalers"}] {
return nil, false, nil
}
@ -42,7 +41,7 @@ func startHPAController(ctx ControllerContext) (http.Handler, bool, error) {
return startHPAControllerWithRESTClient(ctx)
}
func startHPAControllerWithRESTClient(ctx ControllerContext) (http.Handler, bool, error) {
func startHPAControllerWithRESTClient(ctx ControllerContext) (controller.Interface, bool, error) {
clientConfig := ctx.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")
hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
@ -62,7 +61,7 @@ func startHPAControllerWithRESTClient(ctx ControllerContext) (http.Handler, bool
return startHPAControllerWithMetricsClient(ctx, metricsClient)
}
func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient metrics.MetricsClient) (http.Handler, bool, error) {
func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient metrics.MetricsClient) (controller.Interface, bool, error) {
hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
hpaClientConfig := ctx.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")

View File

@ -22,15 +22,15 @@ package app
import (
"fmt"
"net/http"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/controller-manager/controller"
"k8s.io/kubernetes/pkg/controller/cronjob"
"k8s.io/kubernetes/pkg/controller/job"
kubefeatures "k8s.io/kubernetes/pkg/features"
)
func startJobController(ctx ControllerContext) (http.Handler, bool, error) {
func startJobController(ctx ControllerContext) (controller.Interface, bool, error) {
go job.NewController(
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Batch().V1().Jobs(),
@ -39,7 +39,7 @@ func startJobController(ctx ControllerContext) (http.Handler, bool, error) {
return nil, true, nil
}
func startCronJobController(ctx ControllerContext) (http.Handler, bool, error) {
func startCronJobController(ctx ControllerContext) (controller.Interface, bool, error) {
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CronJobControllerV2) {
cj2c, err := cronjob.NewControllerV2(ctx.InformerFactory.Batch().V1().Jobs(),
ctx.InformerFactory.Batch().V1().CronJobs(),

View File

@ -19,12 +19,11 @@ package app
import (
"fmt"
"net/http"
"k8s.io/controller-manager/controller"
"k8s.io/kubernetes/pkg/controller/bootstrap"
)
func startBootstrapSignerController(ctx ControllerContext) (http.Handler, bool, error) {
func startBootstrapSignerController(ctx ControllerContext) (controller.Interface, bool, error) {
bsc, err := bootstrap.NewSigner(
ctx.ClientBuilder.ClientOrDie("bootstrap-signer"),
ctx.InformerFactory.Core().V1().Secrets(),
@ -38,7 +37,7 @@ func startBootstrapSignerController(ctx ControllerContext) (http.Handler, bool,
return nil, true, nil
}
func startTokenCleanerController(ctx ControllerContext) (http.Handler, bool, error) {
func startTokenCleanerController(ctx ControllerContext) (controller.Interface, bool, error) {
tcc, err := bootstrap.NewTokenCleaner(
ctx.ClientBuilder.ClientOrDie("token-cleaner"),
ctx.InformerFactory.Core().V1().Secrets(),

View File

@ -22,8 +22,8 @@ package app
import (
"fmt"
"net/http"
"k8s.io/controller-manager/controller"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller/certificates/approver"
"k8s.io/kubernetes/pkg/controller/certificates/cleaner"
@ -32,7 +32,7 @@ import (
csrsigningconfig "k8s.io/kubernetes/pkg/controller/certificates/signer/config"
)
func startCSRSigningController(ctx ControllerContext) (http.Handler, bool, error) {
func startCSRSigningController(ctx ControllerContext) (controller.Interface, bool, error) {
missingSingleSigningFile := ctx.ComponentConfig.CSRSigningController.ClusterSigningCertFile == "" || ctx.ComponentConfig.CSRSigningController.ClusterSigningKeyFile == ""
if missingSingleSigningFile && !anySpecificFilesSet(ctx.ComponentConfig.CSRSigningController) {
klog.V(2).Info("skipping CSR signer controller because no csr cert/key was specified")
@ -147,7 +147,7 @@ func getLegacyUnknownSignerFiles(config csrsigningconfig.CSRSigningControllerCon
return config.ClusterSigningCertFile, config.ClusterSigningKeyFile
}
func startCSRApprovingController(ctx ControllerContext) (http.Handler, bool, error) {
func startCSRApprovingController(ctx ControllerContext) (controller.Interface, bool, error) {
approver := approver.NewCSRApprovingController(
ctx.ClientBuilder.ClientOrDie("certificate-controller"),
ctx.InformerFactory.Certificates().V1().CertificateSigningRequests(),
@ -157,7 +157,7 @@ func startCSRApprovingController(ctx ControllerContext) (http.Handler, bool, err
return nil, true, nil
}
func startCSRCleanerController(ctx ControllerContext) (http.Handler, bool, error) {
func startCSRCleanerController(ctx ControllerContext) (controller.Interface, bool, error) {
cleaner := cleaner.NewCSRCleanerController(
ctx.ClientBuilder.ClientOrDie("certificate-controller").CertificatesV1().CertificateSigningRequests(),
ctx.InformerFactory.Certificates().V1().CertificateSigningRequests(),
@ -166,7 +166,7 @@ func startCSRCleanerController(ctx ControllerContext) (http.Handler, bool, error
return nil, true, nil
}
func startRootCACertPublisher(ctx ControllerContext) (http.Handler, bool, error) {
func startRootCACertPublisher(ctx ControllerContext) (controller.Interface, bool, error) {
var (
rootCA []byte
err error

View File

@ -59,6 +59,7 @@ import (
"k8s.io/component-base/version"
"k8s.io/component-base/version/verflag"
genericcontrollermanager "k8s.io/controller-manager/app"
"k8s.io/controller-manager/controller"
"k8s.io/controller-manager/pkg/clientbuilder"
"k8s.io/controller-manager/pkg/informerfactory"
"k8s.io/controller-manager/pkg/leadermigration"
@ -262,7 +263,7 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
// Wrap saTokenControllerInitFunc to signal readiness for migration after starting
// the controller.
startSATokenController = func(ctx ControllerContext) (http.Handler, bool, error) {
startSATokenController = func(ctx ControllerContext) (controller.Interface, bool, error) {
defer close(leaderMigrator.MigrationReady)
return saTokenControllerInitFunc(ctx)
}
@ -367,10 +368,14 @@ func (c ControllerContext) IsControllerEnabled(name string) bool {
return genericcontrollermanager.IsControllerEnabled(name, ControllersDisabledByDefault, c.ComponentConfig.Generic.Controllers)
}
// InitFunc is used to launch a particular controller. It may run additional "should I activate checks".
// InitFunc is used to launch a particular controller. It returns a controller
// that can optionally implement other interfaces so that the controller manager
// can support the requested features.
// The returned controller may be nil, which will be considered an anonymous controller
// that requests no additional features from the controller manager.
// Any error returned will cause the controller process to `Fatal`
// The bool indicates whether the controller was enabled.
type InitFunc func(ctx ControllerContext) (debuggingHandler http.Handler, enabled bool, err error)
type InitFunc func(ctx ControllerContext) (controller controller.Interface, enabled bool, err error)
// ControllerInitializersFunc is used to create a collection of initializers
// given the loopMode.
@ -560,7 +565,7 @@ func StartControllers(ctx ControllerContext, startSATokenController InitFunc, co
time.Sleep(wait.Jitter(ctx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
klog.V(1).Infof("Starting %q", controllerName)
debugHandler, started, err := initFn(ctx)
ctrl, started, err := initFn(ctx)
if err != nil {
klog.Errorf("Error starting %q", controllerName)
return err
@ -569,10 +574,16 @@ func StartControllers(ctx ControllerContext, startSATokenController InitFunc, co
klog.Warningf("Skipping %q", controllerName)
continue
}
if debugHandler != nil && unsecuredMux != nil {
basePath := "/debug/controllers/" + controllerName
unsecuredMux.UnlistedHandle(basePath, http.StripPrefix(basePath, debugHandler))
unsecuredMux.UnlistedHandlePrefix(basePath+"/", http.StripPrefix(basePath, debugHandler))
if ctrl != nil {
// check if the controller supports and requests a debugHandler
// and it needs the unsecuredMux to mount the handler onto.
if debuggable, ok := ctrl.(controller.Debuggable); ok && unsecuredMux != nil {
if debugHandler := debuggable.DebuggingHandler(); debugHandler != nil {
basePath := "/debug/controllers/" + controllerName
unsecuredMux.UnlistedHandle(basePath, http.StripPrefix(basePath, debugHandler))
unsecuredMux.UnlistedHandlePrefix(basePath+"/", http.StripPrefix(basePath, debugHandler))
}
}
}
klog.Infof("Started %q", controllerName)
}
@ -587,7 +598,7 @@ type serviceAccountTokenControllerStarter struct {
rootClientBuilder clientbuilder.ControllerClientBuilder
}
func (c serviceAccountTokenControllerStarter) startServiceAccountTokenController(ctx ControllerContext) (http.Handler, bool, error) {
func (c serviceAccountTokenControllerStarter) startServiceAccountTokenController(ctx ControllerContext) (controller.Interface, bool, error) {
if !ctx.IsControllerEnabled(saTokenControllerName) {
klog.Warningf("%q is disabled", saTokenControllerName)
return nil, false, nil

View File

@ -24,7 +24,6 @@ import (
"errors"
"fmt"
"net"
"net/http"
"strings"
"time"
@ -41,9 +40,10 @@ import (
routecontroller "k8s.io/cloud-provider/controllers/route"
servicecontroller "k8s.io/cloud-provider/controllers/service"
"k8s.io/component-base/metrics/prometheus/ratelimiter"
"k8s.io/controller-manager/controller"
csitrans "k8s.io/csi-translation-lib"
"k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
"k8s.io/kubernetes/pkg/controller"
pkgcontroller "k8s.io/kubernetes/pkg/controller"
endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/pkg/controller/garbagecollector"
namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
@ -77,7 +77,7 @@ const (
defaultNodeMaskCIDRIPv6 = 64
)
func startServiceController(ctx ControllerContext) (http.Handler, bool, error) {
func startServiceController(ctx ControllerContext) (controller.Interface, bool, error) {
serviceController, err := servicecontroller.New(
ctx.Cloud,
ctx.ClientBuilder.ClientOrDie("service-controller"),
@ -95,7 +95,7 @@ func startServiceController(ctx ControllerContext) (http.Handler, bool, error) {
return nil, true, nil
}
func startNodeIpamController(ctx ControllerContext) (http.Handler, bool, error) {
func startNodeIpamController(ctx ControllerContext) (controller.Interface, bool, error) {
var serviceCIDR *net.IPNet
var secondaryServiceCIDR *net.IPNet
@ -192,7 +192,7 @@ func startNodeIpamController(ctx ControllerContext) (http.Handler, bool, error)
return nil, true, nil
}
func startNodeLifecycleController(ctx ControllerContext) (http.Handler, bool, error) {
func startNodeLifecycleController(ctx ControllerContext) (controller.Interface, bool, error) {
lifecycleController, err := lifecyclecontroller.NewNodeLifecycleController(
ctx.InformerFactory.Coordination().V1().Leases(),
ctx.InformerFactory.Core().V1().Pods(),
@ -217,7 +217,7 @@ func startNodeLifecycleController(ctx ControllerContext) (http.Handler, bool, er
return nil, true, nil
}
func startCloudNodeLifecycleController(ctx ControllerContext) (http.Handler, bool, error) {
func startCloudNodeLifecycleController(ctx ControllerContext) (controller.Interface, bool, error) {
cloudNodeLifecycleController, err := cloudnodelifecyclecontroller.NewCloudNodeLifecycleController(
ctx.InformerFactory.Core().V1().Nodes(),
// cloud node lifecycle controller uses existing cluster role from node-controller
@ -236,7 +236,7 @@ func startCloudNodeLifecycleController(ctx ControllerContext) (http.Handler, boo
return nil, true, nil
}
func startRouteController(ctx ControllerContext) (http.Handler, bool, error) {
func startRouteController(ctx ControllerContext) (controller.Interface, bool, error) {
if !ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs || !ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes {
klog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs, ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes)
return nil, false, nil
@ -281,7 +281,7 @@ func startRouteController(ctx ControllerContext) (http.Handler, bool, error) {
return nil, true, nil
}
func startPersistentVolumeBinderController(ctx ControllerContext) (http.Handler, bool, error) {
func startPersistentVolumeBinderController(ctx ControllerContext) (controller.Interface, bool, error) {
plugins, err := ProbeControllerVolumePlugins(ctx.Cloud, ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration)
if err != nil {
return nil, true, fmt.Errorf("failed to probe volume plugins when starting persistentvolume controller: %v", err)
@ -314,7 +314,7 @@ func startPersistentVolumeBinderController(ctx ControllerContext) (http.Handler,
return nil, true, nil
}
func startAttachDetachController(ctx ControllerContext) (http.Handler, bool, error) {
func startAttachDetachController(ctx ControllerContext) (controller.Interface, bool, error) {
if ctx.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration < time.Second {
return nil, true, fmt.Errorf("duration time must be greater than one second as set via command line option reconcile-sync-loop-period")
}
@ -359,7 +359,7 @@ func startAttachDetachController(ctx ControllerContext) (http.Handler, bool, err
return nil, true, nil
}
func startVolumeExpandController(ctx ControllerContext) (http.Handler, bool, error) {
func startVolumeExpandController(ctx ControllerContext) (controller.Interface, bool, error) {
if utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumes) {
plugins, err := ProbeExpandableVolumePlugins(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration)
if err != nil {
@ -392,7 +392,7 @@ func startVolumeExpandController(ctx ControllerContext) (http.Handler, bool, err
return nil, false, nil
}
func startEphemeralVolumeController(ctx ControllerContext) (http.Handler, bool, error) {
func startEphemeralVolumeController(ctx ControllerContext) (controller.Interface, bool, error) {
if utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume) {
ephemeralController, err := ephemeral.NewController(
ctx.ClientBuilder.ClientOrDie("ephemeral-volume-controller"),
@ -407,7 +407,7 @@ func startEphemeralVolumeController(ctx ControllerContext) (http.Handler, bool,
return nil, false, nil
}
func startEndpointController(ctx ControllerContext) (http.Handler, bool, error) {
func startEndpointController(ctx ControllerContext) (controller.Interface, bool, error) {
go endpointcontroller.NewEndpointController(
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Services(),
@ -418,7 +418,7 @@ func startEndpointController(ctx ControllerContext) (http.Handler, bool, error)
return nil, true, nil
}
func startReplicationController(ctx ControllerContext) (http.Handler, bool, error) {
func startReplicationController(ctx ControllerContext) (controller.Interface, bool, error) {
go replicationcontroller.NewReplicationManager(
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().ReplicationControllers(),
@ -428,7 +428,7 @@ func startReplicationController(ctx ControllerContext) (http.Handler, bool, erro
return nil, true, nil
}
func startPodGCController(ctx ControllerContext) (http.Handler, bool, error) {
func startPodGCController(ctx ControllerContext) (controller.Interface, bool, error) {
go podgc.NewPodGC(
ctx.ClientBuilder.ClientOrDie("pod-garbage-collector"),
ctx.InformerFactory.Core().V1().Pods(),
@ -438,7 +438,7 @@ func startPodGCController(ctx ControllerContext) (http.Handler, bool, error) {
return nil, true, nil
}
func startResourceQuotaController(ctx ControllerContext) (http.Handler, bool, error) {
func startResourceQuotaController(ctx ControllerContext) (controller.Interface, bool, error) {
resourceQuotaControllerClient := ctx.ClientBuilder.ClientOrDie("resourcequota-controller")
resourceQuotaControllerDiscoveryClient := ctx.ClientBuilder.DiscoveryClientOrDie("resourcequota-controller")
discoveryFunc := resourceQuotaControllerDiscoveryClient.ServerPreferredNamespacedResources
@ -448,7 +448,7 @@ func startResourceQuotaController(ctx ControllerContext) (http.Handler, bool, er
resourceQuotaControllerOptions := &resourcequotacontroller.ControllerOptions{
QuotaClient: resourceQuotaControllerClient.CoreV1(),
ResourceQuotaInformer: ctx.InformerFactory.Core().V1().ResourceQuotas(),
ResyncPeriod: controller.StaticResyncPeriodFunc(ctx.ComponentConfig.ResourceQuotaController.ResourceQuotaSyncPeriod.Duration),
ResyncPeriod: pkgcontroller.StaticResyncPeriodFunc(ctx.ComponentConfig.ResourceQuotaController.ResourceQuotaSyncPeriod.Duration),
InformerFactory: ctx.ObjectOrMetadataInformerFactory,
ReplenishmentResyncPeriod: ctx.ResyncPeriod,
DiscoveryFunc: discoveryFunc,
@ -474,7 +474,7 @@ func startResourceQuotaController(ctx ControllerContext) (http.Handler, bool, er
return nil, true, nil
}
func startNamespaceController(ctx ControllerContext) (http.Handler, bool, error) {
func startNamespaceController(ctx ControllerContext) (controller.Interface, bool, error) {
// the namespace cleanup controller is very chatty. It makes lots of discovery calls and then it makes lots of delete calls
// the ratelimiter negatively affects its speed. Deleting 100 total items in a namespace (that's only a few of each resource
// including events), takes ~10 seconds by default.
@ -485,7 +485,7 @@ func startNamespaceController(ctx ControllerContext) (http.Handler, bool, error)
return startModifiedNamespaceController(ctx, namespaceKubeClient, nsKubeconfig)
}
func startModifiedNamespaceController(ctx ControllerContext, namespaceKubeClient clientset.Interface, nsKubeconfig *restclient.Config) (http.Handler, bool, error) {
func startModifiedNamespaceController(ctx ControllerContext, namespaceKubeClient clientset.Interface, nsKubeconfig *restclient.Config) (controller.Interface, bool, error) {
metadataClient, err := metadata.NewForConfig(nsKubeconfig)
if err != nil {
@ -507,7 +507,7 @@ func startModifiedNamespaceController(ctx ControllerContext, namespaceKubeClient
return nil, true, nil
}
func startServiceAccountController(ctx ControllerContext) (http.Handler, bool, error) {
func startServiceAccountController(ctx ControllerContext) (controller.Interface, bool, error) {
sac, err := serviceaccountcontroller.NewServiceAccountsController(
ctx.InformerFactory.Core().V1().ServiceAccounts(),
ctx.InformerFactory.Core().V1().Namespaces(),
@ -521,7 +521,7 @@ func startServiceAccountController(ctx ControllerContext) (http.Handler, bool, e
return nil, true, nil
}
func startTTLController(ctx ControllerContext) (http.Handler, bool, error) {
func startTTLController(ctx ControllerContext) (controller.Interface, bool, error) {
go ttlcontroller.NewTTLController(
ctx.InformerFactory.Core().V1().Nodes(),
ctx.ClientBuilder.ClientOrDie("ttl-controller"),
@ -529,7 +529,7 @@ func startTTLController(ctx ControllerContext) (http.Handler, bool, error) {
return nil, true, nil
}
func startGarbageCollectorController(ctx ControllerContext) (http.Handler, bool, error) {
func startGarbageCollectorController(ctx ControllerContext) (controller.Interface, bool, error) {
if !ctx.ComponentConfig.GarbageCollectorController.EnableGarbageCollector {
return nil, false, nil
}
@ -567,10 +567,10 @@ func startGarbageCollectorController(ctx ControllerContext) (http.Handler, bool,
// the garbage collector.
go garbageCollector.Sync(discoveryClient, 30*time.Second, ctx.Stop)
return garbagecollector.NewDebugHandler(garbageCollector), true, nil
return garbageCollector, true, nil
}
func startPVCProtectionController(ctx ControllerContext) (http.Handler, bool, error) {
func startPVCProtectionController(ctx ControllerContext) (controller.Interface, bool, error) {
pvcProtectionController, err := pvcprotection.NewPVCProtectionController(
ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
ctx.InformerFactory.Core().V1().Pods(),
@ -585,7 +585,7 @@ func startPVCProtectionController(ctx ControllerContext) (http.Handler, bool, er
return nil, true, nil
}
func startPVProtectionController(ctx ControllerContext) (http.Handler, bool, error) {
func startPVProtectionController(ctx ControllerContext) (controller.Interface, bool, error) {
go pvprotection.NewPVProtectionController(
ctx.InformerFactory.Core().V1().PersistentVolumes(),
ctx.ClientBuilder.ClientOrDie("pv-protection-controller"),
@ -594,7 +594,7 @@ func startPVProtectionController(ctx ControllerContext) (http.Handler, bool, err
return nil, true, nil
}
func startTTLAfterFinishedController(ctx ControllerContext) (http.Handler, bool, error) {
func startTTLAfterFinishedController(ctx ControllerContext) (controller.Interface, bool, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.TTLAfterFinished) {
return nil, false, nil
}
@ -672,7 +672,7 @@ func getNodeCIDRMaskSizes(clusterCIDRs []*net.IPNet, maskSizeIPv4, maskSizeIPv6
return nodeMaskCIDRs
}
func startStorageVersionGCController(ctx ControllerContext) (http.Handler, bool, error) {
func startStorageVersionGCController(ctx ControllerContext) (controller.Interface, bool, error) {
go storageversiongc.NewStorageVersionGC(
ctx.ClientBuilder.ClientOrDie("storage-version-garbage-collector"),
ctx.InformerFactory.Coordination().V1().Leases(),

View File

@ -17,7 +17,6 @@ limitations under the License.
package app
import (
"net/http"
"testing"
"time"
@ -28,6 +27,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
fakeclientset "k8s.io/client-go/kubernetes/fake"
restclient "k8s.io/client-go/rest"
"k8s.io/controller-manager/controller"
)
// TestClientBuilder inherits ClientBuilder and can accept a given fake clientset.
@ -104,7 +104,7 @@ func possibleDiscoveryResource() []*metav1.APIResourceList {
}
}
type controllerInitFunc func(ControllerContext) (http.Handler, bool, error)
type controllerInitFunc func(ControllerContext) (controller.Interface, bool, error)
func TestController_DiscoveryError(t *testing.T) {
controllerInitFuncMap := map[string]controllerInitFunc{

View File

@ -21,13 +21,12 @@ limitations under the License.
package app
import (
"net/http"
"k8s.io/controller-manager/controller"
endpointslicecontroller "k8s.io/kubernetes/pkg/controller/endpointslice"
endpointslicemirroringcontroller "k8s.io/kubernetes/pkg/controller/endpointslicemirroring"
)
func startEndpointSliceController(ctx ControllerContext) (http.Handler, bool, error) {
func startEndpointSliceController(ctx ControllerContext) (controller.Interface, bool, error) {
go endpointslicecontroller.NewController(
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Services(),
@ -40,7 +39,7 @@ func startEndpointSliceController(ctx ControllerContext) (http.Handler, bool, er
return nil, true, nil
}
func startEndpointSliceMirroringController(ctx ControllerContext) (http.Handler, bool, error) {
func startEndpointSliceMirroringController(ctx ControllerContext) (controller.Interface, bool, error) {
go endpointslicemirroringcontroller.NewController(
ctx.InformerFactory.Core().V1().Endpoints(),
ctx.InformerFactory.Discovery().V1().EndpointSlices(),

View File

@ -21,18 +21,17 @@ limitations under the License.
package app
import (
"net/http"
"k8s.io/klog/v2"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/scale"
"k8s.io/controller-manager/controller"
"k8s.io/kubernetes/pkg/controller/disruption"
kubefeatures "k8s.io/kubernetes/pkg/features"
)
func startDisruptionController(ctx ControllerContext) (http.Handler, bool, error) {
func startDisruptionController(ctx ControllerContext) (controller.Interface, bool, error) {
if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodDisruptionBudget) {
klog.InfoS("Refusing to start disruption because the PodDisruptionBudget feature is disabled")
return nil, false, nil

View File

@ -17,13 +17,12 @@ limitations under the License.
package app
import (
"net/http"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/controller-manager/controller"
"k8s.io/kubernetes/pkg/controller/clusterroleaggregation"
)
func startClusterRoleAggregrationController(ctx ControllerContext) (http.Handler, bool, error) {
func startClusterRoleAggregrationController(ctx ControllerContext) (controller.Interface, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "rbac.authorization.k8s.io", Version: "v1", Resource: "clusterroles"}] {
return nil, false, nil
}

View File

@ -281,3 +281,7 @@ func (h *debugHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
w.Write(data)
w.WriteHeader(http.StatusOK)
}
func (gc *GarbageCollector) DebuggingHandler() http.Handler {
return NewDebugHandler(gc)
}

View File

@ -43,6 +43,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/controller-manager/controller"
"k8s.io/controller-manager/pkg/informerfactory"
"k8s.io/kubernetes/pkg/controller/apis/config/scheme"
@ -78,6 +79,9 @@ type GarbageCollector struct {
workerLock sync.RWMutex
}
var _ controller.Interface = (*GarbageCollector)(nil)
var _ controller.Debuggable = (*GarbageCollector)(nil)
// NewGarbageCollector creates a new GarbageCollector.
func NewGarbageCollector(
kubeClient clientset.Interface,
@ -726,3 +730,7 @@ func GetDeletableResources(discoveryClient discovery.ServerResourcesInterface) m
return deletableGroupVersionResources
}
func (gc *GarbageCollector) Name() string {
return "garbagecollector"
}

View File

@ -21,7 +21,6 @@ import (
"flag"
"fmt"
"math/rand"
"net/http"
"os"
"time"
@ -50,6 +49,7 @@ import (
"k8s.io/component-base/version"
"k8s.io/component-base/version/verflag"
genericcontrollermanager "k8s.io/controller-manager/app"
"k8s.io/controller-manager/controller"
"k8s.io/controller-manager/pkg/clientbuilder"
"k8s.io/controller-manager/pkg/informerfactory"
"k8s.io/controller-manager/pkg/leadermigration"
@ -302,10 +302,14 @@ func startControllers(cloud cloudprovider.Interface, ctx genericcontrollermanage
// InitCloudFunc is used to initialize cloud
type InitCloudFunc func(config *cloudcontrollerconfig.CompletedConfig) cloudprovider.Interface
// InitFunc is used to launch a particular controller. It may run additional "should I activate checks".
// InitFunc is used to launch a particular controller. It returns a controller
// that can optionally implement other interfaces so that the controller manager
// can support the requested features.
// The returned controller may be nil, which will be considered an anonymous controller
// that requests no additional features from the controller manager.
// Any error returned will cause the controller process to `Fatal`
// The bool indicates whether the controller was enabled.
type InitFunc func(ctx genericcontrollermanager.ControllerContext) (debuggingHandler http.Handler, enabled bool, err error)
type InitFunc func(ctx genericcontrollermanager.ControllerContext) (controller controller.Interface, enabled bool, err error)
// InitFuncConstructor is used to construct InitFunc
type InitFuncConstructor func(initcontext ControllerInitContext, completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) InitFunc
@ -340,28 +344,28 @@ type ControllerInitContext struct {
// StartCloudNodeControllerWrapper is used to take cloud cofig as input and start cloud node controller
func StartCloudNodeControllerWrapper(initContext ControllerInitContext, completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) InitFunc {
return func(ctx genericcontrollermanager.ControllerContext) (http.Handler, bool, error) {
return func(ctx genericcontrollermanager.ControllerContext) (controller.Interface, bool, error) {
return startCloudNodeController(initContext, completedConfig, cloud, ctx.Stop)
}
}
// StartCloudNodeLifecycleControllerWrapper is used to take cloud cofig as input and start cloud node lifecycle controller
func StartCloudNodeLifecycleControllerWrapper(initContext ControllerInitContext, completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) InitFunc {
return func(ctx genericcontrollermanager.ControllerContext) (http.Handler, bool, error) {
return func(ctx genericcontrollermanager.ControllerContext) (controller.Interface, bool, error) {
return startCloudNodeLifecycleController(initContext, completedConfig, cloud, ctx.Stop)
}
}
// StartServiceControllerWrapper is used to take cloud cofig as input and start service controller
func StartServiceControllerWrapper(initContext ControllerInitContext, completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) InitFunc {
return func(ctx genericcontrollermanager.ControllerContext) (http.Handler, bool, error) {
return func(ctx genericcontrollermanager.ControllerContext) (controller.Interface, bool, error) {
return startServiceController(initContext, completedConfig, cloud, ctx.Stop)
}
}
// StartRouteControllerWrapper is used to take cloud cofig as input and start route controller
func StartRouteControllerWrapper(initContext ControllerInitContext, completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) InitFunc {
return func(ctx genericcontrollermanager.ControllerContext) (http.Handler, bool, error) {
return func(ctx genericcontrollermanager.ControllerContext) (controller.Interface, bool, error) {
return startRouteController(initContext, completedConfig, cloud, ctx.Stop)
}
}

View File

@ -23,7 +23,6 @@ package app
import (
"fmt"
"net"
"net/http"
"strings"
cloudprovider "k8s.io/cloud-provider"
@ -32,6 +31,7 @@ import (
cloudnodelifecyclecontroller "k8s.io/cloud-provider/controllers/nodelifecycle"
routecontroller "k8s.io/cloud-provider/controllers/route"
servicecontroller "k8s.io/cloud-provider/controllers/service"
"k8s.io/controller-manager/controller"
"k8s.io/controller-manager/pkg/features"
"k8s.io/klog/v2"
netutils "k8s.io/utils/net"
@ -39,7 +39,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
)
func startCloudNodeController(initContext ControllerInitContext, ctx *config.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) {
func startCloudNodeController(initContext ControllerInitContext, ctx *config.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (controller.Interface, bool, error) {
// Start the CloudNodeController
nodeController, err := cloudnodecontroller.NewCloudNodeController(
ctx.SharedInformers.Core().V1().Nodes(),
@ -58,7 +58,7 @@ func startCloudNodeController(initContext ControllerInitContext, ctx *config.Com
return nil, true, nil
}
func startCloudNodeLifecycleController(initContext ControllerInitContext, ctx *config.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) {
func startCloudNodeLifecycleController(initContext ControllerInitContext, ctx *config.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (controller.Interface, bool, error) {
// Start the cloudNodeLifecycleController
cloudNodeLifecycleController, err := cloudnodelifecyclecontroller.NewCloudNodeLifecycleController(
ctx.SharedInformers.Core().V1().Nodes(),
@ -77,7 +77,7 @@ func startCloudNodeLifecycleController(initContext ControllerInitContext, ctx *c
return nil, true, nil
}
func startServiceController(initContext ControllerInitContext, ctx *config.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) {
func startServiceController(initContext ControllerInitContext, ctx *config.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (controller.Interface, bool, error) {
// Start the service controller
serviceController, err := servicecontroller.New(
cloud,
@ -98,7 +98,7 @@ func startServiceController(initContext ControllerInitContext, ctx *config.Compl
return nil, true, nil
}
func startRouteController(initContext ControllerInitContext, ctx *config.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) {
func startRouteController(initContext ControllerInitContext, ctx *config.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (controller.Interface, bool, error) {
if !ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes {
klog.Infof("Will not configure cloud provider routes, --configure-cloud-routes: %v", ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes)
return nil, false, nil

1
vendor/modules.txt vendored
View File

@ -1969,6 +1969,7 @@ k8s.io/controller-manager/app
k8s.io/controller-manager/config
k8s.io/controller-manager/config/v1alpha1
k8s.io/controller-manager/config/v1beta1
k8s.io/controller-manager/controller
k8s.io/controller-manager/options
k8s.io/controller-manager/pkg/clientbuilder
k8s.io/controller-manager/pkg/features