resourcequota: use dynamic informer

The resource quota controller should use a dynamic informer so it
can create informer for custom resources.
This commit is contained in:
zhouhaibing089 2018-12-27 15:37:15 -08:00 committed by haibzhou
parent a8cbb22506
commit f58c2ae62d
10 changed files with 202 additions and 27 deletions

View File

@ -122,6 +122,7 @@ go_library(
"//staging/src/k8s.io/client-go/discovery/cached:go_default_library",
"//staging/src/k8s.io/client-go/discovery/cached/memory:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/dynamic/dynamicinformer:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",

View File

@ -43,6 +43,8 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/term"
cacheddiscovery "k8s.io/client-go/discovery/cached"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
@ -234,6 +236,7 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
}
controllerContext.InformerFactory.Start(controllerContext.Stop)
controllerContext.GenericInformerFactory.Start(controllerContext.Stop)
close(controllerContext.InformersStarted)
select {}
@ -288,6 +291,10 @@ type ControllerContext struct {
// InformerFactory gives access to informers for the controller.
InformerFactory informers.SharedInformerFactory
// GenericInformerFactory gives access to informers for typed resources
// and dynamic resources.
GenericInformerFactory controller.InformerFactory
// ComponentConfig provides access to init options for a given controller
ComponentConfig kubectrlmgrconfig.KubeControllerManagerConfiguration
@ -433,6 +440,9 @@ func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clien
versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())
dynamicClient := dynamic.NewForConfigOrDie(rootClientBuilder.ConfigOrDie("dynamic-informers"))
dynamicInformers := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, ResyncPeriod(s)())
// If apiserver is not running we should wait for some time and fail only then. This is particularly
// important when we start apiserver and controller manager at the same time.
if err := genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second); err != nil {
@ -459,16 +469,17 @@ func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clien
}
ctx := ControllerContext{
ClientBuilder: clientBuilder,
InformerFactory: sharedInformers,
ComponentConfig: s.ComponentConfig,
RESTMapper: restMapper,
AvailableResources: availableResources,
Cloud: cloud,
LoopMode: loopMode,
Stop: stop,
InformersStarted: make(chan struct{}),
ResyncPeriod: ResyncPeriod(s),
ClientBuilder: clientBuilder,
InformerFactory: sharedInformers,
GenericInformerFactory: controller.NewInformerFactory(sharedInformers, dynamicInformers),
ComponentConfig: s.ComponentConfig,
RESTMapper: restMapper,
AvailableResources: availableResources,
Cloud: cloud,
LoopMode: loopMode,
Stop: stop,
InformersStarted: make(chan struct{}),
ResyncPeriod: ResyncPeriod(s),
}
return ctx, nil
}

View File

