add debug handler capability for individual controllers

This commit is contained in:
David Eads 2018-07-25 15:45:54 -04:00
parent 4623ebd9ff
commit fb7d137ea2
12 changed files with 153 additions and 126 deletions

View File

@ -110,15 +110,15 @@ func Run(c *cloudcontrollerconfig.CompletedConfig) error {
// Start the controller manager HTTP server // Start the controller manager HTTP server
stopCh := make(chan struct{}) stopCh := make(chan struct{})
if c.SecureServing != nil { if c.SecureServing != nil {
handler := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Debugging) unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Debugging)
handler = genericcontrollermanager.BuildHandlerChain(handler, &c.Authorization, &c.Authentication) handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
if err := c.SecureServing.Serve(handler, 0, stopCh); err != nil { if err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
return err return err
} }
} }
if c.InsecureServing != nil { if c.InsecureServing != nil {
handler := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Debugging) unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Debugging)
handler = genericcontrollermanager.BuildHandlerChain(handler, &c.Authorization, &c.Authentication) handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil { if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil {
return err return err
} }

View File

@ -48,7 +48,7 @@ func BuildHandlerChain(apiHandler http.Handler, authorizationInfo *apiserver.Aut
} }
// NewBaseHandler takes in CompletedConfig and returns a handler. // NewBaseHandler takes in CompletedConfig and returns a handler.
func NewBaseHandler(c *componentconfig.DebuggingConfiguration) http.Handler { func NewBaseHandler(c *componentconfig.DebuggingConfiguration) *mux.PathRecorderMux {
mux := mux.NewPathRecorderMux("controller-manager") mux := mux.NewPathRecorderMux("controller-manager")
healthz.InstallHandler(mux) healthz.InstallHandler(mux)
if c.EnableProfiling { if c.EnableProfiling {

View File

@ -111,6 +111,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/mux:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/discovery/cached:go_default_library", "//staging/src/k8s.io/client-go/discovery/cached:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library", "//staging/src/k8s.io/client-go/dynamic:go_default_library",

View File

@ -23,6 +23,8 @@ package app
import ( import (
"fmt" "fmt"
"net/http"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/kubernetes/pkg/controller/daemon" "k8s.io/kubernetes/pkg/controller/daemon"
"k8s.io/kubernetes/pkg/controller/deployment" "k8s.io/kubernetes/pkg/controller/deployment"
@ -30,9 +32,9 @@ import (
"k8s.io/kubernetes/pkg/controller/statefulset" "k8s.io/kubernetes/pkg/controller/statefulset"
) )
func startDaemonSetController(ctx ControllerContext) (bool, error) { func startDaemonSetController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "daemonsets"}] { if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "daemonsets"}] {
return false, nil return nil, false, nil
} }
dsc, err := daemon.NewDaemonSetsController( dsc, err := daemon.NewDaemonSetsController(
ctx.InformerFactory.Apps().V1().DaemonSets(), ctx.InformerFactory.Apps().V1().DaemonSets(),
@ -42,15 +44,15 @@ func startDaemonSetController(ctx ControllerContext) (bool, error) {
ctx.ClientBuilder.ClientOrDie("daemon-set-controller"), ctx.ClientBuilder.ClientOrDie("daemon-set-controller"),
) )
if err != nil { if err != nil {
return true, fmt.Errorf("error creating DaemonSets controller: %v", err) return nil, true, fmt.Errorf("error creating DaemonSets controller: %v", err)
} }
go dsc.Run(int(ctx.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs), ctx.Stop) go dsc.Run(int(ctx.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs), ctx.Stop)
return true, nil return nil, true, nil
} }
func startStatefulSetController(ctx ControllerContext) (bool, error) { func startStatefulSetController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "statefulsets"}] { if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "statefulsets"}] {
return false, nil return nil, false, nil
} }
go statefulset.NewStatefulSetController( go statefulset.NewStatefulSetController(
ctx.InformerFactory.Core().V1().Pods(), ctx.InformerFactory.Core().V1().Pods(),
@ -59,12 +61,12 @@ func startStatefulSetController(ctx ControllerContext) (bool, error) {
ctx.InformerFactory.Apps().V1().ControllerRevisions(), ctx.InformerFactory.Apps().V1().ControllerRevisions(),
ctx.ClientBuilder.ClientOrDie("statefulset-controller"), ctx.ClientBuilder.ClientOrDie("statefulset-controller"),
).Run(1, ctx.Stop) ).Run(1, ctx.Stop)
return true, nil return nil, true, nil
} }
func startReplicaSetController(ctx ControllerContext) (bool, error) { func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}] { if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}] {
return false, nil return nil, false, nil
} }
go replicaset.NewReplicaSetController( go replicaset.NewReplicaSetController(
ctx.InformerFactory.Apps().V1().ReplicaSets(), ctx.InformerFactory.Apps().V1().ReplicaSets(),
@ -72,12 +74,12 @@ func startReplicaSetController(ctx ControllerContext) (bool, error) {
ctx.ClientBuilder.ClientOrDie("replicaset-controller"), ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
replicaset.BurstReplicas, replicaset.BurstReplicas,
).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop) ).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
return true, nil return nil, true, nil
} }
func startDeploymentController(ctx ControllerContext) (bool, error) { func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] { if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
return false, nil return nil, false, nil
} }
dc, err := deployment.NewDeploymentController( dc, err := deployment.NewDeploymentController(
ctx.InformerFactory.Apps().V1().Deployments(), ctx.InformerFactory.Apps().V1().Deployments(),
@ -86,8 +88,8 @@ func startDeploymentController(ctx ControllerContext) (bool, error) {
ctx.ClientBuilder.ClientOrDie("deployment-controller"), ctx.ClientBuilder.ClientOrDie("deployment-controller"),
) )
if err != nil { if err != nil {
return true, fmt.Errorf("error creating Deployment controller: %v", err) return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
} }
go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop) go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
return true, nil return nil, true, nil
} }

View File

