diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 99a4ac166ac..85cc74a826a 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -400,7 +400,7 @@ func startGarbageCollectorController(ctx ControllerContext) (http.Handler, bool, ctx.RESTMapper, deletableResources, ignoredResources, - ctx.InformerFactory, + ctx.GenericInformerFactory, ctx.InformersStarted, ) if err != nil { diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index 4586aa09186..eab615c16f4 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -26,7 +26,7 @@ import ( "time" apps "k8s.io/api/apps/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" diff --git a/pkg/controller/garbagecollector/BUILD b/pkg/controller/garbagecollector/BUILD index 9f24470da5e..cb6b3c41141 100644 --- a/pkg/controller/garbagecollector/BUILD +++ b/pkg/controller/garbagecollector/BUILD @@ -33,10 +33,8 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/client-go/discovery:go_default_library", "//staging/src/k8s.io/client-go/dynamic:go_default_library", - "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/util/retry:go_default_library", @@ -60,6 +58,7 @@ go_test( deps = [ "//pkg/api/legacyscheme:go_default_library", "//pkg/apis/core/install:go_default_library", + "//pkg/controller:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/meta/testrestmapper:go_default_library", @@ -71,6 +70,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", "//staging/src/k8s.io/client-go/discovery:go_default_library", "//staging/src/k8s.io/client-go/dynamic:go_default_library", + "//staging/src/k8s.io/client-go/dynamic/dynamicinformer:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go index b62e94f6811..d233b45ec49 100644 --- a/pkg/controller/garbagecollector/garbagecollector.go +++ b/pkg/controller/garbagecollector/garbagecollector.go @@ -36,7 +36,6 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" - "k8s.io/client-go/informers" "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/controller" // import known versions @@ -66,7 +65,7 @@ type GarbageCollector struct { dependencyGraphBuilder *GraphBuilder // GC caches the owners that do not exist according to the API server. absentOwnerCache *UIDCache - sharedInformers informers.SharedInformerFactory + sharedInformers controller.InformerFactory workerLock sync.RWMutex } @@ -76,7 +75,7 @@ func NewGarbageCollector( mapper resettableRESTMapper, deletableResources map[schema.GroupVersionResource]struct{}, ignoredResources map[schema.GroupResource]struct{}, - sharedInformers informers.SharedInformerFactory, + sharedInformers controller.InformerFactory, informersStarted <-chan struct{}, ) (*GarbageCollector, error) { attemptToDelete := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_delete") @@ -90,7 +89,6 @@ func NewGarbageCollector( absentOwnerCache: absentOwnerCache, } gb := &GraphBuilder{ - dynamicClient: dynamicClient, informersStarted: informersStarted, restMapper: mapper, graphChanges: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_graph_changes"), diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go index 4076ef01432..47677c3a70c 100644 --- a/pkg/controller/garbagecollector/garbagecollector_test.go +++ b/pkg/controller/garbagecollector/garbagecollector_test.go @@ -41,12 +41,14 @@ import ( "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" restclient "k8s.io/client-go/rest" "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/api/legacyscheme" + "k8s.io/kubernetes/pkg/controller" ) type testRESTMapper struct { @@ -72,13 +74,15 @@ func TestGarbageCollectorConstruction(t *testing.T) { {Group: "tpr.io", Version: "v1", Resource: "unknown"}: {}, } client := fake.NewSimpleClientset() - sharedInformers := informers.NewSharedInformerFactory(client, 0) + sharedInformers := informers.NewSharedInformerFactory(client, 0) + dynamicInformers := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0) // No monitor will be constructed for the non-core resource, but the GC // construction will not fail. alwaysStarted := make(chan struct{}) close(alwaysStarted) - gc, err := NewGarbageCollector(dynamicClient, rm, twoResources, map[schema.GroupResource]struct{}{}, sharedInformers, alwaysStarted) + gc, err := NewGarbageCollector(dynamicClient, rm, twoResources, map[schema.GroupResource]struct{}{}, + controller.NewInformerFactory(sharedInformers, dynamicInformers), alwaysStarted) if err != nil { t.Fatal(err) } @@ -429,36 +433,6 @@ func TestDependentsRace(t *testing.T) { }() } -// test the list and watch functions correctly converts the ListOptions -func TestGCListWatcher(t *testing.T) { - testHandler := &fakeActionHandler{} - srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP) - defer srv.Close() - podResource := schema.GroupVersionResource{Version: "v1", Resource: "pods"} - dynamicClient, err := dynamic.NewForConfig(clientConfig) - if err != nil { - t.Fatal(err) - } - - lw := listWatcher(dynamicClient, podResource) - lw.DisableChunking = true - if _, err := lw.Watch(metav1.ListOptions{ResourceVersion: "1"}); err != nil { - t.Fatal(err) - } - if _, err := lw.List(metav1.ListOptions{ResourceVersion: "1"}); err != nil { - t.Fatal(err) - } - if e, a := 2, len(testHandler.actions); e != a { - t.Errorf("expect %d requests, got %d", e, a) - } - if e, a := "resourceVersion=1&watch=true", testHandler.actions[0].query; e != a { - t.Errorf("expect %s, got %s", e, a) - } - if e, a := "resourceVersion=1", testHandler.actions[1].query; e != a { - t.Errorf("expect %s, got %s", e, a) - } -} - func podToGCNode(pod *v1.Pod) *node { return &node{ identity: objectReference{ diff --git a/pkg/controller/garbagecollector/graph_builder.go b/pkg/controller/garbagecollector/graph_builder.go index ab9e45470e9..8d39767ccd6 100644 --- a/pkg/controller/garbagecollector/graph_builder.go +++ b/pkg/controller/garbagecollector/graph_builder.go @@ -26,17 +26,14 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" utilerrors "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/dynamic" - "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly" ) @@ -91,7 +88,6 @@ type GraphBuilder struct { // it is protected by monitorLock. running bool - dynamicClient dynamic.Interface // monitors are the producer of the graphChanges queue, graphBuilder alters // the in-memory graph according to the changes. graphChanges workqueue.RateLimitingInterface @@ -104,7 +100,7 @@ type GraphBuilder struct { // GraphBuilder and GC share the absentOwnerCache. Objects that are known to // be non-existent are added to the cached. absentOwnerCache *UIDCache - sharedInformers informers.SharedInformerFactory + sharedInformers controller.InformerFactory ignoredResources map[schema.GroupResource]struct{} } @@ -126,19 +122,6 @@ func (m *monitor) Run() { type monitors map[schema.GroupVersionResource]*monitor -func listWatcher(client dynamic.Interface, resource schema.GroupVersionResource) *cache.ListWatch { - return &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - // We want to list this resource in all namespaces if it's namespace scoped, so not passing namespace is ok. - return client.Resource(resource).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - // We want to list this resource in all namespaces if it's namespace scoped, so not passing namespace is ok. - return client.Resource(resource).Watch(options) - }, - } -} - func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, cache.Store, error) { handlers := cache.ResourceEventHandlerFuncs{ // add the event to the dependencyGraphBuilder's graphChanges. @@ -175,24 +158,14 @@ func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind }, } shared, err := gb.sharedInformers.ForResource(resource) - if err == nil { - klog.V(4).Infof("using a shared informer for resource %q, kind %q", resource.String(), kind.String()) - // need to clone because it's from a shared cache - shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime) - return shared.Informer().GetController(), shared.Informer().GetStore(), nil + if err != nil { + klog.V(4).Infof("unable to use a shared informer for resource %q, kind %q: %v", resource.String(), kind.String(), err) + return nil, nil, err } - klog.V(4).Infof("unable to use a shared informer for resource %q, kind %q: %v", resource.String(), kind.String(), err) - - // TODO: consider store in one storage. - klog.V(5).Infof("create storage for resource %s", resource) - store, monitor := cache.NewInformer( - listWatcher(gb.dynamicClient, resource), - nil, - ResourceResyncTime, - // don't need to clone because it's not from shared cache - handlers, - ) - return monitor, store, nil + klog.V(4).Infof("using a shared informer for resource %q, kind %q", resource.String(), kind.String()) + // need to clone because it's from a shared cache + shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime) + return shared.Informer().GetController(), shared.Informer().GetStore(), nil } // syncMonitors rebuilds the monitor set according to the supplied resources, diff --git a/test/integration/garbagecollector/BUILD b/test/integration/garbagecollector/BUILD index 298825a736f..48b724790da 100644 --- a/test/integration/garbagecollector/BUILD +++ b/test/integration/garbagecollector/BUILD @@ -12,6 +12,7 @@ go_test( tags = ["integration"], deps = [ "//cmd/kube-apiserver/app/testing:go_default_library", + "//pkg/controller:go_default_library", "//pkg/controller/garbagecollector:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1:go_default_library", @@ -28,6 +29,7 @@ go_test( "//staging/src/k8s.io/apiserver/pkg/storage/names:go_default_library", "//staging/src/k8s.io/client-go/discovery/cached/memory:go_default_library", "//staging/src/k8s.io/client-go/dynamic:go_default_library", + "//staging/src/k8s.io/client-go/dynamic/dynamicinformer:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/restmapper:go_default_library", diff --git a/test/integration/garbagecollector/garbage_collector_test.go b/test/integration/garbagecollector/garbage_collector_test.go index a7dcd70341d..29be203e916 100644 --- a/test/integration/garbagecollector/garbage_collector_test.go +++ b/test/integration/garbagecollector/garbage_collector_test.go @@ -38,11 +38,13 @@ import ( "k8s.io/apiserver/pkg/storage/names" cacheddiscovery "k8s.io/client-go/discovery/cached/memory" "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/restmapper" "k8s.io/client-go/tools/cache" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/garbagecollector" "k8s.io/kubernetes/test/integration" "k8s.io/kubernetes/test/integration/framework" @@ -229,6 +231,7 @@ func setupWithServer(t *testing.T, result *kubeapiservertesting.TestServer, work t.Fatalf("failed to create dynamicClient: %v", err) } sharedInformers := informers.NewSharedInformerFactory(clientSet, 0) + dynamicInformers := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0) alwaysStarted := make(chan struct{}) close(alwaysStarted) gc, err := garbagecollector.NewGarbageCollector( @@ -236,7 +239,7 @@ func setupWithServer(t *testing.T, result *kubeapiservertesting.TestServer, work restMapper, deletableResources, garbagecollector.DefaultIgnoredResources(), - sharedInformers, + controller.NewInformerFactory(sharedInformers, dynamicInformers), alwaysStarted, ) if err != nil { @@ -966,9 +969,24 @@ func TestCRDDeletionCascading(t *testing.T) { ns := createNamespaceOrDie("crd-mixed", clientSet, t) - configMapClient := clientSet.CoreV1().ConfigMaps(ns.Name) - + t.Logf("First pass CRD cascading deletion") definition, resourceClient := createRandomCustomResourceDefinition(t, apiExtensionClient, dynamicClient, ns.Name) + testCRDDeletion(t, ctx, ns, definition, resourceClient) + + t.Logf("Second pass CRD cascading deletion") + accessor := meta.NewAccessor() + accessor.SetResourceVersion(definition, "") + _, err := apiextensionstestserver.CreateNewCustomResourceDefinition(definition, apiExtensionClient, dynamicClient) + if err != nil { + t.Fatalf("failed to create CustomResourceDefinition: %v", err) + } + testCRDDeletion(t, ctx, ns, definition, resourceClient) +} + +func testCRDDeletion(t *testing.T, ctx *testContext, ns *v1.Namespace, definition *apiextensionsv1beta1.CustomResourceDefinition, resourceClient dynamic.ResourceInterface) { + clientSet, apiExtensionClient := ctx.clientSet, ctx.apiExtensionClient + + configMapClient := clientSet.CoreV1().ConfigMaps(ns.Name) // Create a custom owner resource. owner, err := resourceClient.Create(newCRDInstance(definition, ns.Name, names.SimpleNameGenerator.GenerateName("owner")), metav1.CreateOptions{})