From c9a084d59ca40f42e37fc1d9b9dcd1e8bfbda0da Mon Sep 17 00:00:00 2001
From: Jordan Liggitt
Date: Fri, 12 May 2023 16:24:10 -0400
Subject: [PATCH] Fix duplicate GC event handlers getting added if discovery flutters

---
 .../garbagecollector/garbagecollector.go      | 41 +++++++---
 .../garbagecollector/garbagecollector_test.go | 79 +++++++++++++++----
 .../garbagecollector/graph_builder.go         |  8 ++
 3 files changed, 101 insertions(+), 27 deletions(-)

diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go
index 82fd8d208ce..1a4aeecb834 100644
--- a/pkg/controller/garbagecollector/garbagecollector.go
+++ b/pkg/controller/garbagecollector/garbagecollector.go
@@ -187,14 +187,21 @@ func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery.
 	logger := klog.FromContext(ctx)
 
 	// Get the current resource list from discovery.
-	newResources := GetDeletableResources(logger, discoveryClient)
+	newResources, err := GetDeletableResources(logger, discoveryClient)
 
-	// This can occur if there is an internal error in GetDeletableResources.
 	if len(newResources) == 0 {
 		logger.V(2).Info("no resources reported by discovery, skipping garbage collector sync")
 		metrics.GarbageCollectorResourcesSyncError.Inc()
 		return
 	}