@ -21,6 +21,8 @@ limitations under the License.
package app package app
import ( import (
"net/http"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic"
"k8s.io/client-go/scale" "k8s.io/client-go/scale"
@ -31,9 +33,9 @@ import (
"k8s.io/metrics/pkg/client/external_metrics" "k8s.io/metrics/pkg/client/external_metrics"
) )
func startHPAController(ctx ControllerContext) (bool, error) { func startHPAController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "autoscaling", Version: "v1", Resource: "horizontalpodautoscalers"}] { if !ctx.AvailableResources[schema.GroupVersionResource{Group: "autoscaling", Version: "v1", Resource: "horizontalpodautoscalers"}] {
return false, nil return nil, false, nil
} }
if ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerUseRESTClients { if ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerUseRESTClients {
@ -44,7 +46,7 @@ func startHPAController(ctx ControllerContext) (bool, error) {
return startHPAControllerWithLegacyClient(ctx) return startHPAControllerWithLegacyClient(ctx)
} }
func startHPAControllerWithRESTClient(ctx ControllerContext) (bool, error) { func startHPAControllerWithRESTClient(ctx ControllerContext) (http.Handler, bool, error) {
clientConfig := ctx.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler") clientConfig := ctx.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")
metricsClient := metrics.NewRESTMetricsClient( metricsClient := metrics.NewRESTMetricsClient(
resourceclient.NewForConfigOrDie(clientConfig), resourceclient.NewForConfigOrDie(clientConfig),
@ -54,7 +56,7 @@ func startHPAControllerWithRESTClient(ctx ControllerContext) (bool, error) {
return startHPAControllerWithMetricsClient(ctx, metricsClient) return startHPAControllerWithMetricsClient(ctx, metricsClient)
} }
func startHPAControllerWithLegacyClient(ctx ControllerContext) (bool, error) { func startHPAControllerWithLegacyClient(ctx ControllerContext) (http.Handler, bool, error) {
hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler") hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
metricsClient := metrics.NewHeapsterMetricsClient( metricsClient := metrics.NewHeapsterMetricsClient(
hpaClient, hpaClient,
@ -66,7 +68,7 @@ func startHPAControllerWithLegacyClient(ctx ControllerContext) (bool, error) {
return startHPAControllerWithMetricsClient(ctx, metricsClient) return startHPAControllerWithMetricsClient(ctx, metricsClient)
} }
func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient metrics.MetricsClient) (bool, error) { func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient metrics.MetricsClient) (http.Handler, bool, error) {
hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler") hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
hpaClientConfig := ctx.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler") hpaClientConfig := ctx.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")
@ -75,7 +77,7 @@ func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient me
scaleKindResolver := scale.NewDiscoveryScaleKindResolver(hpaClient.Discovery()) scaleKindResolver := scale.NewDiscoveryScaleKindResolver(hpaClient.Discovery())
scaleClient, err := scale.NewForConfig(hpaClientConfig, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver) scaleClient, err := scale.NewForConfig(hpaClientConfig, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
if err != nil { if err != nil {
return false, err return nil, false, err
} }
replicaCalc := podautoscaler.NewReplicaCalculator( replicaCalc := podautoscaler.NewReplicaCalculator(
@ -94,5 +96,5 @@ func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient me
ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerUpscaleForbiddenWindow.Duration, ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerUpscaleForbiddenWindow.Duration,
ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerDownscaleForbiddenWindow.Duration, ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerDownscaleForbiddenWindow.Duration,
).Run(ctx.Stop) ).Run(ctx.Stop)
return true, nil return nil, true, nil
} }

View File

@ -23,33 +23,35 @@ package app
import ( import (
"fmt" "fmt"
"net/http"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/kubernetes/pkg/controller/cronjob" "k8s.io/kubernetes/pkg/controller/cronjob"
"k8s.io/kubernetes/pkg/controller/job" "k8s.io/kubernetes/pkg/controller/job"
) )
func startJobController(ctx ControllerContext) (bool, error) { func startJobController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v1", Resource: "jobs"}] { if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v1", Resource: "jobs"}] {
return false, nil return nil, false, nil
} }
go job.NewJobController( go job.NewJobController(
ctx.InformerFactory.Core().V1().Pods(), ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Batch().V1().Jobs(), ctx.InformerFactory.Batch().V1().Jobs(),
ctx.ClientBuilder.ClientOrDie("job-controller"), ctx.ClientBuilder.ClientOrDie("job-controller"),
).Run(int(ctx.ComponentConfig.JobController.ConcurrentJobSyncs), ctx.Stop) ).Run(int(ctx.ComponentConfig.JobController.ConcurrentJobSyncs), ctx.Stop)
return true, nil return nil, true, nil
} }
func startCronJobController(ctx ControllerContext) (bool, error) { func startCronJobController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v1beta1", Resource: "cronjobs"}] { if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v1beta1", Resource: "cronjobs"}] {
return false, nil return nil, false, nil
} }
cjc, err := cronjob.NewCronJobController( cjc, err := cronjob.NewCronJobController(
ctx.ClientBuilder.ClientOrDie("cronjob-controller"), ctx.ClientBuilder.ClientOrDie("cronjob-controller"),
) )
if err != nil { if err != nil {
return true, fmt.Errorf("error creating CronJob controller: %v", err) return nil, true, fmt.Errorf("error creating CronJob controller: %v", err)
} }
go cjc.Run(ctx.Stop) go cjc.Run(ctx.Stop)
return true, nil return nil, true, nil
} }

View File

