mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 15:05:27 +00:00
Use shared informers in gc controller if possible
This commit is contained in:
parent
16b5093feb
commit
2480f2ceb6
@ -189,7 +189,7 @@ func startGarbageCollectorController(ctx ControllerContext) (bool, error) {
|
||||
metaOnlyClientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc)
|
||||
config.ContentConfig = dynamic.ContentConfig()
|
||||
clientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc)
|
||||
garbageCollector, err := garbagecollector.NewGarbageCollector(metaOnlyClientPool, clientPool, restMapper, deletableGroupVersionResources)
|
||||
garbageCollector, err := garbagecollector.NewGarbageCollector(metaOnlyClientPool, clientPool, restMapper, deletableGroupVersionResources, ctx.InformerFactory)
|
||||
if err != nil {
|
||||
return true, fmt.Errorf("Failed to start the generic garbage collector: %v", err)
|
||||
}
|
||||
|
@ -23,6 +23,7 @@ go_library(
|
||||
],
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
|
||||
"//pkg/client/retry:go_default_library",
|
||||
"//pkg/controller:go_default_library",
|
||||
"//pkg/controller/garbagecollector/metaonly:go_default_library",
|
||||
@ -60,6 +61,8 @@ go_test(
|
||||
"//pkg/api:go_default_library",
|
||||
"//pkg/api/install:go_default_library",
|
||||
"//pkg/api/v1:go_default_library",
|
||||
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
|
||||
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
|
||||
"//pkg/controller/garbagecollector/metaonly:go_default_library",
|
||||
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
||||
|
@ -33,6 +33,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
|
||||
// install the prometheus plugin
|
||||
@ -69,9 +70,16 @@ type GarbageCollector struct {
|
||||
registeredRateLimiter *RegisteredRateLimiter
|
||||
// GC caches the owners that do not exist according to the API server.
|
||||
absentOwnerCache *UIDCache
|
||||
sharedInformers informers.SharedInformerFactory
|
||||
}
|
||||
|
||||
func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynamic.ClientPool, mapper meta.RESTMapper, deletableResources map[schema.GroupVersionResource]struct{}) (*GarbageCollector, error) {
|
||||
func NewGarbageCollector(
|
||||
metaOnlyClientPool dynamic.ClientPool,
|
||||
clientPool dynamic.ClientPool,
|
||||
mapper meta.RESTMapper,
|
||||
deletableResources map[schema.GroupVersionResource]struct{},
|
||||
sharedInformers informers.SharedInformerFactory,
|
||||
) (*GarbageCollector, error) {
|
||||
attemptToDelete := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_delete")
|
||||
attemptToOrphan := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_orphan")
|
||||
absentOwnerCache := NewUIDCache(500)
|
||||
@ -94,6 +102,7 @@ func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynam
|
||||
attemptToDelete: attemptToDelete,
|
||||
attemptToOrphan: attemptToOrphan,
|
||||
absentOwnerCache: absentOwnerCache,
|
||||
sharedInformers: sharedInformers,
|
||||
}
|
||||
if err := gb.monitorsForResources(deletableResources); err != nil {
|
||||
return nil, err
|
||||
|
@ -41,6 +41,8 @@ import (
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
|
||||
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
|
||||
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
|
||||
)
|
||||
|
||||
@ -55,7 +57,10 @@ func TestNewGarbageCollector(t *testing.T) {
|
||||
// no monitor will be constructed for non-core resource, the GC construction will not fail.
|
||||
{Group: "tpr.io", Version: "v1", Resource: "unknown"}: {},
|
||||
}
|
||||
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, api.Registry.RESTMapper(), podResource)
|
||||
|
||||
client := fake.NewSimpleClientset()
|
||||
sharedInformers := informers.NewSharedInformerFactory(client, 0)
|
||||
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, api.Registry.RESTMapper(), podResource, sharedInformers)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -113,17 +118,26 @@ func testServerAndClientConfig(handler func(http.ResponseWriter, *http.Request))
|
||||
return srv, config
|
||||
}
|
||||
|
||||
func setupGC(t *testing.T, config *restclient.Config) *GarbageCollector {
|
||||
type garbageCollector struct {
|
||||
*GarbageCollector
|
||||
stop chan struct{}
|
||||
}
|
||||
|
||||
func setupGC(t *testing.T, config *restclient.Config) garbageCollector {
|
||||
config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
|
||||
metaOnlyClientPool := dynamic.NewClientPool(config, api.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
|
||||
config.ContentConfig.NegotiatedSerializer = nil
|
||||
clientPool := dynamic.NewClientPool(config, api.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
|
||||
podResource := map[schema.GroupVersionResource]struct{}{{Version: "v1", Resource: "pods"}: {}}
|
||||
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, api.Registry.RESTMapper(), podResource)
|
||||
client := fake.NewSimpleClientset()
|
||||
sharedInformers := informers.NewSharedInformerFactory(client, 0)
|
||||
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, api.Registry.RESTMapper(), podResource, sharedInformers)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return gc
|
||||
stop := make(chan struct{})
|
||||
go sharedInformers.Start(stop)
|
||||
return garbageCollector{gc, stop}
|
||||
}
|
||||
|
||||
func getPod(podName string, ownerReferences []metav1.OwnerReference) *v1.Pod {
|
||||
@ -172,7 +186,10 @@ func TestAttemptToDeleteItem(t *testing.T) {
|
||||
}
|
||||
srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
|
||||
defer srv.Close()
|
||||
|
||||
gc := setupGC(t, clientConfig)
|
||||
defer close(gc.stop)
|
||||
|
||||
item := &node{
|
||||
identity: objectReference{
|
||||
OwnerReference: metav1.OwnerReference{
|
||||
@ -320,6 +337,7 @@ func TestProcessEvent(t *testing.T) {
|
||||
// data race among in the dependents field.
|
||||
func TestDependentsRace(t *testing.T) {
|
||||
gc := setupGC(t, &restclient.Config{})
|
||||
defer close(gc.stop)
|
||||
|
||||
const updates = 100
|
||||
owner := &node{dependents: make(map[*node]struct{})}
|
||||
@ -453,6 +471,7 @@ func TestAbsentUIDCache(t *testing.T) {
|
||||
srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
|
||||
defer srv.Close()
|
||||
gc := setupGC(t, clientConfig)
|
||||
defer close(gc.stop)
|
||||
gc.absentOwnerCache = NewUIDCache(2)
|
||||
gc.attemptToDeleteItem(podToGCNode(rc1Pod1))
|
||||
gc.attemptToDeleteItem(podToGCNode(rc2Pod1))
|
||||
|
@ -34,6 +34,7 @@ import (
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
|
||||
)
|
||||
|
||||
type eventType int
|
||||
@ -62,6 +63,7 @@ type event struct {
|
||||
obj interface{}
|
||||
// the update event comes with an old object, but it's not used by the garbage collector.
|
||||
oldObj interface{}
|
||||
gvk schema.GroupVersionKind
|
||||
}
|
||||
|
||||
// GraphBuilder: based on the events supplied by the informers, GraphBuilder updates
|
||||
@ -90,6 +92,8 @@ type GraphBuilder struct {
|
||||
// GraphBuilder and GC share the absentOwnerCache. Objects that are known to
|
||||
// be non-existent are added to the cached.
|
||||
absentOwnerCache *UIDCache
|
||||
sharedInformers informers.SharedInformerFactory
|
||||
stopCh <-chan struct{}
|
||||
}
|
||||
|
||||
func listWatcher(client *dynamic.Client, resource schema.GroupVersionResource) *cache.ListWatch {
|
||||
@ -118,6 +122,58 @@ func listWatcher(client *dynamic.Client, resource schema.GroupVersionResource) *
|
||||
}
|
||||
|
||||
func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, error) {
|
||||
handlers := cache.ResourceEventHandlerFuncs{
|
||||
// add the event to the dependencyGraphBuilder's graphChanges.
|
||||
AddFunc: func(obj interface{}) {
|
||||
event := &event{
|
||||
eventType: addEvent,
|
||||
obj: obj,
|
||||
gvk: kind,
|
||||
}
|
||||
gb.graphChanges.Add(event)
|
||||
},
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
// TODO: check if there are differences in the ownerRefs,
|
||||
// finalizers, and DeletionTimestamp; if not, ignore the update.
|
||||
event := &event{
|
||||
eventType: updateEvent,
|
||||
obj: newObj,
|
||||
oldObj: oldObj,
|
||||
gvk: kind,
|
||||
}
|
||||
gb.graphChanges.Add(event)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
// delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
|
||||
if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
|
||||
obj = deletedFinalStateUnknown.Obj
|
||||
}
|
||||
event := &event{
|
||||
eventType: deleteEvent,
|
||||
obj: obj,
|
||||
gvk: kind,
|
||||
}
|
||||
gb.graphChanges.Add(event)
|
||||
},
|
||||
}
|
||||
|
||||
shared, err := gb.sharedInformers.ForResource(resource)
|
||||
if err == nil {
|
||||
glog.V(4).Infof("using a shared informer for resource %q, kind %q", resource.String(), kind.String())
|
||||
// need to clone because it's from a shared cache
|
||||
shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime)
|
||||
if gb.stopCh != nil {
|
||||
// if gb.stopCh is set, it means we've already gotten past the initial gb.Run() call, so this
|
||||
// means we've re-loaded and re-read discovery and we are adding a new monitor for a
|
||||
// previously unseen resource, so we need to call Start on the shared informers again (this
|
||||
// will only start those shared informers that have not yet been started).
|
||||
go gb.sharedInformers.Start(gb.stopCh)
|
||||
}
|
||||
return shared.Informer().GetController(), nil
|
||||
} else {
|
||||
glog.V(4).Infof("unable to use a shared informer for resource %q, kind %q: %v", resource.String(), kind.String(), err)
|
||||
}
|
||||
|
||||
// TODO: consider store in one storage.
|
||||
glog.V(5).Infof("create storage for resource %s", resource)
|
||||
client, err := gb.metaOnlyClientPool.ClientForGroupVersionKind(kind)
|
||||
@ -125,47 +181,12 @@ func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind
|
||||
return nil, err
|
||||
}
|
||||
gb.registeredRateLimiterForControllers.registerIfNotPresent(resource.GroupVersion(), client, "garbage_collector_monitoring")
|
||||
setObjectTypeMeta := func(obj interface{}) {
|
||||
runtimeObject, ok := obj.(runtime.Object)
|
||||
if !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("expected runtime.Object, got %#v", obj))
|
||||
}
|
||||
runtimeObject.GetObjectKind().SetGroupVersionKind(kind)
|
||||
}
|
||||
_, monitor := cache.NewInformer(
|
||||
listWatcher(client, resource),
|
||||
nil,
|
||||
ResourceResyncTime,
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
// add the event to the dependencyGraphBuilder's graphChanges.
|
||||
AddFunc: func(obj interface{}) {
|
||||
setObjectTypeMeta(obj)
|
||||
event := &event{
|
||||
eventType: addEvent,
|
||||
obj: obj,
|
||||
}
|
||||
gb.graphChanges.Add(event)
|
||||
},
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
setObjectTypeMeta(newObj)
|
||||
// TODO: check if there are differences in the ownerRefs,
|
||||
// finalizers, and DeletionTimestamp; if not, ignore the update.
|
||||
event := &event{updateEvent, newObj, oldObj}
|
||||
gb.graphChanges.Add(event)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
// delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
|
||||
if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
|
||||
obj = deletedFinalStateUnknown.Obj
|
||||
}
|
||||
setObjectTypeMeta(obj)
|
||||
event := &event{
|
||||
eventType: deleteEvent,
|
||||
obj: obj,
|
||||
}
|
||||
gb.graphChanges.Add(event)
|
||||
},
|
||||
},
|
||||
// don't need to clone because it's not from shared cache
|
||||
handlers,
|
||||
)
|
||||
return monitor, nil
|
||||
}
|
||||
@ -205,6 +226,9 @@ func (gb *GraphBuilder) Run(stopCh <-chan struct{}) {
|
||||
go monitor.Run(stopCh)
|
||||
}
|
||||
go wait.Until(gb.runProcessGraphChanges, 1*time.Second, stopCh)
|
||||
|
||||
// set this so that we can use it if we need to start new shared informers
|
||||
gb.stopCh = stopCh
|
||||
}
|
||||
|
||||
var ignoredResources = map[schema.GroupVersionResource]struct{}{
|
||||
@ -435,12 +459,7 @@ func (gb *GraphBuilder) processGraphChanges() bool {
|
||||
utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
|
||||
return true
|
||||
}
|
||||
typeAccessor, err := meta.TypeAccessor(obj)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
|
||||
return true
|
||||
}
|
||||
glog.V(5).Infof("GraphBuilder process object: %s/%s, namespace %s, name %s, event type %v", typeAccessor.GetAPIVersion(), typeAccessor.GetKind(), accessor.GetNamespace(), accessor.GetName(), event.eventType)
|
||||
glog.V(5).Infof("GraphBuilder process object: %s/%s, namespace %s, name %s, uid %s, event type %v", event.gvk.GroupVersion().String(), event.gvk.Kind, accessor.GetNamespace(), accessor.GetName(), string(accessor.GetUID()), event.eventType)
|
||||
// Check if the node already exsits
|
||||
existingNode, found := gb.uidToNode.Read(accessor.GetUID())
|
||||
switch {
|
||||
@ -448,8 +467,8 @@ func (gb *GraphBuilder) processGraphChanges() bool {
|
||||
newNode := &node{
|
||||
identity: objectReference{
|
||||
OwnerReference: metav1.OwnerReference{
|
||||
APIVersion: typeAccessor.GetAPIVersion(),
|
||||
Kind: typeAccessor.GetKind(),
|
||||
APIVersion: event.gvk.GroupVersion().String(),
|
||||
Kind: event.gvk.Kind,
|
||||
UID: accessor.GetUID(),
|
||||
Name: accessor.GetName(),
|
||||
},
|
||||
|
@ -304,33 +304,8 @@ func ClusterRoles() []rbac.ClusterRole {
|
||||
rbac.NewRule("update").Groups(legacyGroup).Resources("endpoints", "secrets", "serviceaccounts").RuleOrDie(),
|
||||
// Needed to check API access. These creates are non-mutating
|
||||
rbac.NewRule("create").Groups(authenticationGroup).Resources("tokenreviews").RuleOrDie(),
|
||||
|
||||
rbac.NewRule("list", "watch").Groups(legacyGroup).Resources(
|
||||
"configmaps",
|
||||
"namespaces",
|
||||
"nodes",
|
||||
"persistentvolumeclaims",
|
||||
"persistentvolumes",
|
||||
"pods",
|
||||
"replicationcontrollers",
|
||||
"resourcequotas",
|
||||
"secrets",
|
||||
"services",
|
||||
"serviceaccounts",
|
||||
).RuleOrDie(),
|
||||
rbac.NewRule("list", "watch").Groups(extensionsGroup).Resources(
|
||||
"daemonsets",
|
||||
"deployments",
|
||||
"podsecuritypolicies",
|
||||
"replicasets",
|
||||
).RuleOrDie(),
|
||||
rbac.NewRule("list", "watch").Groups(appsGroup).Resources("deployments").RuleOrDie(),
|
||||
rbac.NewRule("list", "watch").Groups(batchGroup).Resources("jobs", "cronjobs").RuleOrDie(),
|
||||
rbac.NewRule("list", "watch").Groups(appsGroup).Resources("statefulsets").RuleOrDie(),
|
||||
rbac.NewRule("list", "watch").Groups(policyGroup).Resources("poddisruptionbudgets").RuleOrDie(),
|
||||
rbac.NewRule("list", "watch").Groups(autoscalingGroup).Resources("horizontalpodautoscalers").RuleOrDie(),
|
||||
rbac.NewRule("list", "watch").Groups(certificatesGroup).Resources("certificatesigningrequests").RuleOrDie(),
|
||||
rbac.NewRule("list", "watch").Groups(storageGroup).Resources("storageclasses").RuleOrDie(),
|
||||
// Needed for all shared informers
|
||||
rbac.NewRule("list", "watch").Groups("*").Resources("*").RuleOrDie(),
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -479,79 +479,9 @@ items:
|
||||
verbs:
|
||||
- create
|
||||
- apiGroups:
|
||||
- ""
|
||||
- '*'
|
||||
resources:
|
||||
- configmaps
|
||||
- namespaces
|
||||
- nodes
|
||||
- persistentvolumeclaims
|
||||
- persistentvolumes
|
||||
- pods
|
||||
- replicationcontrollers
|
||||
- resourcequotas
|
||||
- secrets
|
||||
- serviceaccounts
|
||||
- services
|
||||
verbs:
|
||||
- list
|
||||
- watch
|
||||
- apiGroups:
|
||||
- extensions
|
||||
resources:
|
||||
- daemonsets
|
||||
- deployments
|
||||
- podsecuritypolicies
|
||||
- replicasets
|
||||
verbs:
|
||||
- list
|
||||
- watch
|
||||
- apiGroups:
|
||||
- apps
|
||||
resources:
|
||||
- deployments
|
||||
verbs:
|
||||
- list
|
||||
- watch
|
||||
- apiGroups:
|
||||
- batch
|
||||
resources:
|
||||
- cronjobs
|
||||
- jobs
|
||||
verbs:
|
||||
- list
|
||||
- watch
|
||||
- apiGroups:
|
||||
- apps
|
||||
resources:
|
||||
- statefulsets
|
||||
verbs:
|
||||
- list
|
||||
- watch
|
||||
- apiGroups:
|
||||
- policy
|
||||
resources:
|
||||
- poddisruptionbudgets
|
||||
verbs:
|
||||
- list
|
||||
- watch
|
||||
- apiGroups:
|
||||
- autoscaling
|
||||
resources:
|
||||
- horizontalpodautoscalers
|
||||
verbs:
|
||||
- list
|
||||
- watch
|
||||
- apiGroups:
|
||||
- certificates.k8s.io
|
||||
resources:
|
||||
- certificatesigningrequests
|
||||
verbs:
|
||||
- list
|
||||
- watch
|
||||
- apiGroups:
|
||||
- storage.k8s.io
|
||||
resources:
|
||||
- storageclasses
|
||||
- '*'
|
||||
verbs:
|
||||
- list
|
||||
- watch
|
||||
|
@ -18,6 +18,7 @@ go_test(
|
||||
"//pkg/api:go_default_library",
|
||||
"//pkg/api/v1:go_default_library",
|
||||
"//pkg/client/clientset_generated/clientset:go_default_library",
|
||||
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
|
||||
"//pkg/controller/garbagecollector:go_default_library",
|
||||
"//pkg/controller/garbagecollector/metaonly:go_default_library",
|
||||
"//test/integration:go_default_library",
|
||||
|
@ -39,6 +39,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
|
||||
"k8s.io/kubernetes/pkg/controller/garbagecollector"
|
||||
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
|
||||
"k8s.io/kubernetes/test/integration"
|
||||
@ -123,7 +124,7 @@ func newOwnerRC(name, namespace string) *v1.ReplicationController {
|
||||
}
|
||||
}
|
||||
|
||||
func setup(t *testing.T) (*httptest.Server, framework.CloseFunc, *garbagecollector.GarbageCollector, clientset.Interface) {
|
||||
func setup(t *testing.T, stop chan struct{}) (*httptest.Server, framework.CloseFunc, *garbagecollector.GarbageCollector, clientset.Interface) {
|
||||
masterConfig := framework.NewIntegrationTestMasterConfig()
|
||||
masterConfig.EnableCoreControllers = false
|
||||
_, s, closeFn := framework.RunAMaster(masterConfig)
|
||||
@ -146,19 +147,30 @@ func setup(t *testing.T) (*httptest.Server, framework.CloseFunc, *garbagecollect
|
||||
metaOnlyClientPool := dynamic.NewClientPool(config, api.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
|
||||
config.ContentConfig.NegotiatedSerializer = nil
|
||||
clientPool := dynamic.NewClientPool(config, api.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
|
||||
gc, err := garbagecollector.NewGarbageCollector(metaOnlyClientPool, clientPool, api.Registry.RESTMapper(), deletableGroupVersionResources)
|
||||
sharedInformers := informers.NewSharedInformerFactory(clientSet, 0)
|
||||
gc, err := garbagecollector.NewGarbageCollector(metaOnlyClientPool, clientPool, api.Registry.RESTMapper(), deletableGroupVersionResources, sharedInformers)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create garbage collector")
|
||||
}
|
||||
|
||||
go sharedInformers.Start(stop)
|
||||
|
||||
return s, closeFn, gc, clientSet
|
||||
}
|
||||
|
||||
// This test simulates the cascading deletion.
|
||||
func TestCascadingDeletion(t *testing.T) {
|
||||
stopCh := make(chan struct{})
|
||||
|
||||
glog.V(6).Infof("TestCascadingDeletion starts")
|
||||
defer glog.V(6).Infof("TestCascadingDeletion ends")
|
||||
s, closeFn, gc, clientSet := setup(t)
|
||||
defer closeFn()
|
||||
s, closeFn, gc, clientSet := setup(t, stopCh)
|
||||
defer func() {
|
||||
// We have to close the stop channel first, so the shared informers can terminate their watches;
|
||||
// otherwise closeFn() will hang waiting for active client connections to finish.
|
||||
close(stopCh)
|
||||
closeFn()
|
||||
}()
|
||||
|
||||
ns := framework.CreateTestingNamespace("gc-cascading-deletion", s, t)
|
||||
defer framework.DeleteTestingNamespace(ns, s, t)
|
||||
@ -215,9 +227,7 @@ func TestCascadingDeletion(t *testing.T) {
|
||||
if len(pods.Items) != 3 {
|
||||
t.Fatalf("Expect only 3 pods")
|
||||
}
|
||||
stopCh := make(chan struct{})
|
||||
go gc.Run(5, stopCh)
|
||||
defer close(stopCh)
|
||||
// delete one of the replication controller
|
||||
if err := rcClient.Delete(toBeDeletedRCName, getNonOrphanOptions()); err != nil {
|
||||
t.Fatalf("failed to delete replication controller: %v", err)
|
||||
@ -245,8 +255,14 @@ func TestCascadingDeletion(t *testing.T) {
|
||||
// This test simulates the case where an object is created with an owner that
|
||||
// doesn't exist. It verifies the GC will delete such an object.
|
||||
func TestCreateWithNonExistentOwner(t *testing.T) {
|
||||
s, closeFn, gc, clientSet := setup(t)
|
||||
defer closeFn()
|
||||
stopCh := make(chan struct{})
|
||||
s, closeFn, gc, clientSet := setup(t, stopCh)
|
||||
defer func() {
|
||||
// We have to close the stop channel first, so the shared informers can terminate their watches;
|
||||
// otherwise closeFn() will hang waiting for active client connections to finish.
|
||||
close(stopCh)
|
||||
closeFn()
|
||||
}()
|
||||
|
||||
ns := framework.CreateTestingNamespace("gc-non-existing-owner", s, t)
|
||||
defer framework.DeleteTestingNamespace(ns, s, t)
|
||||
@ -267,9 +283,7 @@ func TestCreateWithNonExistentOwner(t *testing.T) {
|
||||
if len(pods.Items) != 1 {
|
||||
t.Fatalf("Expect only 1 pod")
|
||||
}
|
||||
stopCh := make(chan struct{})
|
||||
go gc.Run(5, stopCh)
|
||||
defer close(stopCh)
|
||||
// wait for the garbage collector to delete the pod
|
||||
if err := integration.WaitForPodToDisappear(podClient, garbageCollectedPodName, 5*time.Second, 30*time.Second); err != nil {
|
||||
t.Fatalf("expect pod %s to be garbage collected, got err= %v", garbageCollectedPodName, err)
|
||||
@ -341,15 +355,20 @@ func verifyRemainingObjects(t *testing.T, clientSet clientset.Interface, namespa
|
||||
// e2e tests that put more stress.
|
||||
func TestStressingCascadingDeletion(t *testing.T) {
|
||||
t.Logf("starts garbage collector stress test")
|
||||
s, closeFn, gc, clientSet := setup(t)
|
||||
defer closeFn()
|
||||
stopCh := make(chan struct{})
|
||||
s, closeFn, gc, clientSet := setup(t, stopCh)
|
||||
|
||||
defer func() {
|
||||
// We have to close the stop channel first, so the shared informers can terminate their watches;
|
||||
// otherwise closeFn() will hang waiting for active client connections to finish.
|
||||
close(stopCh)
|
||||
closeFn()
|
||||
}()
|
||||
|
||||
ns := framework.CreateTestingNamespace("gc-stressing-cascading-deletion", s, t)
|
||||
defer framework.DeleteTestingNamespace(ns, s, t)
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
go gc.Run(5, stopCh)
|
||||
defer close(stopCh)
|
||||
|
||||
const collections = 10
|
||||
var wg sync.WaitGroup
|
||||
@ -402,8 +421,15 @@ func TestStressingCascadingDeletion(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestOrphaning(t *testing.T) {
|
||||
s, closeFn, gc, clientSet := setup(t)
|
||||
defer closeFn()
|
||||
stopCh := make(chan struct{})
|
||||
s, closeFn, gc, clientSet := setup(t, stopCh)
|
||||
|
||||
defer func() {
|
||||
// We have to close the stop channel first, so the shared informers can terminate their watches;
|
||||
// otherwise closeFn() will hang waiting for active client connections to finish.
|
||||
close(stopCh)
|
||||
closeFn()
|
||||
}()
|
||||
|
||||
ns := framework.CreateTestingNamespace("gc-orphaning", s, t)
|
||||
defer framework.DeleteTestingNamespace(ns, s, t)
|
||||
@ -429,9 +455,7 @@ func TestOrphaning(t *testing.T) {
|
||||
}
|
||||
podUIDs = append(podUIDs, pod.ObjectMeta.UID)
|
||||
}
|
||||
stopCh := make(chan struct{})
|
||||
go gc.Run(5, stopCh)
|
||||
defer close(stopCh)
|
||||
|
||||
// we need wait for the gc to observe the creation of the pods, otherwise if
|
||||
// the deletion of RC is observed before the creation of the pods, the pods
|
||||
@ -473,8 +497,15 @@ func TestOrphaning(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSolidOwnerDoesNotBlockWaitingOwner(t *testing.T) {
|
||||
s, closeFn, gc, clientSet := setup(t)
|
||||
defer closeFn()
|
||||
stopCh := make(chan struct{})
|
||||
s, closeFn, gc, clientSet := setup(t, stopCh)
|
||||
|
||||
defer func() {
|
||||
// We have to close the stop channel first, so the shared informers can terminate their watches;
|
||||
// otherwise closeFn() will hang waiting for active client connections to finish.
|
||||
close(stopCh)
|
||||
closeFn()
|
||||
}()
|
||||
|
||||
ns := framework.CreateTestingNamespace("gc-foreground1", s, t)
|
||||
defer framework.DeleteTestingNamespace(ns, s, t)
|
||||
@ -500,9 +531,7 @@ func TestSolidOwnerDoesNotBlockWaitingOwner(t *testing.T) {
|
||||
t.Fatalf("Failed to create Pod: %v", err)
|
||||
}
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
go gc.Run(5, stopCh)
|
||||
defer close(stopCh)
|
||||
|
||||
err = rcClient.Delete(toBeDeletedRCName, getForegroundOptions())
|
||||
if err != nil {
|
||||
@ -535,8 +564,15 @@ func TestSolidOwnerDoesNotBlockWaitingOwner(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNonBlockingOwnerRefDoesNotBlock(t *testing.T) {
|
||||
s, closeFn, gc, clientSet := setup(t)
|
||||
defer closeFn()
|
||||
stopCh := make(chan struct{})
|
||||
s, closeFn, gc, clientSet := setup(t, stopCh)
|
||||
|
||||
defer func() {
|
||||
// We have to close the stop channel first, so the shared informers can terminate their watches;
|
||||
// otherwise closeFn() will hang waiting for active client connections to finish.
|
||||
close(stopCh)
|
||||
closeFn()
|
||||
}()
|
||||
|
||||
ns := framework.CreateTestingNamespace("gc-foreground2", s, t)
|
||||
defer framework.DeleteTestingNamespace(ns, s, t)
|
||||
@ -570,9 +606,7 @@ func TestNonBlockingOwnerRefDoesNotBlock(t *testing.T) {
|
||||
t.Fatalf("Failed to create Pod: %v", err)
|
||||
}
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
go gc.Run(5, stopCh)
|
||||
defer close(stopCh)
|
||||
|
||||
err = rcClient.Delete(toBeDeletedRCName, getForegroundOptions())
|
||||
if err != nil {
|
||||
@ -603,8 +637,15 @@ func TestNonBlockingOwnerRefDoesNotBlock(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestBlockingOwnerRefDoesBlock(t *testing.T) {
|
||||
s, closeFn, gc, clientSet := setup(t)
|
||||
defer closeFn()
|
||||
stopCh := make(chan struct{})
|
||||
s, closeFn, gc, clientSet := setup(t, stopCh)
|
||||
|
||||
defer func() {
|
||||
// We have to close the stop channel first, so the shared informers can terminate their watches;
|
||||
// otherwise closeFn() will hang waiting for active client connections to finish.
|
||||
close(stopCh)
|
||||
closeFn()
|
||||
}()
|
||||
|
||||
ns := framework.CreateTestingNamespace("gc-foreground3", s, t)
|
||||
defer framework.DeleteTestingNamespace(ns, s, t)
|
||||
@ -627,9 +668,7 @@ func TestBlockingOwnerRefDoesBlock(t *testing.T) {
|
||||
t.Fatalf("Failed to create Pod: %v", err)
|
||||
}
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
go gc.Run(5, stopCh)
|
||||
defer close(stopCh)
|
||||
|
||||
// this makes sure the garbage collector will have added the pod to its
|
||||
// dependency graph before handling the foreground deletion of the rc.
|
||||
|
Loading…
Reference in New Issue
Block a user