+	if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(err); isLookupFailure {
+		// In partial discovery cases, preserve existing synced informers for resources in the failed groups, so resyncMonitors will only add informers for newly seen resources
+		for k, v := range oldResources {
+			if _, failed := groupLookupFailures[k.GroupVersion()]; failed && gc.dependencyGraphBuilder.IsResourceSynced(k) {
+				newResources[k] = v
+			}
+		}
+	}
 
 	// Decide whether discovery has reported a change.
 	if reflect.DeepEqual(oldResources, newResources) {
@@ -214,12 +221,21 @@ func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery.
 
 		// On a reattempt, check if available resources have changed
 		if attempt > 1 {
-			newResources = GetDeletableResources(logger, discoveryClient)
+			newResources, err = GetDeletableResources(logger, discoveryClient)
+
 			if len(newResources) == 0 {
 				logger.V(2).Info("no resources reported by discovery", "attempt", attempt)
 				metrics.GarbageCollectorResourcesSyncError.Inc()
 				return false, nil
 			}
+			if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(err); isLookupFailure {
+				// In partial discovery cases, preserve existing synced informers for resources in the failed groups, so resyncMonitors will only add informers for newly seen resources
+				for k, v := range oldResources {
+					if _, failed := groupLookupFailures[k.GroupVersion()]; failed && gc.dependencyGraphBuilder.IsResourceSynced(k) {
+						newResources[k] = v
+					}
+				}
+			}
 		}
 
 		logger.V(2).Info(
@@ -806,20 +822,23 @@ func (gc *GarbageCollector) GraphHasUID(u types.UID) bool {
 // garbage collector should recognize and work with. More specifically, all
 // preferred resources which support the 'delete', 'list', and 'watch' verbs.
 //
+// If an error was encountered fetching resources from the server,
+// it is included as well, along with any resources that were successfully resolved.
+//
 // All discovery errors are considered temporary. Upon encountering any error,
 // GetDeletableResources will log and return any discovered resources it was
 // able to process (which may be none).
-func GetDeletableResources(logger klog.Logger, discoveryClient discovery.ServerResourcesInterface) map[schema.GroupVersionResource]struct{} {
-	preferredResources, err := discoveryClient.ServerPreferredResources()
-	if err != nil {
-		if discovery.IsGroupDiscoveryFailedError(err) {
-			logger.Info("failed to discover some groups", "groups", err.(*discovery.ErrGroupDiscoveryFailed).Groups)
+func GetDeletableResources(logger klog.Logger, discoveryClient discovery.ServerResourcesInterface) (map[schema.GroupVersionResource]struct{}, error) {
+	preferredResources, lookupErr := discoveryClient.ServerPreferredResources()
+	if lookupErr != nil {
+		if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(lookupErr); isLookupFailure {
+			logger.Info("failed to discover some groups", "groups", groupLookupFailures)
 		} else {
-			logger.Info("failed to discover preferred resources", "error", err)
+			logger.Info("failed to discover preferred resources", "error", lookupErr)
 		}
 	}
 	if preferredResources == nil {
-		return map[schema.GroupVersionResource]struct{}{}
+		return map[schema.GroupVersionResource]struct{}{}, lookupErr
 	}
 
 	// This is extracted from discovery.GroupVersionResources to allow tolerating
@@ -837,7 +856,7 @@ func GetDeletableResources(logger klog.Logger, discoveryClient discovery.ServerR
 		}
 	}
 
-	return deletableGroupVersionResources
+	return deletableGroupVersionResources, lookupErr
 }
 
 func (gc *GarbageCollector) Name() string {
diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go
index a11d54b6634..4455e7cd853 100644
--- a/pkg/controller/garbagecollector/garbagecollector_test.go
+++ b/pkg/controller/garbagecollector/garbagecollector_test.go
@@ -28,6 +28,7 @@ import (
 	"time"
 
 	"golang.org/x/time/rate"
+
 	"k8s.io/klog/v2"
 	"k8s.io/klog/v2/ktesting"
 
@@ -805,10 +806,13 @@ func TestGetDeletableResources(t *testing.T) {
 			PreferredResources: test.serverResources,
 			Error:              test.err,
 		}
-		actual := GetDeletableResources(logger, client)
+		actual, actualErr := GetDeletableResources(logger, client)
 		if !reflect.DeepEqual(test.deletableResources, actual) {
 			t.Errorf("expected resources:\n%v\ngot:\n%v", test.deletableResources, actual)
 		}
+		if !reflect.DeepEqual(test.err, actualErr) {
+			t.Errorf("expected error:\n%v\ngot:\n%v", test.err, actualErr)
+		}
 	}
 }
 
@@ -822,7 +826,15 @@ func TestGarbageCollectorSync(t *testing.T) {
 				{Name: "pods", Namespaced: true, Kind: "Pod", Verbs: metav1.Verbs{"delete", "list", "watch"}},
 			},
 		},
+		{
+			GroupVersion: "apps/v1",
+			APIResources: []metav1.APIResource{
+				{Name: "deployments", Namespaced: true, Kind: "Deployment", Verbs: metav1.Verbs{"delete", "list", "watch"}},
+			},
+		},
 	}
+	appsV1Error := &discovery.ErrGroupDiscoveryFailed{Groups: map[schema.GroupVersion]error{{Group: "apps", Version: "v1"}: fmt.Errorf(":-/")}}
+
 	unsyncableServerResources := []*metav1.APIResourceList{
 		{
 			GroupVersion: "v1",
@@ -845,6 +857,10 @@ func TestGarbageCollectorSync(t *testing.T) {
 			200,
 			[]byte("{}"),
 		},
+		"GET" + "/apis/apps/v1/deployments": {
+			200,
+			[]byte("{}"),
+		},
 		"GET" + "/api/v1/secrets": {
 			404,
 			[]byte("{}"),
@@ -859,7 +875,11 @@ func TestGarbageCollectorSync(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	rm := &testRESTMapper{testrestmapper.TestOnlyStaticRESTMapper(legacyscheme.Scheme)}
+	tweakableRM := meta.NewDefaultRESTMapper(nil)
+	tweakableRM.AddSpecific(schema.GroupVersionKind{Version: "v1", Kind: "Pod"}, schema.GroupVersionResource{Version: "v1", Resource: "pods"}, schema.GroupVersionResource{Version: "v1", Resource: "pod"}, meta.RESTScopeNamespace)
+	tweakableRM.AddSpecific(schema.GroupVersionKind{Version: "v1", Kind: "Secret"}, schema.GroupVersionResource{Version: "v1", Resource: "secrets"}, schema.GroupVersionResource{Version: "v1", Resource: "secret"}, meta.RESTScopeNamespace)
+	tweakableRM.AddSpecific(schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "Deployment"}, schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}, schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployment"}, meta.RESTScopeNamespace)
+	rm := &testRESTMapper{meta.MultiRESTMapper{tweakableRM, testrestmapper.TestOnlyStaticRESTMapper(legacyscheme.Scheme)}}
 	metadataClient, err := metadata.NewForConfig(clientConfig)
 	if err != nil {
 		t.Fatal(err)
@@ -900,38 +920,70 @@ func TestGarbageCollectorSync(t *testing.T) {
 	if err != nil {
 		t.Fatalf("Expected garbagecollector.Sync to be running but it is blocked: %v", err)
 	}
+	assertMonitors(t, gc, "pods", "deployments")
 
 	// Simulate the discovery client returning an error
-	fakeDiscoveryClient.setPreferredResources(nil)
-	fakeDiscoveryClient.setError(fmt.Errorf("error calling discoveryClient.ServerPreferredResources()"))
+	fakeDiscoveryClient.setPreferredResources(nil, fmt.Errorf("error calling discoveryClient.ServerPreferredResources()"))
 
 	// Wait until sync discovers the change
 	time.Sleep(1 * time.Second)
+	// No monitor changes
+	assertMonitors(t, gc, "pods", "deployments")
 
 	// Remove the error from being returned and see if the garbage collector sync is still working
-	fakeDiscoveryClient.setPreferredResources(serverResources)
-	fakeDiscoveryClient.setError(nil)
+	fakeDiscoveryClient.setPreferredResources(serverResources, nil)
 
 	err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
 	if err != nil {
 		t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
 	}
+	assertMonitors(t, gc, "pods", "deployments")
 
 	// Simulate the discovery client returning a resource the restmapper can resolve, but will not sync caches
-	fakeDiscoveryClient.setPreferredResources(unsyncableServerResources)
-	fakeDiscoveryClient.setError(nil)
+	fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, nil)
 
 	// Wait until sync discovers the change
 	time.Sleep(1 * time.Second)
+	assertMonitors(t, gc, "pods", "secrets")
 
 	// Put the resources back to normal and ensure garbage collector sync recovers
-	fakeDiscoveryClient.setPreferredResources(serverResources)
-	fakeDiscoveryClient.setError(nil)
+	fakeDiscoveryClient.setPreferredResources(serverResources, nil)
 
 	err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
 	if err != nil {
 		t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
 	}
+	assertMonitors(t, gc, "pods", "deployments")
+
+	// Partial discovery failure
+	fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, appsV1Error)
+	// Wait until sync discovers the change
+	time.Sleep(1 * time.Second)
+	// Deployments monitor kept
+	assertMonitors(t, gc, "pods", "deployments", "secrets")
+
+	// Put the resources back to normal and ensure garbage collector sync recovers
+	fakeDiscoveryClient.setPreferredResources(serverResources, nil)
+	// Wait until sync discovers the change
+	time.Sleep(1 * time.Second)
+	err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
+	if err != nil {
+		t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
+	}
+	// Unsyncable monitor removed
+	assertMonitors(t, gc, "pods", "deployments")
+}
+
+func assertMonitors(t *testing.T, gc *GarbageCollector, resources ...string) {
+	t.Helper()
+	expected := sets.NewString(resources...)
+	actual := sets.NewString()
+	for m := range gc.dependencyGraphBuilder.monitors {
+		actual.Insert(m.Resource)
+	}
+	if !actual.Equal(expected) {
+		t.Fatalf("expected monitors %v, got %v", expected.List(), actual.List())
+	}
 }
 
 func expectSyncNotBlocked(fakeDiscoveryClient *fakeServerResources, workerLock *sync.RWMutex) error {
@@ -979,15 +1031,10 @@ func (f *fakeServerResources) ServerPreferredResources() ([]*metav1.APIResourceL
 	return f.PreferredResources, f.Error
 }
 
-func (f *fakeServerResources) setPreferredResources(resources []*metav1.APIResourceList) {
+func (f *fakeServerResources) setPreferredResources(resources []*metav1.APIResourceList, err error) {
 	f.Lock.Lock()
 	defer f.Lock.Unlock()
 	f.PreferredResources = resources
-}
-
-func (f *fakeServerResources) setError(err error) {
-	f.Lock.Lock()
-	defer f.Lock.Unlock()
 	f.Error = err
 }
 
diff --git a/pkg/controller/garbagecollector/graph_builder.go b/pkg/controller/garbagecollector/graph_builder.go
index ef9ebe42c0b..9648664ed2b 100644
--- a/pkg/controller/garbagecollector/graph_builder.go
+++ b/pkg/controller/garbagecollector/graph_builder.go
@@ -265,6 +265,14 @@ func (gb *GraphBuilder) startMonitors(logger klog.Logger) {
 	logger.V(4).Info("started new monitors", "new", started, "current", len(monitors))
 }
 
+// IsResourceSynced returns true if a monitor exists for the given resource and has synced
+func (gb *GraphBuilder) IsResourceSynced(resource schema.GroupVersionResource) bool {
+	gb.monitorLock.Lock()
+	defer gb.monitorLock.Unlock()
+	monitor, ok := gb.monitors[resource]
+	return ok && monitor.controller.HasSynced()
+}
+
 // IsSynced returns true if any monitors exist AND all those monitors'
 // controllers HasSynced functions return true. This means IsSynced could return
 // true at one time, and then later return false if all monitors were