@ -19,10 +19,12 @@ package app
import ( import (
"fmt" "fmt"
"net/http"
"k8s.io/kubernetes/pkg/controller/bootstrap" "k8s.io/kubernetes/pkg/controller/bootstrap"
) )
func startBootstrapSignerController(ctx ControllerContext) (bool, error) { func startBootstrapSignerController(ctx ControllerContext) (http.Handler, bool, error) {
bsc, err := bootstrap.NewBootstrapSigner( bsc, err := bootstrap.NewBootstrapSigner(
ctx.ClientBuilder.ClientOrDie("bootstrap-signer"), ctx.ClientBuilder.ClientOrDie("bootstrap-signer"),
ctx.InformerFactory.Core().V1().Secrets(), ctx.InformerFactory.Core().V1().Secrets(),
@ -30,21 +32,21 @@ func startBootstrapSignerController(ctx ControllerContext) (bool, error) {
bootstrap.DefaultBootstrapSignerOptions(), bootstrap.DefaultBootstrapSignerOptions(),
) )
if err != nil { if err != nil {
return true, fmt.Errorf("error creating BootstrapSigner controller: %v", err) return nil, true, fmt.Errorf("error creating BootstrapSigner controller: %v", err)
} }
go bsc.Run(ctx.Stop) go bsc.Run(ctx.Stop)
return true, nil return nil, true, nil
} }
func startTokenCleanerController(ctx ControllerContext) (bool, error) { func startTokenCleanerController(ctx ControllerContext) (http.Handler, bool, error) {
tcc, err := bootstrap.NewTokenCleaner( tcc, err := bootstrap.NewTokenCleaner(
ctx.ClientBuilder.ClientOrDie("token-cleaner"), ctx.ClientBuilder.ClientOrDie("token-cleaner"),
ctx.InformerFactory.Core().V1().Secrets(), ctx.InformerFactory.Core().V1().Secrets(),
bootstrap.DefaultTokenCleanerOptions(), bootstrap.DefaultTokenCleanerOptions(),
) )
if err != nil { if err != nil {
return true, fmt.Errorf("error creating TokenCleaner controller: %v", err) return nil, true, fmt.Errorf("error creating TokenCleaner controller: %v", err)
} }
go tcc.Run(ctx.Stop) go tcc.Run(ctx.Stop)
return true, nil return nil, true, nil
} }

View File

@ -26,6 +26,8 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"net/http"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
kubeoptions "k8s.io/kubernetes/cmd/kube-controller-manager/app/options" kubeoptions "k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
"k8s.io/kubernetes/pkg/controller/certificates/approver" "k8s.io/kubernetes/pkg/controller/certificates/approver"
@ -33,12 +35,12 @@ import (
"k8s.io/kubernetes/pkg/controller/certificates/signer" "k8s.io/kubernetes/pkg/controller/certificates/signer"
) )
func startCSRSigningController(ctx ControllerContext) (bool, error) { func startCSRSigningController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "certificates.k8s.io", Version: "v1beta1", Resource: "certificatesigningrequests"}] { if !ctx.AvailableResources[schema.GroupVersionResource{Group: "certificates.k8s.io", Version: "v1beta1", Resource: "certificatesigningrequests"}] {
return false, nil return nil, false, nil
} }
if ctx.ComponentConfig.CSRSigningController.ClusterSigningCertFile == "" || ctx.ComponentConfig.CSRSigningController.ClusterSigningKeyFile == "" { if ctx.ComponentConfig.CSRSigningController.ClusterSigningCertFile == "" || ctx.ComponentConfig.CSRSigningController.ClusterSigningKeyFile == "" {
return false, nil return nil, false, nil
} }
// Deprecation warning for old defaults. // Deprecation warning for old defaults.
@ -72,7 +74,7 @@ func startCSRSigningController(ctx ControllerContext) (bool, error) {
// setting up the signing controller. This isn't // setting up the signing controller. This isn't
// actually a problem since the signer is not a // actually a problem since the signer is not a
// required controller. // required controller.
return false, nil return nil, false, nil
default: default:
// Note that '!filesExist && !usesDefaults' is obviously // Note that '!filesExist && !usesDefaults' is obviously
// operator error. We don't handle this case here and instead // operator error. We don't handle this case here and instead
@ -89,16 +91,16 @@ func startCSRSigningController(ctx ControllerContext) (bool, error) {
ctx.ComponentConfig.CSRSigningController.ClusterSigningDuration.Duration, ctx.ComponentConfig.CSRSigningController.ClusterSigningDuration.Duration,
) )
if err != nil { if err != nil {
return false, fmt.Errorf("failed to start certificate controller: %v", err) return nil, false, fmt.Errorf("failed to start certificate controller: %v", err)
} }
go signer.Run(1, ctx.Stop) go signer.Run(1, ctx.Stop)
return true, nil return nil, true, nil
} }
func startCSRApprovingController(ctx ControllerContext) (bool, error) { func startCSRApprovingController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "certificates.k8s.io", Version: "v1beta1", Resource: "certificatesigningrequests"}] { if !ctx.AvailableResources[schema.GroupVersionResource{Group: "certificates.k8s.io", Version: "v1beta1", Resource: "certificatesigningrequests"}] {
return false, nil return nil, false, nil
} }
approver := approver.NewCSRApprovingController( approver := approver.NewCSRApprovingController(
@ -107,14 +109,14 @@ func startCSRApprovingController(ctx ControllerContext) (bool, error) {
) )
go approver.Run(1, ctx.Stop) go approver.Run(1, ctx.Stop)
return true, nil return nil, true, nil
} }
func startCSRCleanerController(ctx ControllerContext) (bool, error) { func startCSRCleanerController(ctx ControllerContext) (http.Handler, bool, error) {
cleaner := cleaner.NewCSRCleanerController( cleaner := cleaner.NewCSRCleanerController(
ctx.ClientBuilder.ClientOrDie("certificate-controller").CertificatesV1beta1().CertificateSigningRequests(), ctx.ClientBuilder.ClientOrDie("certificate-controller").CertificatesV1beta1().CertificateSigningRequests(),
ctx.InformerFactory.Certificates().V1beta1().CertificateSigningRequests(), ctx.InformerFactory.Certificates().V1beta1().CertificateSigningRequests(),
) )
go cleaner.Run(1, ctx.Stop) go cleaner.Run(1, ctx.Stop)
return true, nil return nil, true, nil
} }

View File