@ -23,13 +23,12 @@ package app
import (
"fmt"
"net"
"net/http"
"strings"
"time"
"k8s.io/klog"
"net/http"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
utilfeature "k8s.io/apiserver/pkg/util/feature"
@ -295,7 +294,7 @@ func startResourceQuotaController(ctx ControllerContext) (http.Handler, bool, er
QuotaClient: resourceQuotaControllerClient.CoreV1(),
ResourceQuotaInformer: ctx.InformerFactory.Core().V1().ResourceQuotas(),
ResyncPeriod: controller.StaticResyncPeriodFunc(ctx.ComponentConfig.ResourceQuotaController.ResourceQuotaSyncPeriod.Duration),
InformerFactory: ctx.InformerFactory,
InformerFactory: ctx.GenericInformerFactory,
ReplenishmentResyncPeriod: ctx.ResyncPeriod,
DiscoveryFunc: discoveryFunc,
IgnoredResourcesFunc: quotaConfiguration.IgnoredResources,

View File

@ -121,10 +121,12 @@ func TestController_DiscoveryError(t *testing.T) {
testDiscovery := FakeDiscoveryWithError{Err: test.discoveryError, PossibleResources: test.possibleResources}
testClientset := NewFakeClientset(testDiscovery)
testClientBuilder := TestClientBuilder{clientset: testClientset}
testInformerFactory := informers.NewSharedInformerFactoryWithOptions(testClientset, time.Duration(1))
ctx := ControllerContext{
ClientBuilder: testClientBuilder,
InformerFactory: informers.NewSharedInformerFactoryWithOptions(testClientset, time.Duration(1)),
InformersStarted: make(chan struct{}),
ClientBuilder: testClientBuilder,
InformerFactory: testInformerFactory,
GenericInformerFactory: testInformerFactory,
InformersStarted: make(chan struct{}),
}
for funcName, controllerInit := range controllerInitFuncMap {
_, _, err := controllerInit(ctx)

View File

@ -47,6 +47,7 @@ go_library(
"controller_ref_manager.go",
"controller_utils.go",
"doc.go",
"informer_factory.go",
"lookup_cache.go",
],
importpath = "k8s.io/kubernetes/pkg/controller",
@ -79,6 +80,8 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/serviceaccount:go_default_library",
"//staging/src/k8s.io/client-go/dynamic/dynamicinformer:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/authentication/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",

View File

@ -0,0 +1,56 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
)
// InformerFactory creates informers for each group version resource.
type InformerFactory interface {
ForResource(resource schema.GroupVersionResource) (informers.GenericInformer, error)
Start(stopCh <-chan struct{})
}
type informerFactory struct {
typedInformerFactory informers.SharedInformerFactory
dynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory
}
func (i *informerFactory) ForResource(resource schema.GroupVersionResource) (informers.GenericInformer, error) {
informer, err := i.typedInformerFactory.ForResource(resource)
if err != nil {
return i.dynamicInformerFactory.ForResource(resource), nil
}
return informer, nil
}
func (i *informerFactory) Start(stopCh <-chan struct{}) {
i.typedInformerFactory.Start(stopCh)
i.dynamicInformerFactory.Start(stopCh)
}
// NewInformerFactory creates a new InformerFactory which works with both typed
// resources and dynamic resources
func NewInformerFactory(typedInformerFactory informers.SharedInformerFactory, dynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory) InformerFactory {
return &informerFactory{
typedInformerFactory: typedInformerFactory,
dynamicInformerFactory: dynamicInformerFactory,
}
}

View File

@ -32,7 +32,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/discovery:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",

View File

@ -35,7 +35,6 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
@ -52,12 +51,6 @@ type NamespacedResourcesFunc func() ([]*metav1.APIResourceList, error)
// that may require quota to be recalculated.
type ReplenishmentFunc func(groupResource schema.GroupResource, namespace string)
// InformerFactory is all the quota system needs to interface with informers.
type InformerFactory interface {
ForResource(resource schema.GroupVersionResource) (informers.GenericInformer, error)
Start(stopCh <-chan struct{})
}
// ResourceQuotaControllerOptions holds options for creating a quota controller
type ResourceQuotaControllerOptions struct {
// Must have authority to list all quotas, and update quota status
@ -75,7 +68,7 @@ type ResourceQuotaControllerOptions struct {
// InformersStarted knows if informers were started.
InformersStarted <-chan struct{}
// InformerFactory interfaces with informers.
InformerFactory InformerFactory
InformerFactory controller.InformerFactory
// Controls full resync of objects monitored for replenishment.
ReplenishmentResyncPeriod controller.ResyncPeriodFunc
}

View File

@ -86,7 +86,7 @@ type QuotaMonitor struct {
resourceChanges workqueue.RateLimitingInterface
// interfaces with informers
informerFactory InformerFactory
informerFactory controller.InformerFactory
// list of resources to ignore
ignoredResources map[schema.GroupResource]struct{}
@ -101,7 +101,7 @@ type QuotaMonitor struct {
registry quota.Registry
}
func NewQuotaMonitor(informersStarted <-chan struct{}, informerFactory InformerFactory, ignoredResources map[schema.GroupResource]struct{}, resyncPeriod controller.ResyncPeriodFunc, replenishmentFunc ReplenishmentFunc, registry quota.Registry) *QuotaMonitor {
func NewQuotaMonitor(informersStarted <-chan struct{}, informerFactory controller.InformerFactory, ignoredResources map[schema.GroupResource]struct{}, resyncPeriod controller.ResyncPeriodFunc, replenishmentFunc ReplenishmentFunc, registry quota.Registry) *QuotaMonitor {
return &QuotaMonitor{
informersStarted: informersStarted,
informerFactory: informerFactory,

View File

@ -27,11 +27,13 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/quota/v1/evaluator/core"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/utils/crd"
imageutils "k8s.io/kubernetes/test/utils/image"
. "github.com/onsi/ginkgo"
@ -487,6 +489,89 @@ var _ = SIGDescribe("ResourceQuota", func() {
Expect(err).NotTo(HaveOccurred())
})
It("should create a ResourceQuota and capture the life of a custom resource.", func() {
By("Creating a Custom Resource Definition")
testcrd, err := crd.CreateTestCRD(f)
Expect(err).NotTo(HaveOccurred())
defer testcrd.CleanUp()
countResourceName := "count/" + testcrd.Crd.Spec.Names.Plural + "." + testcrd.Crd.Spec.Group
// resourcequota controller needs to take 30 seconds at most to detect the new custom resource.
// in order to make sure the resourcequota controller knows this resource, we create one test
// resourcequota object, and triggering updates on it until the status is updated.
quotaName := "quota-for-" + testcrd.Crd.Spec.Names.Plural
resourceQuota, err := createResourceQuota(f.ClientSet, f.Namespace.Name, &v1.ResourceQuota{
ObjectMeta: metav1.ObjectMeta{Name: quotaName},
Spec: v1.ResourceQuotaSpec{
Hard: v1.ResourceList{
v1.ResourceName(countResourceName): resource.MustParse("0"),
},
},
})
err = updateResourceQuotaUntilUsageAppears(f.ClientSet, f.Namespace.Name, quotaName, v1.ResourceName(countResourceName))
Expect(err).NotTo(HaveOccurred())
err = f.ClientSet.CoreV1().ResourceQuotas(f.Namespace.Name).Delete(quotaName, nil)
Expect(err).NotTo(HaveOccurred())
By("Counting existing ResourceQuota")
c, err := countResourceQuota(f.ClientSet, f.Namespace.Name)
Expect(err).NotTo(HaveOccurred())
By("Creating a ResourceQuota")
quotaName = "test-quota"
resourceQuota = newTestResourceQuota(quotaName)
resourceQuota.Spec.Hard[v1.ResourceName(countResourceName)] = resource.MustParse("1")
resourceQuota, err = createResourceQuota(f.ClientSet, f.Namespace.Name, resourceQuota)
Expect(err).NotTo(HaveOccurred())
By("Ensuring resource quota status is calculated")
usedResources := v1.ResourceList{}
usedResources[v1.ResourceQuotas] = resource.MustParse(strconv.Itoa(c + 1))
usedResources[v1.ResourceName(countResourceName)] = resource.MustParse("0")
err = waitForResourceQuota(f.ClientSet, f.Namespace.Name, quotaName, usedResources)
Expect(err).NotTo(HaveOccurred())
By("Creating a custom resource")
resourceClient := testcrd.GetV1DynamicClient()
testcr, err := instantiateCustomResource(&unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": testcrd.APIGroup + "/" + testcrd.GetAPIVersions()[0],
"kind": testcrd.Crd.Spec.Names.Kind,
"metadata": map[string]interface{}{
"name": "test-cr-1",
},
},
}, resourceClient, testcrd.Crd)
Expect(err).NotTo(HaveOccurred())
By("Ensuring resource quota status captures custom resource creation")
usedResources = v1.ResourceList{}
usedResources[v1.ResourceName(countResourceName)] = resource.MustParse("1")
err = waitForResourceQuota(f.ClientSet, f.Namespace.Name, quotaName, usedResources)
Expect(err).NotTo(HaveOccurred())
By("Creating a second custom resource")
_, err = instantiateCustomResource(&unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": testcrd.APIGroup + "/" + testcrd.GetAPIVersions()[0],
"kind": testcrd.Crd.Spec.Names.Kind,
"metadata": map[string]interface{}{
"name": "test-cr-2",
},
},
}, resourceClient, testcrd.Crd)
// since we only give one quota, this creation should fail.
Expect(err).To(HaveOccurred())
By("Deleting a custom resource")
err = deleteCustomResource(resourceClient, testcr.GetName())
Expect(err).NotTo(HaveOccurred())
By("Ensuring resource quota status released usage")
usedResources[v1.ResourceName(countResourceName)] = resource.MustParse("0")
err = waitForResourceQuota(f.ClientSet, f.Namespace.Name, quotaName, usedResources)
Expect(err).NotTo(HaveOccurred())
})
It("should verify ResourceQuota with terminating scopes.", func() {
By("Creating a ResourceQuota with terminating scope")
quotaTerminatingName := "quota-terminating"
@ -1524,3 +1609,29 @@ func waitForResourceQuota(c clientset.Interface, ns, quotaName string, used v1.R
return true, nil
})
}
// updateResourceQuotaUntilUsageAppears updates the resource quota object until the usage is populated
// for the specific resource name.
func updateResourceQuotaUntilUsageAppears(c clientset.Interface, ns, quotaName string, resourceName v1.ResourceName) error {
return wait.Poll(framework.Poll, 1*time.Minute, func() (bool, error) {
resourceQuota, err := c.CoreV1().ResourceQuotas(ns).Get(quotaName, metav1.GetOptions{})
if err != nil {
return false, err
}
// verify that the quota shows the expected used resource values
_, ok := resourceQuota.Status.Used[resourceName]
if ok {
return true, nil
}
current := resourceQuota.Spec.Hard[resourceName]
current.Add(resource.MustParse("1"))
resourceQuota.Spec.Hard[resourceName] = current
_, err = c.CoreV1().ResourceQuotas(ns).Update(resourceQuota)
// ignoring conflicts since someone else may already updated it.
if errors.IsConflict(err) {
return false, nil
}
return false, err
})
}