diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 439fa455a9d..678c4fa2bb6 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -189,7 +189,7 @@ func startGarbageCollectorController(ctx ControllerContext) (bool, error) { metaOnlyClientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc) config.ContentConfig = dynamic.ContentConfig() clientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc) - garbageCollector, err := garbagecollector.NewGarbageCollector(metaOnlyClientPool, clientPool, restMapper, deletableGroupVersionResources) + garbageCollector, err := garbagecollector.NewGarbageCollector(metaOnlyClientPool, clientPool, restMapper, deletableGroupVersionResources, ctx.InformerFactory) if err != nil { return true, fmt.Errorf("Failed to start the generic garbage collector: %v", err) } diff --git a/pkg/controller/garbagecollector/BUILD b/pkg/controller/garbagecollector/BUILD index 86fbb1b3e7e..85c6509119b 100644 --- a/pkg/controller/garbagecollector/BUILD +++ b/pkg/controller/garbagecollector/BUILD @@ -23,6 +23,7 @@ go_library( ], tags = ["automanaged"], deps = [ + "//pkg/client/informers/informers_generated/externalversions:go_default_library", "//pkg/client/retry:go_default_library", "//pkg/controller:go_default_library", "//pkg/controller/garbagecollector/metaonly:go_default_library", @@ -60,6 +61,8 @@ go_test( "//pkg/api:go_default_library", "//pkg/api/install:go_default_library", "//pkg/api/v1:go_default_library", + "//pkg/client/clientset_generated/clientset/fake:go_default_library", + "//pkg/client/informers/informers_generated/externalversions:go_default_library", "//pkg/controller/garbagecollector/metaonly:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go index 6e80dba753b..bf3b1673af1 100644 --- a/pkg/controller/garbagecollector/garbagecollector.go +++ b/pkg/controller/garbagecollector/garbagecollector.go @@ -33,6 +33,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic" "k8s.io/client-go/util/workqueue" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly" // install the prometheus plugin @@ -69,9 +70,16 @@ type GarbageCollector struct { registeredRateLimiter *RegisteredRateLimiter // GC caches the owners that do not exist according to the API server. absentOwnerCache *UIDCache + sharedInformers informers.SharedInformerFactory } -func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynamic.ClientPool, mapper meta.RESTMapper, deletableResources map[schema.GroupVersionResource]struct{}) (*GarbageCollector, error) { +func NewGarbageCollector( + metaOnlyClientPool dynamic.ClientPool, + clientPool dynamic.ClientPool, + mapper meta.RESTMapper, + deletableResources map[schema.GroupVersionResource]struct{}, + sharedInformers informers.SharedInformerFactory, +) (*GarbageCollector, error) { attemptToDelete := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_delete") attemptToOrphan := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_orphan") absentOwnerCache := NewUIDCache(500) @@ -94,6 +102,7 @@ func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynam attemptToDelete: attemptToDelete, attemptToOrphan: attemptToOrphan, absentOwnerCache: absentOwnerCache, + sharedInformers: sharedInformers, } if err := gb.monitorsForResources(deletableResources); err != nil { return nil, err diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go index ff169dabdce..2868f7b543c 100644 --- a/pkg/controller/garbagecollector/garbagecollector_test.go +++ b/pkg/controller/garbagecollector/garbagecollector_test.go @@ -41,6 +41,8 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" "k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly" ) @@ -55,7 +57,10 @@ func TestNewGarbageCollector(t *testing.T) { // no monitor will be constructed for non-core resource, the GC construction will not fail. {Group: "tpr.io", Version: "v1", Resource: "unknown"}: {}, } - gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, api.Registry.RESTMapper(), podResource) + + client := fake.NewSimpleClientset() + sharedInformers := informers.NewSharedInformerFactory(client, 0) + gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, api.Registry.RESTMapper(), podResource, sharedInformers) if err != nil { t.Fatal(err) } @@ -113,17 +118,26 @@ func testServerAndClientConfig(handler func(http.ResponseWriter, *http.Request)) return srv, config } -func setupGC(t *testing.T, config *restclient.Config) *GarbageCollector { +type garbageCollector struct { + *GarbageCollector + stop chan struct{} +} + +func setupGC(t *testing.T, config *restclient.Config) garbageCollector { config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()} metaOnlyClientPool := dynamic.NewClientPool(config, api.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc) config.ContentConfig.NegotiatedSerializer = nil clientPool := dynamic.NewClientPool(config, api.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc) podResource := map[schema.GroupVersionResource]struct{}{{Version: "v1", Resource: "pods"}: {}} - gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, api.Registry.RESTMapper(), podResource) + client := fake.NewSimpleClientset() + sharedInformers := informers.NewSharedInformerFactory(client, 0) + gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, api.Registry.RESTMapper(), podResource, sharedInformers) if err != nil { t.Fatal(err) } - return gc + stop := make(chan struct{}) + go sharedInformers.Start(stop) + return garbageCollector{gc, stop} } func getPod(podName string, ownerReferences []metav1.OwnerReference) *v1.Pod { @@ -172,7 +186,10 @@ func TestAttemptToDeleteItem(t *testing.T) { } srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP) defer srv.Close() + gc := setupGC(t, clientConfig) + defer close(gc.stop) + item := &node{ identity: objectReference{ OwnerReference: metav1.OwnerReference{ @@ -320,6 +337,7 @@ func TestProcessEvent(t *testing.T) { // data race among in the dependents field. func TestDependentsRace(t *testing.T) { gc := setupGC(t, &restclient.Config{}) + defer close(gc.stop) const updates = 100 owner := &node{dependents: make(map[*node]struct{})} @@ -453,6 +471,7 @@ func TestAbsentUIDCache(t *testing.T) { srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP) defer srv.Close() gc := setupGC(t, clientConfig) + defer close(gc.stop) gc.absentOwnerCache = NewUIDCache(2) gc.attemptToDeleteItem(podToGCNode(rc1Pod1)) gc.attemptToDeleteItem(podToGCNode(rc2Pod1)) diff --git a/pkg/controller/garbagecollector/graph_builder.go b/pkg/controller/garbagecollector/graph_builder.go index df0dcfa6387..c57518759e4 100644 --- a/pkg/controller/garbagecollector/graph_builder.go +++ b/pkg/controller/garbagecollector/graph_builder.go @@ -34,6 +34,7 @@ import ( "k8s.io/client-go/dynamic" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" ) type eventType int @@ -62,6 +63,7 @@ type event struct { obj interface{} // the update event comes with an old object, but it's not used by the garbage collector. oldObj interface{} + gvk schema.GroupVersionKind } // GraphBuilder: based on the events supplied by the informers, GraphBuilder updates @@ -90,6 +92,8 @@ type GraphBuilder struct { // GraphBuilder and GC share the absentOwnerCache. Objects that are known to // be non-existent are added to the cached. absentOwnerCache *UIDCache + sharedInformers informers.SharedInformerFactory + stopCh <-chan struct{} } func listWatcher(client *dynamic.Client, resource schema.GroupVersionResource) *cache.ListWatch { @@ -118,6 +122,58 @@ func listWatcher(client *dynamic.Client, resource schema.GroupVersionResource) * } func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, error) { + handlers := cache.ResourceEventHandlerFuncs{ + // add the event to the dependencyGraphBuilder's graphChanges. + AddFunc: func(obj interface{}) { + event := &event{ + eventType: addEvent, + obj: obj, + gvk: kind, + } + gb.graphChanges.Add(event) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + // TODO: check if there are differences in the ownerRefs, + // finalizers, and DeletionTimestamp; if not, ignore the update. + event := &event{ + eventType: updateEvent, + obj: newObj, + oldObj: oldObj, + gvk: kind, + } + gb.graphChanges.Add(event) + }, + DeleteFunc: func(obj interface{}) { + // delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it + if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok { + obj = deletedFinalStateUnknown.Obj + } + event := &event{ + eventType: deleteEvent, + obj: obj, + gvk: kind, + } + gb.graphChanges.Add(event) + }, + } + + shared, err := gb.sharedInformers.ForResource(resource) + if err == nil { + glog.V(4).Infof("using a shared informer for resource %q, kind %q", resource.String(), kind.String()) + // need to clone because it's from a shared cache + shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime) + if gb.stopCh != nil { + // if gb.stopCh is set, it means we've already gotten past the initial gb.Run() call, so this + // means we've re-loaded and re-read discovery and we are adding a new monitor for a + // previously unseen resource, so we need to call Start on the shared informers again (this + // will only start those shared informers that have not yet been started). + go gb.sharedInformers.Start(gb.stopCh) + } + return shared.Informer().GetController(), nil + } else { + glog.V(4).Infof("unable to use a shared informer for resource %q, kind %q: %v", resource.String(), kind.String(), err) + } + // TODO: consider store in one storage. glog.V(5).Infof("create storage for resource %s", resource) client, err := gb.metaOnlyClientPool.ClientForGroupVersionKind(kind) @@ -125,47 +181,12 @@ func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind return nil, err } gb.registeredRateLimiterForControllers.registerIfNotPresent(resource.GroupVersion(), client, "garbage_collector_monitoring") - setObjectTypeMeta := func(obj interface{}) { - runtimeObject, ok := obj.(runtime.Object) - if !ok { - utilruntime.HandleError(fmt.Errorf("expected runtime.Object, got %#v", obj)) - } - runtimeObject.GetObjectKind().SetGroupVersionKind(kind) - } _, monitor := cache.NewInformer( listWatcher(client, resource), nil, ResourceResyncTime, - cache.ResourceEventHandlerFuncs{ - // add the event to the dependencyGraphBuilder's graphChanges. - AddFunc: func(obj interface{}) { - setObjectTypeMeta(obj) - event := &event{ - eventType: addEvent, - obj: obj, - } - gb.graphChanges.Add(event) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - setObjectTypeMeta(newObj) - // TODO: check if there are differences in the ownerRefs, - // finalizers, and DeletionTimestamp; if not, ignore the update. - event := &event{updateEvent, newObj, oldObj} - gb.graphChanges.Add(event) - }, - DeleteFunc: func(obj interface{}) { - // delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it - if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok { - obj = deletedFinalStateUnknown.Obj - } - setObjectTypeMeta(obj) - event := &event{ - eventType: deleteEvent, - obj: obj, - } - gb.graphChanges.Add(event) - }, - }, + // don't need to clone because it's not from shared cache + handlers, ) return monitor, nil } @@ -205,6 +226,9 @@ func (gb *GraphBuilder) Run(stopCh <-chan struct{}) { go monitor.Run(stopCh) } go wait.Until(gb.runProcessGraphChanges, 1*time.Second, stopCh) + + // set this so that we can use it if we need to start new shared informers + gb.stopCh = stopCh } var ignoredResources = map[schema.GroupVersionResource]struct{}{ @@ -435,12 +459,7 @@ func (gb *GraphBuilder) processGraphChanges() bool { utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err)) return true } - typeAccessor, err := meta.TypeAccessor(obj) - if err != nil { - utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err)) - return true - } - glog.V(5).Infof("GraphBuilder process object: %s/%s, namespace %s, name %s, event type %v", typeAccessor.GetAPIVersion(), typeAccessor.GetKind(), accessor.GetNamespace(), accessor.GetName(), event.eventType) + glog.V(5).Infof("GraphBuilder process object: %s/%s, namespace %s, name %s, uid %s, event type %v", event.gvk.GroupVersion().String(), event.gvk.Kind, accessor.GetNamespace(), accessor.GetName(), string(accessor.GetUID()), event.eventType) // Check if the node already exsits existingNode, found := gb.uidToNode.Read(accessor.GetUID()) switch { @@ -448,8 +467,8 @@ func (gb *GraphBuilder) processGraphChanges() bool { newNode := &node{ identity: objectReference{ OwnerReference: metav1.OwnerReference{ - APIVersion: typeAccessor.GetAPIVersion(), - Kind: typeAccessor.GetKind(), + APIVersion: event.gvk.GroupVersion().String(), + Kind: event.gvk.Kind, UID: accessor.GetUID(), Name: accessor.GetName(), }, diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go index 4343ec52430..65a568747c4 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go @@ -304,33 +304,8 @@ func ClusterRoles() []rbac.ClusterRole { rbac.NewRule("update").Groups(legacyGroup).Resources("endpoints", "secrets", "serviceaccounts").RuleOrDie(), // Needed to check API access. These creates are non-mutating rbac.NewRule("create").Groups(authenticationGroup).Resources("tokenreviews").RuleOrDie(), - - rbac.NewRule("list", "watch").Groups(legacyGroup).Resources( - "configmaps", - "namespaces", - "nodes", - "persistentvolumeclaims", - "persistentvolumes", - "pods", - "replicationcontrollers", - "resourcequotas", - "secrets", - "services", - "serviceaccounts", - ).RuleOrDie(), - rbac.NewRule("list", "watch").Groups(extensionsGroup).Resources( - "daemonsets", - "deployments", - "podsecuritypolicies", - "replicasets", - ).RuleOrDie(), - rbac.NewRule("list", "watch").Groups(appsGroup).Resources("deployments").RuleOrDie(), - rbac.NewRule("list", "watch").Groups(batchGroup).Resources("jobs", "cronjobs").RuleOrDie(), - rbac.NewRule("list", "watch").Groups(appsGroup).Resources("statefulsets").RuleOrDie(), - rbac.NewRule("list", "watch").Groups(policyGroup).Resources("poddisruptionbudgets").RuleOrDie(), - rbac.NewRule("list", "watch").Groups(autoscalingGroup).Resources("horizontalpodautoscalers").RuleOrDie(), - rbac.NewRule("list", "watch").Groups(certificatesGroup).Resources("certificatesigningrequests").RuleOrDie(), - rbac.NewRule("list", "watch").Groups(storageGroup).Resources("storageclasses").RuleOrDie(), + // Needed for all shared informers + rbac.NewRule("list", "watch").Groups("*").Resources("*").RuleOrDie(), }, }, { diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml index 88feeb64ff3..9e785a7d0c5 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml @@ -479,79 +479,9 @@ items: verbs: - create - apiGroups: - - "" + - '*' resources: - - configmaps - - namespaces - - nodes - - persistentvolumeclaims - - persistentvolumes - - pods - - replicationcontrollers - - resourcequotas - - secrets - - serviceaccounts - - services - verbs: - - list - - watch - - apiGroups: - - extensions - resources: - - daemonsets - - deployments - - podsecuritypolicies - - replicasets - verbs: - - list - - watch - - apiGroups: - - apps - resources: - - deployments - verbs: - - list - - watch - - apiGroups: - - batch - resources: - - cronjobs - - jobs - verbs: - - list - - watch - - apiGroups: - - apps - resources: - - statefulsets - verbs: - - list - - watch - - apiGroups: - - policy - resources: - - poddisruptionbudgets - verbs: - - list - - watch - - apiGroups: - - autoscaling - resources: - - horizontalpodautoscalers - verbs: - - list - - watch - - apiGroups: - - certificates.k8s.io - resources: - - certificatesigningrequests - verbs: - - list - - watch - - apiGroups: - - storage.k8s.io - resources: - - storageclasses + - '*' verbs: - list - watch diff --git a/test/integration/garbagecollector/BUILD b/test/integration/garbagecollector/BUILD index ee4a92e6caf..b38363cc1d6 100644 --- a/test/integration/garbagecollector/BUILD +++ b/test/integration/garbagecollector/BUILD @@ -18,6 +18,7 @@ go_test( "//pkg/api:go_default_library", "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/client/informers/informers_generated/externalversions:go_default_library", "//pkg/controller/garbagecollector:go_default_library", "//pkg/controller/garbagecollector/metaonly:go_default_library", "//test/integration:go_default_library", diff --git a/test/integration/garbagecollector/garbage_collector_test.go b/test/integration/garbagecollector/garbage_collector_test.go index e56959ba6fc..4a01cc953f1 100644 --- a/test/integration/garbagecollector/garbage_collector_test.go +++ b/test/integration/garbagecollector/garbage_collector_test.go @@ -39,6 +39,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" "k8s.io/kubernetes/pkg/controller/garbagecollector" "k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly" "k8s.io/kubernetes/test/integration" @@ -123,7 +124,7 @@ func newOwnerRC(name, namespace string) *v1.ReplicationController { } } -func setup(t *testing.T) (*httptest.Server, framework.CloseFunc, *garbagecollector.GarbageCollector, clientset.Interface) { +func setup(t *testing.T, stop chan struct{}) (*httptest.Server, framework.CloseFunc, *garbagecollector.GarbageCollector, clientset.Interface) { masterConfig := framework.NewIntegrationTestMasterConfig() masterConfig.EnableCoreControllers = false _, s, closeFn := framework.RunAMaster(masterConfig) @@ -146,19 +147,30 @@ func setup(t *testing.T) (*httptest.Server, framework.CloseFunc, *garbagecollect metaOnlyClientPool := dynamic.NewClientPool(config, api.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc) config.ContentConfig.NegotiatedSerializer = nil clientPool := dynamic.NewClientPool(config, api.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc) - gc, err := garbagecollector.NewGarbageCollector(metaOnlyClientPool, clientPool, api.Registry.RESTMapper(), deletableGroupVersionResources) + sharedInformers := informers.NewSharedInformerFactory(clientSet, 0) + gc, err := garbagecollector.NewGarbageCollector(metaOnlyClientPool, clientPool, api.Registry.RESTMapper(), deletableGroupVersionResources, sharedInformers) if err != nil { t.Fatalf("Failed to create garbage collector") } + + go sharedInformers.Start(stop) + return s, closeFn, gc, clientSet } // This test simulates the cascading deletion. func TestCascadingDeletion(t *testing.T) { + stopCh := make(chan struct{}) + glog.V(6).Infof("TestCascadingDeletion starts") defer glog.V(6).Infof("TestCascadingDeletion ends") - s, closeFn, gc, clientSet := setup(t) - defer closeFn() + s, closeFn, gc, clientSet := setup(t, stopCh) + defer func() { + // We have to close the stop channel first, so the shared informers can terminate their watches; + // otherwise closeFn() will hang waiting for active client connections to finish. + close(stopCh) + closeFn() + }() ns := framework.CreateTestingNamespace("gc-cascading-deletion", s, t) defer framework.DeleteTestingNamespace(ns, s, t) @@ -215,9 +227,7 @@ func TestCascadingDeletion(t *testing.T) { if len(pods.Items) != 3 { t.Fatalf("Expect only 3 pods") } - stopCh := make(chan struct{}) go gc.Run(5, stopCh) - defer close(stopCh) // delete one of the replication controller if err := rcClient.Delete(toBeDeletedRCName, getNonOrphanOptions()); err != nil { t.Fatalf("failed to delete replication controller: %v", err) @@ -245,8 +255,14 @@ func TestCascadingDeletion(t *testing.T) { // This test simulates the case where an object is created with an owner that // doesn't exist. It verifies the GC will delete such an object. func TestCreateWithNonExistentOwner(t *testing.T) { - s, closeFn, gc, clientSet := setup(t) - defer closeFn() + stopCh := make(chan struct{}) + s, closeFn, gc, clientSet := setup(t, stopCh) + defer func() { + // We have to close the stop channel first, so the shared informers can terminate their watches; + // otherwise closeFn() will hang waiting for active client connections to finish. + close(stopCh) + closeFn() + }() ns := framework.CreateTestingNamespace("gc-non-existing-owner", s, t) defer framework.DeleteTestingNamespace(ns, s, t) @@ -267,9 +283,7 @@ func TestCreateWithNonExistentOwner(t *testing.T) { if len(pods.Items) != 1 { t.Fatalf("Expect only 1 pod") } - stopCh := make(chan struct{}) go gc.Run(5, stopCh) - defer close(stopCh) // wait for the garbage collector to delete the pod if err := integration.WaitForPodToDisappear(podClient, garbageCollectedPodName, 5*time.Second, 30*time.Second); err != nil { t.Fatalf("expect pod %s to be garbage collected, got err= %v", garbageCollectedPodName, err) @@ -341,15 +355,20 @@ func verifyRemainingObjects(t *testing.T, clientSet clientset.Interface, namespa // e2e tests that put more stress. func TestStressingCascadingDeletion(t *testing.T) { t.Logf("starts garbage collector stress test") - s, closeFn, gc, clientSet := setup(t) - defer closeFn() + stopCh := make(chan struct{}) + s, closeFn, gc, clientSet := setup(t, stopCh) + + defer func() { + // We have to close the stop channel first, so the shared informers can terminate their watches; + // otherwise closeFn() will hang waiting for active client connections to finish. + close(stopCh) + closeFn() + }() ns := framework.CreateTestingNamespace("gc-stressing-cascading-deletion", s, t) defer framework.DeleteTestingNamespace(ns, s, t) - stopCh := make(chan struct{}) go gc.Run(5, stopCh) - defer close(stopCh) const collections = 10 var wg sync.WaitGroup @@ -402,8 +421,15 @@ func TestStressingCascadingDeletion(t *testing.T) { } func TestOrphaning(t *testing.T) { - s, closeFn, gc, clientSet := setup(t) - defer closeFn() + stopCh := make(chan struct{}) + s, closeFn, gc, clientSet := setup(t, stopCh) + + defer func() { + // We have to close the stop channel first, so the shared informers can terminate their watches; + // otherwise closeFn() will hang waiting for active client connections to finish. + close(stopCh) + closeFn() + }() ns := framework.CreateTestingNamespace("gc-orphaning", s, t) defer framework.DeleteTestingNamespace(ns, s, t) @@ -429,9 +455,7 @@ func TestOrphaning(t *testing.T) { } podUIDs = append(podUIDs, pod.ObjectMeta.UID) } - stopCh := make(chan struct{}) go gc.Run(5, stopCh) - defer close(stopCh) // we need wait for the gc to observe the creation of the pods, otherwise if // the deletion of RC is observed before the creation of the pods, the pods @@ -473,8 +497,15 @@ func TestOrphaning(t *testing.T) { } func TestSolidOwnerDoesNotBlockWaitingOwner(t *testing.T) { - s, closeFn, gc, clientSet := setup(t) - defer closeFn() + stopCh := make(chan struct{}) + s, closeFn, gc, clientSet := setup(t, stopCh) + + defer func() { + // We have to close the stop channel first, so the shared informers can terminate their watches; + // otherwise closeFn() will hang waiting for active client connections to finish. + close(stopCh) + closeFn() + }() ns := framework.CreateTestingNamespace("gc-foreground1", s, t) defer framework.DeleteTestingNamespace(ns, s, t) @@ -500,9 +531,7 @@ func TestSolidOwnerDoesNotBlockWaitingOwner(t *testing.T) { t.Fatalf("Failed to create Pod: %v", err) } - stopCh := make(chan struct{}) go gc.Run(5, stopCh) - defer close(stopCh) err = rcClient.Delete(toBeDeletedRCName, getForegroundOptions()) if err != nil { @@ -535,8 +564,15 @@ func TestSolidOwnerDoesNotBlockWaitingOwner(t *testing.T) { } func TestNonBlockingOwnerRefDoesNotBlock(t *testing.T) { - s, closeFn, gc, clientSet := setup(t) - defer closeFn() + stopCh := make(chan struct{}) + s, closeFn, gc, clientSet := setup(t, stopCh) + + defer func() { + // We have to close the stop channel first, so the shared informers can terminate their watches; + // otherwise closeFn() will hang waiting for active client connections to finish. + close(stopCh) + closeFn() + }() ns := framework.CreateTestingNamespace("gc-foreground2", s, t) defer framework.DeleteTestingNamespace(ns, s, t) @@ -570,9 +606,7 @@ func TestNonBlockingOwnerRefDoesNotBlock(t *testing.T) { t.Fatalf("Failed to create Pod: %v", err) } - stopCh := make(chan struct{}) go gc.Run(5, stopCh) - defer close(stopCh) err = rcClient.Delete(toBeDeletedRCName, getForegroundOptions()) if err != nil { @@ -603,8 +637,15 @@ func TestNonBlockingOwnerRefDoesNotBlock(t *testing.T) { } func TestBlockingOwnerRefDoesBlock(t *testing.T) { - s, closeFn, gc, clientSet := setup(t) - defer closeFn() + stopCh := make(chan struct{}) + s, closeFn, gc, clientSet := setup(t, stopCh) + + defer func() { + // We have to close the stop channel first, so the shared informers can terminate their watches; + // otherwise closeFn() will hang waiting for active client connections to finish. + close(stopCh) + closeFn() + }() ns := framework.CreateTestingNamespace("gc-foreground3", s, t) defer framework.DeleteTestingNamespace(ns, s, t) @@ -627,9 +668,7 @@ func TestBlockingOwnerRefDoesBlock(t *testing.T) { t.Fatalf("Failed to create Pod: %v", err) } - stopCh := make(chan struct{}) go gc.Run(5, stopCh) - defer close(stopCh) // this makes sure the garbage collector will have added the pod to its // dependency graph before handling the foreground deletion of the rc.