@ -31,11 +31,14 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"net/http"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/mux"
cacheddiscovery "k8s.io/client-go/discovery/cached" cacheddiscovery "k8s.io/client-go/discovery/cached"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
restclient "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest"
@ -129,16 +132,18 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
} }
// Start the controller manager HTTP server // Start the controller manager HTTP server
// unsecuredMux is the handler for these controller *after* authn/authz filters have been applied
var unsecuredMux *mux.PathRecorderMux
if c.SecureServing != nil { if c.SecureServing != nil {
handler := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Debugging) unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Debugging)
handler = genericcontrollermanager.BuildHandlerChain(handler, &c.Authorization, &c.Authentication) handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
if err := c.SecureServing.Serve(handler, 0, stopCh); err != nil { if err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
return err return err
} }
} }
if c.InsecureServing != nil { if c.InsecureServing != nil {
handler := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Debugging) unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Debugging)
handler = genericcontrollermanager.BuildHandlerChain(handler, &c.Authorization, &c.Authentication) handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil { if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil {
return err return err
} }
@ -170,7 +175,7 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
} }
saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode)); err != nil { if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
glog.Fatalf("error starting controllers: %v", err) glog.Fatalf("error starting controllers: %v", err)
} }
@ -291,7 +296,7 @@ func IsControllerEnabled(name string, disabledByDefaultControllers sets.String,
// InitFunc is used to launch a particular controller. It may run additional "should I activate checks". // InitFunc is used to launch a particular controller. It may run additional "should I activate checks".
// Any error returned will cause the controller process to `Fatal` // Any error returned will cause the controller process to `Fatal`
// The bool indicates whether the controller was enabled. // The bool indicates whether the controller was enabled.
type InitFunc func(ctx ControllerContext) (bool, error) type InitFunc func(ctx ControllerContext) (debuggingHandler http.Handler, enabled bool, err error)
func KnownControllers() []string { func KnownControllers() []string {
ret := sets.StringKeySet(NewControllerInitializers(IncludeCloudLoops)) ret := sets.StringKeySet(NewControllerInitializers(IncludeCloudLoops))
@ -434,10 +439,10 @@ func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clien
return ctx, nil return ctx, nil
} }
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc) error { func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error {
// Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest // Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
// If this fails, just return here and fail since other controllers won't be able to get credentials. // If this fails, just return here and fail since other controllers won't be able to get credentials.
if _, err := startSATokenController(ctx); err != nil { if _, _, err := startSATokenController(ctx); err != nil {
return err return err
} }
@ -456,7 +461,7 @@ func StartControllers(ctx ControllerContext, startSATokenController InitFunc, co
time.Sleep(wait.Jitter(ctx.ComponentConfig.GenericComponent.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(ctx.ComponentConfig.GenericComponent.ControllerStartInterval.Duration, ControllerStartJitter))
glog.V(1).Infof("Starting %q", controllerName) glog.V(1).Infof("Starting %q", controllerName)
started, err := initFn(ctx) debugHandler, started, err := initFn(ctx)
if err != nil { if err != nil {
glog.Errorf("Error starting %q", controllerName) glog.Errorf("Error starting %q", controllerName)
return err return err
@ -465,6 +470,11 @@ func StartControllers(ctx ControllerContext, startSATokenController InitFunc, co
glog.Warningf("Skipping %q", controllerName) glog.Warningf("Skipping %q", controllerName)
continue continue
} }
if debugHandler != nil && unsecuredMux != nil {
basePath := "/debug/controllers/" + controllerName
unsecuredMux.UnlistedHandle(basePath, http.StripPrefix(basePath, debugHandler))
unsecuredMux.UnlistedHandlePrefix(basePath+"/", http.StripPrefix(basePath, debugHandler))
}
glog.Infof("Started %q", controllerName) glog.Infof("Started %q", controllerName)
} }
@ -478,29 +488,29 @@ type serviceAccountTokenControllerStarter struct {
rootClientBuilder controller.ControllerClientBuilder rootClientBuilder controller.ControllerClientBuilder
} }
func (c serviceAccountTokenControllerStarter) startServiceAccountTokenController(ctx ControllerContext) (bool, error) { func (c serviceAccountTokenControllerStarter) startServiceAccountTokenController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.IsControllerEnabled(saTokenControllerName) { if !ctx.IsControllerEnabled(saTokenControllerName) {
glog.Warningf("%q is disabled", saTokenControllerName) glog.Warningf("%q is disabled", saTokenControllerName)
return false, nil return nil, false, nil
} }
if len(ctx.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 { if len(ctx.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 {
glog.Warningf("%q is disabled because there is no private key", saTokenControllerName) glog.Warningf("%q is disabled because there is no private key", saTokenControllerName)
return false, nil return nil, false, nil
} }
privateKey, err := certutil.PrivateKeyFromFile(ctx.ComponentConfig.SAController.ServiceAccountKeyFile) privateKey, err := certutil.PrivateKeyFromFile(ctx.ComponentConfig.SAController.ServiceAccountKeyFile)
if err != nil { if err != nil {
return true, fmt.Errorf("error reading key for service account token controller: %v", err) return nil, true, fmt.Errorf("error reading key for service account token controller: %v", err)
} }
var rootCA []byte var rootCA []byte
if ctx.ComponentConfig.SAController.RootCAFile != "" { if ctx.ComponentConfig.SAController.RootCAFile != "" {
rootCA, err = ioutil.ReadFile(ctx.ComponentConfig.SAController.RootCAFile) rootCA, err = ioutil.ReadFile(ctx.ComponentConfig.SAController.RootCAFile)
if err != nil { if err != nil {
return true, fmt.Errorf("error reading root-ca-file at %s: %v", ctx.ComponentConfig.SAController.RootCAFile, err) return nil, true, fmt.Errorf("error reading root-ca-file at %s: %v", ctx.ComponentConfig.SAController.RootCAFile, err)
} }
if _, err := certutil.ParseCertsPEM(rootCA); err != nil { if _, err := certutil.ParseCertsPEM(rootCA); err != nil {
return true, fmt.Errorf("error parsing root-ca-file at %s: %v", ctx.ComponentConfig.SAController.RootCAFile, err) return nil, true, fmt.Errorf("error parsing root-ca-file at %s: %v", ctx.ComponentConfig.SAController.RootCAFile, err)
} }
} else { } else {
rootCA = c.rootClientBuilder.ConfigOrDie("tokens-controller").CAData rootCA = c.rootClientBuilder.ConfigOrDie("tokens-controller").CAData
@ -516,12 +526,12 @@ func (c serviceAccountTokenControllerStarter) startServiceAccountTokenController
}, },
) )
if err != nil { if err != nil {
return true, fmt.Errorf("error creating Tokens controller: %v", err) return nil, true, fmt.Errorf("error creating Tokens controller: %v", err)
} }
go controller.Run(int(ctx.ComponentConfig.SAController.ConcurrentSATokenSyncs), ctx.Stop) go controller.Run(int(ctx.ComponentConfig.SAController.ConcurrentSATokenSyncs), ctx.Stop)
// start the first set of informers now so that other controllers can start // start the first set of informers now so that other controllers can start
ctx.InformerFactory.Start(ctx.Stop) ctx.InformerFactory.Start(ctx.Stop)
return true, nil return nil, true, nil
} }

View File

@ -28,6 +28,8 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"net/http"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
@ -59,7 +61,7 @@ import (
"k8s.io/kubernetes/pkg/util/metrics" "k8s.io/kubernetes/pkg/util/metrics"
) )
func startServiceController(ctx ControllerContext) (bool, error) { func startServiceController(ctx ControllerContext) (http.Handler, bool, error) {
serviceController, err := servicecontroller.New( serviceController, err := servicecontroller.New(
ctx.Cloud, ctx.Cloud,
ctx.ClientBuilder.ClientOrDie("service-controller"), ctx.ClientBuilder.ClientOrDie("service-controller"),
@ -70,18 +72,18 @@ func startServiceController(ctx ControllerContext) (bool, error) {
if err != nil { if err != nil {
// This error shouldn't fail. It lives like this as a legacy. // This error shouldn't fail. It lives like this as a legacy.
glog.Errorf("Failed to start service controller: %v", err) glog.Errorf("Failed to start service controller: %v", err)
return false, nil return nil, false, nil
} }
go serviceController.Run(ctx.Stop, int(ctx.ComponentConfig.ServiceController.ConcurrentServiceSyncs)) go serviceController.Run(ctx.Stop, int(ctx.ComponentConfig.ServiceController.ConcurrentServiceSyncs))
return true, nil return nil, true, nil
} }
func startNodeIpamController(ctx ControllerContext) (bool, error) { func startNodeIpamController(ctx ControllerContext) (http.Handler, bool, error) {
var clusterCIDR *net.IPNet = nil var clusterCIDR *net.IPNet = nil
var serviceCIDR *net.IPNet = nil var serviceCIDR *net.IPNet = nil
if !ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs { if !ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs {
return false, nil return nil, false, nil
} }
var err error var err error
@ -109,13 +111,13 @@ func startNodeIpamController(ctx ControllerContext) (bool, error) {
ipam.CIDRAllocatorType(ctx.ComponentConfig.KubeCloudShared.CIDRAllocatorType), ipam.CIDRAllocatorType(ctx.ComponentConfig.KubeCloudShared.CIDRAllocatorType),
) )
if err != nil { if err != nil {
return true, err return nil, true, err
} }
go nodeIpamController.Run(ctx.Stop) go nodeIpamController.Run(ctx.Stop)
return true, nil return nil, true, nil
} }
func startNodeLifecycleController(ctx ControllerContext) (bool, error) { func startNodeLifecycleController(ctx ControllerContext) (http.Handler, bool, error) {
lifecycleController, err := lifecyclecontroller.NewNodeLifecycleController( lifecycleController, err := lifecyclecontroller.NewNodeLifecycleController(
ctx.InformerFactory.Core().V1().Pods(), ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Nodes(), ctx.InformerFactory.Core().V1().Nodes(),
@ -135,25 +137,25 @@ func startNodeLifecycleController(ctx ControllerContext) (bool, error) {
utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition), utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition),
) )
if err != nil { if err != nil {
return true, err return nil, true, err
} }
go lifecycleController.Run(ctx.Stop) go lifecycleController.Run(ctx.Stop)
return true, nil return nil, true, nil
} }
func startRouteController(ctx ControllerContext) (bool, error) { func startRouteController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs || !ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes { if !ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs || !ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes {
glog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs, ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes) glog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs, ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes)
return false, nil return nil, false, nil
} }
if ctx.Cloud == nil { if ctx.Cloud == nil {
glog.Warning("configure-cloud-routes is set, but no cloud provider specified. Will not configure cloud provider routes.") glog.Warning("configure-cloud-routes is set, but no cloud provider specified. Will not configure cloud provider routes.")
return false, nil return nil, false, nil
} }
routes, ok := ctx.Cloud.Routes() routes, ok := ctx.Cloud.Routes()
if !ok { if !ok {
glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.") glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
return false, nil return nil, false, nil
} }
_, clusterCIDR, err := net.ParseCIDR(ctx.ComponentConfig.KubeCloudShared.ClusterCIDR) _, clusterCIDR, err := net.ParseCIDR(ctx.ComponentConfig.KubeCloudShared.ClusterCIDR)
if err != nil { if err != nil {
@ -161,10 +163,10 @@ func startRouteController(ctx ControllerContext) (bool, error) {
} }
routeController := routecontroller.New(routes, ctx.ClientBuilder.ClientOrDie("route-controller"), ctx.InformerFactory.Core().V1().Nodes(), ctx.ComponentConfig.KubeCloudShared.ClusterName, clusterCIDR) routeController := routecontroller.New(routes, ctx.ClientBuilder.ClientOrDie("route-controller"), ctx.InformerFactory.Core().V1().Nodes(), ctx.ComponentConfig.KubeCloudShared.ClusterName, clusterCIDR)
go routeController.Run(ctx.Stop, ctx.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration) go routeController.Run(ctx.Stop, ctx.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration)
return true, nil return nil, true, nil
} }
func startPersistentVolumeBinderController(ctx ControllerContext) (bool, error) { func startPersistentVolumeBinderController(ctx ControllerContext) (http.Handler, bool, error) {
params := persistentvolumecontroller.ControllerParameters{ params := persistentvolumecontroller.ControllerParameters{
KubeClient: ctx.ClientBuilder.ClientOrDie("persistent-volume-binder"), KubeClient: ctx.ClientBuilder.ClientOrDie("persistent-volume-binder"),
SyncPeriod: ctx.ComponentConfig.PersistentVolumeBinderController.PVClaimBinderSyncPeriod.Duration, SyncPeriod: ctx.ComponentConfig.PersistentVolumeBinderController.PVClaimBinderSyncPeriod.Duration,
@ -180,15 +182,15 @@ func startPersistentVolumeBinderController(ctx ControllerContext) (bool, error)
} }
volumeController, volumeControllerErr := persistentvolumecontroller.NewController(params) volumeController, volumeControllerErr := persistentvolumecontroller.NewController(params)
if volumeControllerErr != nil { if volumeControllerErr != nil {
return true, fmt.Errorf("failed to construct persistentvolume controller: %v", volumeControllerErr) return nil, true, fmt.Errorf("failed to construct persistentvolume controller: %v", volumeControllerErr)
} }
go volumeController.Run(ctx.Stop) go volumeController.Run(ctx.Stop)
return true, nil return nil, true, nil
} }
func startAttachDetachController(ctx ControllerContext) (bool, error) { func startAttachDetachController(ctx ControllerContext) (http.Handler, bool, error) {
if ctx.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration < time.Second { if ctx.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration < time.Second {
return true, fmt.Errorf("Duration time must be greater than one second as set via command line option reconcile-sync-loop-period.") return nil, true, fmt.Errorf("Duration time must be greater than one second as set via command line option reconcile-sync-loop-period.")
} }
attachDetachController, attachDetachControllerErr := attachDetachController, attachDetachControllerErr :=
attachdetach.NewAttachDetachController( attachdetach.NewAttachDetachController(
@ -205,13 +207,13 @@ func startAttachDetachController(ctx ControllerContext) (bool, error) {
attachdetach.DefaultTimerConfig, attachdetach.DefaultTimerConfig,
) )
if attachDetachControllerErr != nil { if attachDetachControllerErr != nil {
return true, fmt.Errorf("failed to start attach/detach controller: %v", attachDetachControllerErr) return nil, true, fmt.Errorf("failed to start attach/detach controller: %v", attachDetachControllerErr)
} }
go attachDetachController.Run(ctx.Stop) go attachDetachController.Run(ctx.Stop)
return true, nil return nil, true, nil
} }
func startVolumeExpandController(ctx ControllerContext) (bool, error) { func startVolumeExpandController(ctx ControllerContext) (http.Handler, bool, error) {
if utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumes) { if utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumes) {
expandController, expandControllerErr := expand.NewExpandController( expandController, expandControllerErr := expand.NewExpandController(
ctx.ClientBuilder.ClientOrDie("expand-controller"), ctx.ClientBuilder.ClientOrDie("expand-controller"),
@ -221,44 +223,44 @@ func startVolumeExpandController(ctx ControllerContext) (bool, error) {
ProbeExpandableVolumePlugins(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration)) ProbeExpandableVolumePlugins(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration))
if expandControllerErr != nil { if expandControllerErr != nil {
return true, fmt.Errorf("Failed to start volume expand controller : %v", expandControllerErr) return nil, true, fmt.Errorf("Failed to start volume expand controller : %v", expandControllerErr)
} }
go expandController.Run(ctx.Stop) go expandController.Run(ctx.Stop)
return true, nil return nil, true, nil
} }
return false, nil return nil, false, nil
} }
func startEndpointController(ctx ControllerContext) (bool, error) { func startEndpointController(ctx ControllerContext) (http.Handler, bool, error) {
go endpointcontroller.NewEndpointController( go endpointcontroller.NewEndpointController(
ctx.InformerFactory.Core().V1().Pods(), ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Services(), ctx.InformerFactory.Core().V1().Services(),
ctx.InformerFactory.Core().V1().Endpoints(), ctx.InformerFactory.Core().V1().Endpoints(),
ctx.ClientBuilder.ClientOrDie("endpoint-controller"), ctx.ClientBuilder.ClientOrDie("endpoint-controller"),
).Run(int(ctx.ComponentConfig.EndPointController.ConcurrentEndpointSyncs), ctx.Stop) ).Run(int(ctx.ComponentConfig.EndPointController.ConcurrentEndpointSyncs), ctx.Stop)
return true, nil return nil, true, nil
} }
func startReplicationController(ctx ControllerContext) (bool, error) { func startReplicationController(ctx ControllerContext) (http.Handler, bool, error) {
go replicationcontroller.NewReplicationManager( go replicationcontroller.NewReplicationManager(
ctx.InformerFactory.Core().V1().Pods(), ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().ReplicationControllers(), ctx.InformerFactory.Core().V1().ReplicationControllers(),
ctx.ClientBuilder.ClientOrDie("replication-controller"), ctx.ClientBuilder.ClientOrDie("replication-controller"),
replicationcontroller.BurstReplicas, replicationcontroller.BurstReplicas,
).Run(int(ctx.ComponentConfig.ReplicationController.ConcurrentRCSyncs), ctx.Stop) ).Run(int(ctx.ComponentConfig.ReplicationController.ConcurrentRCSyncs), ctx.Stop)
return true, nil return nil, true, nil
} }
func startPodGCController(ctx ControllerContext) (bool, error) { func startPodGCController(ctx ControllerContext) (http.Handler, bool, error) {
go podgc.NewPodGC( go podgc.NewPodGC(
ctx.ClientBuilder.ClientOrDie("pod-garbage-collector"), ctx.ClientBuilder.ClientOrDie("pod-garbage-collector"),
ctx.InformerFactory.Core().V1().Pods(), ctx.InformerFactory.Core().V1().Pods(),
int(ctx.ComponentConfig.PodGCController.TerminatedPodGCThreshold), int(ctx.ComponentConfig.PodGCController.TerminatedPodGCThreshold),
).Run(ctx.Stop) ).Run(ctx.Stop)
return true, nil return nil, true, nil
} }
func startResourceQuotaController(ctx ControllerContext) (bool, error) { func startResourceQuotaController(ctx ControllerContext) (http.Handler, bool, error) {
resourceQuotaControllerClient := ctx.ClientBuilder.ClientOrDie("resourcequota-controller") resourceQuotaControllerClient := ctx.ClientBuilder.ClientOrDie("resourcequota-controller")
discoveryFunc := resourceQuotaControllerClient.Discovery().ServerPreferredNamespacedResources discoveryFunc := resourceQuotaControllerClient.Discovery().ServerPreferredNamespacedResources
listerFuncForResource := generic.ListerFuncForResourceFunc(ctx.InformerFactory.ForResource) listerFuncForResource := generic.ListerFuncForResourceFunc(ctx.InformerFactory.ForResource)
@ -277,23 +279,23 @@ func startResourceQuotaController(ctx ControllerContext) (bool, error) {
} }
if resourceQuotaControllerClient.CoreV1().RESTClient().GetRateLimiter() != nil { if resourceQuotaControllerClient.CoreV1().RESTClient().GetRateLimiter() != nil {
if err := metrics.RegisterMetricAndTrackRateLimiterUsage("resource_quota_controller", resourceQuotaControllerClient.CoreV1().RESTClient().GetRateLimiter()); err != nil { if err := metrics.RegisterMetricAndTrackRateLimiterUsage("resource_quota_controller", resourceQuotaControllerClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
return true, err return nil, true, err
} }
} }
resourceQuotaController, err := resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions) resourceQuotaController, err := resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions)
if err != nil { if err != nil {
return false, err return nil, false, err
} }
go resourceQuotaController.Run(int(ctx.ComponentConfig.ResourceQuotaController.ConcurrentResourceQuotaSyncs), ctx.Stop) go resourceQuotaController.Run(int(ctx.ComponentConfig.ResourceQuotaController.ConcurrentResourceQuotaSyncs), ctx.Stop)
// Periodically the quota controller to detect new resource types // Periodically the quota controller to detect new resource types
go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Stop) go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Stop)
return true, nil return nil, true, nil
} }
func startNamespaceController(ctx ControllerContext) (bool, error) { func startNamespaceController(ctx ControllerContext) (http.Handler, bool, error) {
// the namespace cleanup controller is very chatty. It makes lots of discovery calls and then it makes lots of delete calls // the namespace cleanup controller is very chatty. It makes lots of discovery calls and then it makes lots of delete calls
// the ratelimiter negatively affects its speed. Deleting 100 total items in a namespace (that's only a few of each resource // the ratelimiter negatively affects its speed. Deleting 100 total items in a namespace (that's only a few of each resource
// including events), takes ~10 seconds by default. // including events), takes ~10 seconds by default.
@ -304,7 +306,7 @@ func startNamespaceController(ctx ControllerContext) (bool, error) {
dynamicClient, err := dynamic.NewForConfig(nsKubeconfig) dynamicClient, err := dynamic.NewForConfig(nsKubeconfig)
if err != nil { if err != nil {
return true, err return nil, true, err
} }
discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources
@ -319,10 +321,10 @@ func startNamespaceController(ctx ControllerContext) (bool, error) {
) )
go namespaceController.Run(int(ctx.ComponentConfig.NamespaceController.ConcurrentNamespaceSyncs), ctx.Stop) go namespaceController.Run(int(ctx.ComponentConfig.NamespaceController.ConcurrentNamespaceSyncs), ctx.Stop)
return true, nil return nil, true, nil
} }
func startServiceAccountController(ctx ControllerContext) (bool, error) { func startServiceAccountController(ctx ControllerContext) (http.Handler, bool, error) {
sac, err := serviceaccountcontroller.NewServiceAccountsController( sac, err := serviceaccountcontroller.NewServiceAccountsController(
ctx.InformerFactory.Core().V1().ServiceAccounts(), ctx.InformerFactory.Core().V1().ServiceAccounts(),
ctx.InformerFactory.Core().V1().Namespaces(), ctx.InformerFactory.Core().V1().Namespaces(),
@ -330,23 +332,23 @@ func startServiceAccountController(ctx ControllerContext) (bool, error) {
serviceaccountcontroller.DefaultServiceAccountsControllerOptions(), serviceaccountcontroller.DefaultServiceAccountsControllerOptions(),
) )
if err != nil { if err != nil {
return true, fmt.Errorf("error creating ServiceAccount controller: %v", err) return nil, true, fmt.Errorf("error creating ServiceAccount controller: %v", err)
} }
go sac.Run(1, ctx.Stop) go sac.Run(1, ctx.Stop)
return true, nil return nil, true, nil
} }
func startTTLController(ctx ControllerContext) (bool, error) { func startTTLController(ctx ControllerContext) (http.Handler, bool, error) {
go ttlcontroller.NewTTLController( go ttlcontroller.NewTTLController(
ctx.InformerFactory.Core().V1().Nodes(), ctx.InformerFactory.Core().V1().Nodes(),
ctx.ClientBuilder.ClientOrDie("ttl-controller"), ctx.ClientBuilder.ClientOrDie("ttl-controller"),
).Run(5, ctx.Stop) ).Run(5, ctx.Stop)
return true, nil return nil, true, nil
} }
func startGarbageCollectorController(ctx ControllerContext) (bool, error) { func startGarbageCollectorController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.ComponentConfig.GarbageCollectorController.EnableGarbageCollector { if !ctx.ComponentConfig.GarbageCollectorController.EnableGarbageCollector {
return false, nil return nil, false, nil
} }
gcClientset := ctx.ClientBuilder.ClientOrDie("generic-garbage-collector") gcClientset := ctx.ClientBuilder.ClientOrDie("generic-garbage-collector")
@ -355,7 +357,7 @@ func startGarbageCollectorController(ctx ControllerContext) (bool, error) {
config := ctx.ClientBuilder.ConfigOrDie("generic-garbage-collector") config := ctx.ClientBuilder.ConfigOrDie("generic-garbage-collector")
dynamicClient, err := dynamic.NewForConfig(config) dynamicClient, err := dynamic.NewForConfig(config)
if err != nil { if err != nil {
return true, err return nil, true, err
} }
// Get an initial set of deletable resources to prime the garbage collector. // Get an initial set of deletable resources to prime the garbage collector.
@ -373,7 +375,7 @@ func startGarbageCollectorController(ctx ControllerContext) (bool, error) {
ctx.InformersStarted, ctx.InformersStarted,
) )
if err != nil { if err != nil {
return true, fmt.Errorf("Failed to start the generic garbage collector: %v", err) return nil, true, fmt.Errorf("Failed to start the generic garbage collector: %v", err)
} }
// Start the garbage collector. // Start the garbage collector.
@ -384,24 +386,24 @@ func startGarbageCollectorController(ctx ControllerContext) (bool, error) {
// the garbage collector. // the garbage collector.
go garbageCollector.Sync(gcClientset.Discovery(), 30*time.Second, ctx.Stop) go garbageCollector.Sync(gcClientset.Discovery(), 30*time.Second, ctx.Stop)
return true, nil return garbagecollector.NewDebugHandler(garbageCollector), true, nil
} }
func startPVCProtectionController(ctx ControllerContext) (bool, error) { func startPVCProtectionController(ctx ControllerContext) (http.Handler, bool, error) {
go pvcprotection.NewPVCProtectionController( go pvcprotection.NewPVCProtectionController(
ctx.InformerFactory.Core().V1().PersistentVolumeClaims(), ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
ctx.InformerFactory.Core().V1().Pods(), ctx.InformerFactory.Core().V1().Pods(),
ctx.ClientBuilder.ClientOrDie("pvc-protection-controller"), ctx.ClientBuilder.ClientOrDie("pvc-protection-controller"),
utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection), utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection),
).Run(1, ctx.Stop) ).Run(1, ctx.Stop)
return true, nil return nil, true, nil
} }
func startPVProtectionController(ctx ControllerContext) (bool, error) { func startPVProtectionController(ctx ControllerContext) (http.Handler, bool, error) {
go pvprotection.NewPVProtectionController( go pvprotection.NewPVProtectionController(
ctx.InformerFactory.Core().V1().PersistentVolumes(), ctx.InformerFactory.Core().V1().PersistentVolumes(),
ctx.ClientBuilder.ClientOrDie("pv-protection-controller"), ctx.ClientBuilder.ClientOrDie("pv-protection-controller"),
utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection), utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection),
).Run(1, ctx.Stop) ).Run(1, ctx.Stop)
return true, nil return nil, true, nil
} }

