diff --git a/cmd/kube-controller-manager/app/autoscaling.go b/cmd/kube-controller-manager/app/autoscaling.go index 9b8a64e765d..6971890ed53 100644 --- a/cmd/kube-controller-manager/app/autoscaling.go +++ b/cmd/kube-controller-manager/app/autoscaling.go @@ -22,8 +22,6 @@ package app import ( "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/discovery" - discocache "k8s.io/client-go/discovery/cached" "k8s.io/client-go/dynamic" "k8s.io/client-go/scale" "k8s.io/kubernetes/pkg/controller/podautoscaler" @@ -73,15 +71,10 @@ func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient me hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler") hpaClientConfig := ctx.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler") - // TODO: we need something like deferred discovery REST mapper that calls invalidate - // on cache misses. - cachedDiscovery := discocache.NewMemCacheClient(hpaClientGoClient.Discovery()) - restMapper := discovery.NewDeferredDiscoveryRESTMapper(cachedDiscovery) - restMapper.Reset() // we don't use cached discovery because DiscoveryScaleKindResolver does its own caching, // so we want to re-fetch every time when we actually ask for it scaleKindResolver := scale.NewDiscoveryScaleKindResolver(hpaClientGoClient.Discovery()) - scaleClient, err := scale.NewForConfig(hpaClientConfig, restMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver) + scaleClient, err := scale.NewForConfig(hpaClientConfig, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver) if err != nil { return false, err } @@ -95,7 +88,7 @@ func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient me hpaClientGoClient.CoreV1(), scaleClient, hpaClient.AutoscalingV1(), - restMapper, + ctx.RESTMapper, replicaCalc, ctx.InformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(), ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration, diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 81ca3c89c65..3b4c645dca2 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -35,6 +35,8 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/discovery" + cacheddiscovery "k8s.io/client-go/discovery/cached" "k8s.io/client-go/informers" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection" @@ -223,6 +225,11 @@ type ControllerContext struct { // ComponentConfig provides access to init options for a given controller ComponentConfig componentconfig.KubeControllerManagerConfiguration + // DeferredDiscoveryRESTMapper is a RESTMapper that will defer + // initialization of the RESTMapper until the first mapping is + // requested. + RESTMapper *discovery.DeferredDiscoveryRESTMapper + // AvailableResources is a map listing currently available resources AvailableResources map[schema.GroupVersionResource]bool @@ -389,6 +396,14 @@ func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clien return ControllerContext{}, fmt.Errorf("failed to wait for apiserver being healthy: %v", err) } + // Use a discovery client capable of being refreshed. + discoveryClient := rootClientBuilder.ClientOrDie("controller-discovery") + cachedClient := cacheddiscovery.NewMemCacheClient(discoveryClient.Discovery()) + restMapper := discovery.NewDeferredDiscoveryRESTMapper(cachedClient) + go wait.Until(func() { + restMapper.Reset() + }, 30*time.Second, stop) + availableResources, err := GetAvailableResources(rootClientBuilder) if err != nil { return ControllerContext{}, err @@ -404,6 +419,7 @@ func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clien ClientBuilder: clientBuilder, InformerFactory: sharedInformers, ComponentConfig: s.Generic.ComponentConfig, + RESTMapper: restMapper, AvailableResources: availableResources, Cloud: cloud, LoopMode: loopMode, diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 00ea3c7374d..393b218a453 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -31,7 +31,6 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime/schema" utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/client-go/discovery" cacheddiscovery "k8s.io/client-go/discovery/cached" "k8s.io/client-go/dynamic" clientset "k8s.io/client-go/kubernetes" @@ -300,13 +299,13 @@ func startNamespaceController(ctx ControllerContext) (bool, error) { nsKubeconfig.Burst *= 100 namespaceKubeClient := clientset.NewForConfigOrDie(nsKubeconfig) - discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources - dynamicClient, err := dynamic.NewForConfig(nsKubeconfig) if err != nil { return true, err } + discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources + namespaceController := namespacecontroller.NewNamespaceController( namespaceKubeClient, dynamicClient, @@ -348,11 +347,7 @@ func startGarbageCollectorController(ctx ControllerContext) (bool, error) { } gcClientset := ctx.ClientBuilder.ClientOrDie("generic-garbage-collector") - - // Use a discovery client capable of being refreshed. discoveryClient := cacheddiscovery.NewMemCacheClient(gcClientset.Discovery()) - restMapper := discovery.NewDeferredDiscoveryRESTMapper(discoveryClient) - restMapper.Reset() config := ctx.ClientBuilder.ConfigOrDie("generic-garbage-collector") config.ContentConfig = dynamic.ContentConfig() @@ -360,8 +355,8 @@ func startGarbageCollectorController(ctx ControllerContext) (bool, error) { // resource types. Otherwise we'll be storing full Unstructured data in our // caches for custom resources. Consider porting it to work with // metav1beta1.PartialObjectMetadata. - metaOnlyClientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc) - clientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc) + metaOnlyClientPool := dynamic.NewClientPool(config, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc) + clientPool := dynamic.NewClientPool(config, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc) // Get an initial set of deletable resources to prime the garbage collector. deletableResources := garbagecollector.GetDeletableResources(discoveryClient) @@ -372,7 +367,7 @@ func startGarbageCollectorController(ctx ControllerContext) (bool, error) { garbageCollector, err := garbagecollector.NewGarbageCollector( metaOnlyClientPool, clientPool, - restMapper, + ctx.RESTMapper, deletableResources, ignoredResources, ctx.InformerFactory, diff --git a/test/integration/garbagecollector/garbage_collector_test.go b/test/integration/garbagecollector/garbage_collector_test.go index 8e19f2404ee..2e20d637513 100644 --- a/test/integration/garbagecollector/garbage_collector_test.go +++ b/test/integration/garbagecollector/garbage_collector_test.go @@ -254,6 +254,12 @@ func setupWithServer(t *testing.T, result *kubeapiservertesting.TestServer, work } syncPeriod := 5 * time.Second startGC := func(workers int) { + go wait.Until(func() { + // Resetting the REST mapper will also invalidate the underlying discovery + // client. This is a leaky abstraction and assumes behavior about the REST + // mapper, but we'll deal with it for now. + restMapper.Reset() + }, syncPeriod, stopCh) go gc.Run(workers, stopCh) go gc.Sync(clientSet.Discovery(), syncPeriod, stopCh) }