Merge pull request #45427 from ncdc/gc-shared-informers

Automatic merge from submit-queue (batch tested with PRs 46201, 45952, 45427, 46247, 46062)

Use shared informers in gc controller if possible

Modify the garbage collector controller to try to use shared informers for resources, if possible, to reduce the number of unique reflectors listing and watching the same thing.

cc @kubernetes/sig-api-machinery-pr-reviews @caesarxuchao @deads2k @liggitt @sttts @smarterclayton @timothysc @soltysh @kargakis @kubernetes/rh-cluster-infra @derekwaynecarr @wojtek-t @gmarek
This commit is contained in:
Kubernetes Submit Queue 2017-05-22 20:58:03 -07:00 committed by GitHub
commit cc6e51c6e8
9 changed files with 175 additions and 180 deletions

View File

@ -189,7 +189,7 @@ func startGarbageCollectorController(ctx ControllerContext) (bool, error) {
metaOnlyClientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc)
config.ContentConfig = dynamic.ContentConfig()
clientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc)
garbageCollector, err := garbagecollector.NewGarbageCollector(metaOnlyClientPool, clientPool, restMapper, deletableGroupVersionResources)
garbageCollector, err := garbagecollector.NewGarbageCollector(metaOnlyClientPool, clientPool, restMapper, deletableGroupVersionResources, ctx.InformerFactory)
if err != nil {
return true, fmt.Errorf("Failed to start the generic garbage collector: %v", err)
}

View File