View File

@ -24,10 +24,12 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/kubernetes/pkg/controller/disruption" "k8s.io/kubernetes/pkg/controller/disruption"
"net/http"
"github.com/golang/glog" "github.com/golang/glog"
) )
func startDisruptionController(ctx ControllerContext) (bool, error) { func startDisruptionController(ctx ControllerContext) (http.Handler, bool, error) {
var group = "policy" var group = "policy"
var version = "v1beta1" var version = "v1beta1"
var resource = "poddisruptionbudgets" var resource = "poddisruptionbudgets"
@ -36,7 +38,7 @@ func startDisruptionController(ctx ControllerContext) (bool, error) {
glog.Infof( glog.Infof(
"Refusing to start disruption because resource %q in group %q is not available.", "Refusing to start disruption because resource %q in group %q is not available.",
resource, group+"/"+version) resource, group+"/"+version)
return false, nil return nil, false, nil
} }
go disruption.NewDisruptionController( go disruption.NewDisruptionController(
ctx.InformerFactory.Core().V1().Pods(), ctx.InformerFactory.Core().V1().Pods(),
@ -47,5 +49,5 @@ func startDisruptionController(ctx ControllerContext) (bool, error) {
ctx.InformerFactory.Apps().V1beta1().StatefulSets(), ctx.InformerFactory.Apps().V1beta1().StatefulSets(),
ctx.ClientBuilder.ClientOrDie("disruption-controller"), ctx.ClientBuilder.ClientOrDie("disruption-controller"),
).Run(ctx.Stop) ).Run(ctx.Stop)
return true, nil return nil, true, nil
} }

View File

@ -17,17 +17,19 @@ limitations under the License.
package app package app
import ( import (
"net/http"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/kubernetes/pkg/controller/clusterroleaggregation" "k8s.io/kubernetes/pkg/controller/clusterroleaggregation"
) )
func startClusterRoleAggregrationController(ctx ControllerContext) (bool, error) { func startClusterRoleAggregrationController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "rbac.authorization.k8s.io", Version: "v1", Resource: "clusterroles"}] { if !ctx.AvailableResources[schema.GroupVersionResource{Group: "rbac.authorization.k8s.io", Version: "v1", Resource: "clusterroles"}] {
return false, nil return nil, false, nil
} }
go clusterroleaggregation.NewClusterRoleAggregation( go clusterroleaggregation.NewClusterRoleAggregation(
ctx.InformerFactory.Rbac().V1().ClusterRoles(), ctx.InformerFactory.Rbac().V1().ClusterRoles(),
ctx.ClientBuilder.ClientOrDie("clusterrole-aggregation-controller").RbacV1(), ctx.ClientBuilder.ClientOrDie("clusterrole-aggregation-controller").RbacV1(),
).Run(5, ctx.Stop) ).Run(5, ctx.Stop)
return true, nil return nil, true, nil
} }