@ -23,6 +23,7 @@ go_library(
],
tags = ["automanaged"],
deps = [
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
"//pkg/client/retry:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/garbagecollector/metaonly:go_default_library",
@ -60,6 +61,8 @@ go_test(
"//pkg/api:go_default_library",
"//pkg/api/install:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
"//pkg/controller/garbagecollector/metaonly:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",

View File

@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/util/workqueue"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
// install the prometheus plugin
@ -69,9 +70,16 @@ type GarbageCollector struct {
registeredRateLimiter *RegisteredRateLimiter
// GC caches the owners that do not exist according to the API server.
absentOwnerCache *UIDCache
sharedInformers informers.SharedInformerFactory
}
func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynamic.ClientPool, mapper meta.RESTMapper, deletableResources map[schema.GroupVersionResource]struct{}) (*GarbageCollector, error) {
func NewGarbageCollector(
metaOnlyClientPool dynamic.ClientPool,
clientPool dynamic.ClientPool,
mapper meta.RESTMapper,
deletableResources map[schema.GroupVersionResource]struct{},
sharedInformers informers.SharedInformerFactory,
) (*GarbageCollector, error) {
attemptToDelete := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_delete")
attemptToOrphan := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_orphan")
absentOwnerCache := NewUIDCache(500)
@ -94,6 +102,7 @@ func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynam
attemptToDelete: attemptToDelete,
attemptToOrphan: attemptToOrphan,
absentOwnerCache: absentOwnerCache,
sharedInformers: sharedInformers,
}
if err := gb.monitorsForResources(deletableResources); err != nil {
return nil, err

View File

@ -41,6 +41,8 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
)
@ -55,7 +57,10 @@ func TestNewGarbageCollector(t *testing.T) {
// no monitor will be constructed for non-core resource, the GC construction will not fail.
{Group: "tpr.io", Version: "v1", Resource: "unknown"}: {},
}
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, api.Registry.RESTMapper(), podResource)
client := fake.NewSimpleClientset()
sharedInformers := informers.NewSharedInformerFactory(client, 0)
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, api.Registry.RESTMapper(), podResource, sharedInformers)
if err != nil {
t.Fatal(err)
}
@ -113,17 +118,26 @@ func testServerAndClientConfig(handler func(http.ResponseWriter, *http.Request))
return srv, config
}
func setupGC(t *testing.T, config *restclient.Config) *GarbageCollector {
type garbageCollector struct {
*GarbageCollector
stop chan struct{}
}
func setupGC(t *testing.T, config *restclient.Config) garbageCollector {
config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
metaOnlyClientPool := dynamic.NewClientPool(config, api.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
config.ContentConfig.NegotiatedSerializer = nil
clientPool := dynamic.NewClientPool(config, api.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
podResource := map[schema.GroupVersionResource]struct{}{{Version: "v1", Resource: "pods"}: {}}
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, api.Registry.RESTMapper(), podResource)
client := fake.NewSimpleClientset()
sharedInformers := informers.NewSharedInformerFactory(client, 0)
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, api.Registry.RESTMapper(), podResource, sharedInformers)
if err != nil {
t.Fatal(err)
}
return gc
stop := make(chan struct{})
go sharedInformers.Start(stop)
return garbageCollector{gc, stop}
}
func getPod(podName string, ownerReferences []metav1.OwnerReference) *v1.Pod {
@ -172,7 +186,10 @@ func TestAttemptToDeleteItem(t *testing.T) {
}
srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
defer srv.Close()
gc := setupGC(t, clientConfig)
defer close(gc.stop)
item := &node{
identity: objectReference{
OwnerReference: metav1.OwnerReference{
@ -320,6 +337,7 @@ func TestProcessEvent(t *testing.T) {
// data race among in the dependents field.
func TestDependentsRace(t *testing.T) {
gc := setupGC(t, &restclient.Config{})
defer close(gc.stop)
const updates = 100
owner := &node{dependents: make(map[*node]struct{})}
@ -453,6 +471,7 @@ func TestAbsentUIDCache(t *testing.T) {
srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
defer srv.Close()
gc := setupGC(t, clientConfig)
defer close(gc.stop)
gc.absentOwnerCache = NewUIDCache(2)
gc.attemptToDeleteItem(podToGCNode(rc1Pod1))
gc.attemptToDeleteItem(podToGCNode(rc2Pod1))

View File

@ -34,6 +34,7 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
)
type eventType int
@ -62,6 +63,7 @@ type event struct {
obj interface{}
// the update event comes with an old object, but it's not used by the garbage collector.
oldObj interface{}
gvk schema.GroupVersionKind
}
// GraphBuilder: based on the events supplied by the informers, GraphBuilder updates
@ -90,6 +92,8 @@ type GraphBuilder struct {
// GraphBuilder and GC share the absentOwnerCache. Objects that are known to
// be non-existent are added to the cached.
absentOwnerCache *UIDCache
sharedInformers informers.SharedInformerFactory
stopCh <-chan struct{}
}
func listWatcher(client *dynamic.Client, resource schema.GroupVersionResource) *cache.ListWatch {
@ -118,6 +122,58 @@ func listWatcher(client *dynamic.Client, resource schema.GroupVersionResource) *
}
func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, error) {
handlers := cache.ResourceEventHandlerFuncs{
// add the event to the dependencyGraphBuilder's graphChanges.
AddFunc: func(obj interface{}) {
event := &event{
eventType: addEvent,
obj: obj,
gvk: kind,
}
gb.graphChanges.Add(event)
},
UpdateFunc: func(oldObj, newObj interface{}) {
// TODO: check if there are differences in the ownerRefs,
// finalizers, and DeletionTimestamp; if not, ignore the update.
event := &event{
eventType: updateEvent,
obj: newObj,
oldObj: oldObj,
gvk: kind,
}
gb.graphChanges.Add(event)
},
DeleteFunc: func(obj interface{}) {
// delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = deletedFinalStateUnknown.Obj
}
event := &event{
eventType: deleteEvent,
obj: obj,
gvk: kind,
}
gb.graphChanges.Add(event)
},
}
shared, err := gb.sharedInformers.ForResource(resource)
if err == nil {
glog.V(4).Infof("using a shared informer for resource %q, kind %q", resource.String(), kind.String())
// need to clone because it's from a shared cache
shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime)
if gb.stopCh != nil {
// if gb.stopCh is set, it means we've already gotten past the initial gb.Run() call, so this
// means we've re-loaded and re-read discovery and we are adding a new monitor for a
// previously unseen resource, so we need to call Start on the shared informers again (this
// will only start those shared informers that have not yet been started).
go gb.sharedInformers.Start(gb.stopCh)
}
return shared.Informer().GetController(), nil
} else {
glog.V(4).Infof("unable to use a shared informer for resource %q, kind %q: %v", resource.String(), kind.String(), err)
}
// TODO: consider store in one storage.
glog.V(5).Infof("create storage for resource %s", resource)
client, err := gb.metaOnlyClientPool.ClientForGroupVersionKind(kind)
@ -125,47 +181,12 @@ func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind
return nil, err
}
gb.registeredRateLimiterForControllers.registerIfNotPresent(resource.GroupVersion(), client, "garbage_collector_monitoring")
setObjectTypeMeta := func(obj interface{}) {
runtimeObject, ok := obj.(runtime.Object)
if !ok {
utilruntime.HandleError(fmt.Errorf("expected runtime.Object, got %#v", obj))
}
runtimeObject.GetObjectKind().SetGroupVersionKind(kind)
}
_, monitor := cache.NewInformer(
listWatcher(client, resource),
nil,
ResourceResyncTime,
cache.ResourceEventHandlerFuncs{
// add the event to the dependencyGraphBuilder's graphChanges.
AddFunc: func(obj interface{}) {
setObjectTypeMeta(obj)
event := &event{
eventType: addEvent,
obj: obj,
}
gb.graphChanges.Add(event)
},
UpdateFunc: func(oldObj, newObj interface{}) {
setObjectTypeMeta(newObj)
// TODO: check if there are differences in the ownerRefs,
// finalizers, and DeletionTimestamp; if not, ignore the update.
event := &event{updateEvent, newObj, oldObj}
gb.graphChanges.Add(event)
},
DeleteFunc: func(obj interface{}) {
// delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = deletedFinalStateUnknown.Obj
}
setObjectTypeMeta(obj)
event := &event{
eventType: deleteEvent,
obj: obj,
}
gb.graphChanges.Add(event)
},
},
// don't need to clone because it's not from shared cache
handlers,
)
return monitor, nil
}
@ -205,6 +226,9 @@ func (gb *GraphBuilder) Run(stopCh <-chan struct{}) {
go monitor.Run(stopCh)
}
go wait.Until(gb.runProcessGraphChanges, 1*time.Second, stopCh)
// set this so that we can use it if we need to start new shared informers
gb.stopCh = stopCh
}
var ignoredResources = map[schema.GroupVersionResource]struct{}{
@ -435,12 +459,7 @@ func (gb *GraphBuilder) processGraphChanges() bool {
utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
return true
}
typeAccessor, err := meta.TypeAccessor(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
return true
}
glog.V(5).Infof("GraphBuilder process object: %s/%s, namespace %s, name %s, event type %v", typeAccessor.GetAPIVersion(), typeAccessor.GetKind(), accessor.GetNamespace(), accessor.GetName(), event.eventType)
glog.V(5).Infof("GraphBuilder process object: %s/%s, namespace %s, name %s, uid %s, event type %v", event.gvk.GroupVersion().String(), event.gvk.Kind, accessor.GetNamespace(), accessor.GetName(), string(accessor.GetUID()), event.eventType)
// Check if the node already exsits
existingNode, found := gb.uidToNode.Read(accessor.GetUID())
switch {
@ -448,8 +467,8 @@ func (gb *GraphBuilder) processGraphChanges() bool {
newNode := &node{
identity: objectReference{
OwnerReference: metav1.OwnerReference{
APIVersion: typeAccessor.GetAPIVersion(),
Kind: typeAccessor.GetKind(),
APIVersion: event.gvk.GroupVersion().String(),
Kind: event.gvk.Kind,
UID: accessor.GetUID(),
Name: accessor.GetName(),
},

View File

@ -304,33 +304,8 @@ func ClusterRoles() []rbac.ClusterRole {
rbac.NewRule("update").Groups(legacyGroup).Resources("endpoints", "secrets", "serviceaccounts").RuleOrDie(),
// Needed to check API access. These creates are non-mutating
rbac.NewRule("create").Groups(authenticationGroup).Resources("tokenreviews").RuleOrDie(),
rbac.NewRule("list", "watch").Groups(legacyGroup).Resources(
"configmaps",
"namespaces",
"nodes",
"persistentvolumeclaims",
"persistentvolumes",
"pods",
"replicationcontrollers",
"resourcequotas",
"secrets",
"services",
"serviceaccounts",
).RuleOrDie(),
rbac.NewRule("list", "watch").Groups(extensionsGroup).Resources(
"daemonsets",
"deployments",
"podsecuritypolicies",
"replicasets",
).RuleOrDie(),
rbac.NewRule("list", "watch").Groups(appsGroup).Resources("deployments").RuleOrDie(),
rbac.NewRule("list", "watch").Groups(batchGroup).Resources("jobs", "cronjobs").RuleOrDie(),
rbac.NewRule("list", "watch").Groups(appsGroup).Resources("statefulsets").RuleOrDie(),
rbac.NewRule("list", "watch").Groups(policyGroup).Resources("poddisruptionbudgets").RuleOrDie(),
rbac.NewRule("list", "watch").Groups(autoscalingGroup).Resources("horizontalpodautoscalers").RuleOrDie(),
rbac.NewRule("list", "watch").Groups(certificatesGroup).Resources("certificatesigningrequests").RuleOrDie(),
rbac.NewRule("list", "watch").Groups(storageGroup).Resources("storageclasses").RuleOrDie(),
// Needed for all shared informers
rbac.NewRule("list", "watch").Groups("*").Resources("*").RuleOrDie(),
},
},
{

View File

@ -479,79 +479,9 @@ items:
verbs:
- create
- apiGroups:
- ""
- '*'
resources:
- configmaps
- namespaces
- nodes
- persistentvolumeclaims
- persistentvolumes
- pods
- replicationcontrollers
- resourcequotas
- secrets
- serviceaccounts
- services
verbs:
- list
- watch
- apiGroups:
- extensions
resources:
- daemonsets
- deployments
- podsecuritypolicies
- replicasets
verbs:
- list
- watch
- apiGroups:
- apps
resources:
- deployments
verbs:
- list
- watch
- apiGroups:
- batch
resources:
- cronjobs
- jobs
verbs:
- list
- watch
- apiGroups:
- apps
resources:
- statefulsets
verbs:
- list
- watch
- apiGroups:
- policy
resources:
- poddisruptionbudgets
verbs:
- list
- watch
- apiGroups:
- autoscaling
resources:
- horizontalpodautoscalers
verbs:
- list
- watch
- apiGroups:
- certificates.k8s.io
resources:
- certificatesigningrequests
verbs:
- list
- watch
- apiGroups:
- storage.k8s.io
resources:
- storageclasses
- '*'
verbs:
- list
- watch

View File

@ -18,6 +18,7 @@ go_test(
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
"//pkg/controller/garbagecollector:go_default_library",
"//pkg/controller/garbagecollector/metaonly:go_default_library",
"//test/integration:go_default_library",

View File

@ -39,6 +39,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
"k8s.io/kubernetes/pkg/controller/garbagecollector"
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
"k8s.io/kubernetes/test/integration"
@ -123,7 +124,7 @@ func newOwnerRC(name, namespace string) *v1.ReplicationController {
}
}
func setup(t *testing.T) (*httptest.Server, framework.CloseFunc, *garbagecollector.GarbageCollector, clientset.Interface) {
func setup(t *testing.T, stop chan struct{}) (*httptest.Server, framework.CloseFunc, *garbagecollector.GarbageCollector, clientset.Interface) {
masterConfig := framework.NewIntegrationTestMasterConfig()
masterConfig.EnableCoreControllers = false
_, s, closeFn := framework.RunAMaster(masterConfig)
@ -146,19 +147,30 @@ func setup(t *testing.T) (*httptest.Server, framework.CloseFunc, *garbagecollect
metaOnlyClientPool := dynamic.NewClientPool(config, api.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
config.ContentConfig.NegotiatedSerializer = nil
clientPool := dynamic.NewClientPool(config, api.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
gc, err := garbagecollector.NewGarbageCollector(metaOnlyClientPool, clientPool, api.Registry.RESTMapper(), deletableGroupVersionResources)
sharedInformers := informers.NewSharedInformerFactory(clientSet, 0)
gc, err := garbagecollector.NewGarbageCollector(metaOnlyClientPool, clientPool, api.Registry.RESTMapper(), deletableGroupVersionResources, sharedInformers)
if err != nil {
t.Fatalf("Failed to create garbage collector")
}
go sharedInformers.Start(stop)
return s, closeFn, gc, clientSet
}
// This test simulates the cascading deletion.
func TestCascadingDeletion(t *testing.T) {
stopCh := make(chan struct{})
glog.V(6).Infof("TestCascadingDeletion starts")
defer glog.V(6).Infof("TestCascadingDeletion ends")
s, closeFn, gc, clientSet := setup(t)
defer closeFn()
s, closeFn, gc, clientSet := setup(t, stopCh)
defer func() {
// We have to close the stop channel first, so the shared informers can terminate their watches;
// otherwise closeFn() will hang waiting for active client connections to finish.
close(stopCh)
closeFn()
}()
ns := framework.CreateTestingNamespace("gc-cascading-deletion", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
@ -215,9 +227,7 @@ func TestCascadingDeletion(t *testing.T) {
if len(pods.Items) != 3 {
t.Fatalf("Expect only 3 pods")
}
stopCh := make(chan struct{})
go gc.Run(5, stopCh)
defer close(stopCh)
// delete one of the replication controller
if err := rcClient.Delete(toBeDeletedRCName, getNonOrphanOptions()); err != nil {
t.Fatalf("failed to delete replication controller: %v", err)
@ -245,8 +255,14 @@ func TestCascadingDeletion(t *testing.T) {
// This test simulates the case where an object is created with an owner that
// doesn't exist. It verifies the GC will delete such an object.
func TestCreateWithNonExistentOwner(t *testing.T) {
s, closeFn, gc, clientSet := setup(t)
defer closeFn()
stopCh := make(chan struct{})
s, closeFn, gc, clientSet := setup(t, stopCh)
defer func() {
// We have to close the stop channel first, so the shared informers can terminate their watches;
// otherwise closeFn() will hang waiting for active client connections to finish.
close(stopCh)
closeFn()
}()
ns := framework.CreateTestingNamespace("gc-non-existing-owner", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
@ -267,9 +283,7 @@ func TestCreateWithNonExistentOwner(t *testing.T) {
if len(pods.Items) != 1 {
t.Fatalf("Expect only 1 pod")
}
stopCh := make(chan struct{})
go gc.Run(5, stopCh)
defer close(stopCh)
// wait for the garbage collector to delete the pod
if err := integration.WaitForPodToDisappear(podClient, garbageCollectedPodName, 5*time.Second, 30*time.Second); err != nil {
t.Fatalf("expect pod %s to be garbage collected, got err= %v", garbageCollectedPodName, err)
@ -341,15 +355,20 @@ func verifyRemainingObjects(t *testing.T, clientSet clientset.Interface, namespa
// e2e tests that put more stress.
func TestStressingCascadingDeletion(t *testing.T) {
t.Logf("starts garbage collector stress test")
s, closeFn, gc, clientSet := setup(t)
defer closeFn()
stopCh := make(chan struct{})
s, closeFn, gc, clientSet := setup(t, stopCh)
defer func() {
// We have to close the stop channel first, so the shared informers can terminate their watches;
// otherwise closeFn() will hang waiting for active client connections to finish.
close(stopCh)
closeFn()
}()
ns := framework.CreateTestingNamespace("gc-stressing-cascading-deletion", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
stopCh := make(chan struct{})
go gc.Run(5, stopCh)
defer close(stopCh)
const collections = 10
var wg sync.WaitGroup
@ -402,8 +421,15 @@ func TestStressingCascadingDeletion(t *testing.T) {
}
func TestOrphaning(t *testing.T) {
s, closeFn, gc, clientSet := setup(t)
defer closeFn()
stopCh := make(chan struct{})
s, closeFn, gc, clientSet := setup(t, stopCh)
defer func() {
// We have to close the stop channel first, so the shared informers can terminate their watches;
// otherwise closeFn() will hang waiting for active client connections to finish.
close(stopCh)
closeFn()
}()
ns := framework.CreateTestingNamespace("gc-orphaning", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
@ -429,9 +455,7 @@ func TestOrphaning(t *testing.T) {
}
podUIDs = append(podUIDs, pod.ObjectMeta.UID)
}
stopCh := make(chan struct{})
go gc.Run(5, stopCh)
defer close(stopCh)
// we need wait for the gc to observe the creation of the pods, otherwise if
// the deletion of RC is observed before the creation of the pods, the pods
@ -473,8 +497,15 @@ func TestOrphaning(t *testing.T) {
}
func TestSolidOwnerDoesNotBlockWaitingOwner(t *testing.T) {
s, closeFn, gc, clientSet := setup(t)
defer closeFn()
stopCh := make(chan struct{})
s, closeFn, gc, clientSet := setup(t, stopCh)
defer func() {
// We have to close the stop channel first, so the shared informers can terminate their watches;
// otherwise closeFn() will hang waiting for active client connections to finish.
close(stopCh)
closeFn()
}()
ns := framework.CreateTestingNamespace("gc-foreground1", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
@ -500,9 +531,7 @@ func TestSolidOwnerDoesNotBlockWaitingOwner(t *testing.T) {
t.Fatalf("Failed to create Pod: %v", err)
}
stopCh := make(chan struct{})
go gc.Run(5, stopCh)
defer close(stopCh)
err = rcClient.Delete(toBeDeletedRCName, getForegroundOptions())
if err != nil {
@ -535,8 +564,15 @@ func TestSolidOwnerDoesNotBlockWaitingOwner(t *testing.T) {
}
func TestNonBlockingOwnerRefDoesNotBlock(t *testing.T) {
s, closeFn, gc, clientSet := setup(t)
defer closeFn()
stopCh := make(chan struct{})
s, closeFn, gc, clientSet := setup(t, stopCh)
defer func() {
// We have to close the stop channel first, so the shared informers can terminate their watches;
// otherwise closeFn() will hang waiting for active client connections to finish.
close(stopCh)
closeFn()
}()
ns := framework.CreateTestingNamespace("gc-foreground2", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
@ -570,9 +606,7 @@ func TestNonBlockingOwnerRefDoesNotBlock(t *testing.T) {
t.Fatalf("Failed to create Pod: %v", err)
}
stopCh := make(chan struct{})
go gc.Run(5, stopCh)
defer close(stopCh)
err = rcClient.Delete(toBeDeletedRCName, getForegroundOptions())
if err != nil {
@ -603,8 +637,15 @@ func TestNonBlockingOwnerRefDoesNotBlock(t *testing.T) {
}
func TestBlockingOwnerRefDoesBlock(t *testing.T) {
s, closeFn, gc, clientSet := setup(t)
defer closeFn()
stopCh := make(chan struct{})
s, closeFn, gc, clientSet := setup(t, stopCh)
defer func() {
// We have to close the stop channel first, so the shared informers can terminate their watches;
// otherwise closeFn() will hang waiting for active client connections to finish.
close(stopCh)
closeFn()
}()
ns := framework.CreateTestingNamespace("gc-foreground3", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
@ -627,9 +668,7 @@ func TestBlockingOwnerRefDoesBlock(t *testing.T) {
t.Fatalf("Failed to create Pod: %v", err)
}
stopCh := make(chan struct{})
go gc.Run(5, stopCh)
defer close(stopCh)
// this makes sure the garbage collector will have added the pod to its
// dependency graph before handling the foreground deletion of the rc.