From e8b1d7dc24713db99808028e0d02bacf6d48e01f Mon Sep 17 00:00:00 2001
From: Andrew Sy Kim
Date: Thu, 24 Aug 2023 19:34:03 +0000
Subject: [PATCH] garbagecollector: controller loop should not be blocking on failed cache sync

Signed-off-by: Andrew Sy Kim
Co-authored-by: He Xiaoxi
---
 .../garbagecollector/garbagecollector.go      | 129 ++++++------------
 .../garbagecollector/garbagecollector_test.go | 108 +++++++--------
 2 files changed, 98 insertions(+), 139 deletions(-)

diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go
index 9874de251a0..b1c604919ba 100644
--- a/pkg/controller/garbagecollector/garbagecollector.go
+++ b/pkg/controller/garbagecollector/garbagecollector.go
@@ -74,8 +74,6 @@ type GarbageCollector struct {
 
 	kubeClient       clientset.Interface
 	eventBroadcaster record.EventBroadcaster
-
-	workerLock sync.RWMutex
 }
 
 var _ controller.Interface = (*GarbageCollector)(nil)
@@ -148,13 +146,15 @@ func (gc *GarbageCollector) Run(ctx context.Context, workers int) {
 
 	go gc.dependencyGraphBuilder.Run(ctx)
 
-	if !cache.WaitForNamedCacheSync("garbage collector", ctx.Done(), func() bool {
+	if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(ctx.Done(), 30*time.Second), func() bool {
 		return gc.dependencyGraphBuilder.IsSynced(logger)
 	}) {
-		return
+		logger.Info("Garbage collector: not all resource monitors could be synced, proceeding anyway")
+	} else {
+		logger.Info("Garbage collector: all resource monitors have synced")
 	}
 
-	logger.Info("All resource monitors have synced. Proceeding to collect garbage")
+	logger.Info("Proceeding to collect garbage")
 
 	// gc workers
 	for i := 0; i < workers; i++ {
@@ -166,8 +166,8 @@ func (gc *GarbageCollector) Run(ctx context.Context, workers int) {
 }
 
 // Sync periodically resyncs the garbage collector when new resources are
-// observed from discovery. When new resources are detected, Sync will stop all
-// GC workers, reset gc.restMapper, and resync the monitors.
+// observed from discovery. When new resources are detected, it will reset
+// gc.restMapper and resync the monitors.
 //
 // Note that discoveryClient should NOT be shared with gc.restMapper, otherwise
 // the mapper's underlying discovery client will be unnecessarily reset during
@@ -200,83 +200,48 @@ func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery.
 			return
 		}
 
-		// Ensure workers are paused to avoid processing events before informers
-		// have resynced.
-		gc.workerLock.Lock()
-		defer gc.workerLock.Unlock()
+		logger.V(2).Info(
+			"syncing garbage collector with updated resources from discovery",
+			"diff", printDiff(oldResources, newResources),
+		)
 
-		// Once we get here, we should not unpause workers until we've successfully synced
-		attempt := 0
-		wait.PollImmediateUntilWithContext(ctx, 100*time.Millisecond, func(ctx context.Context) (bool, error) {
-			attempt++
+		// Resetting the REST mapper will also invalidate the underlying discovery
+		// client. This is a leaky abstraction and assumes behavior about the REST
+		// mapper, but we'll deal with it for now.
+		gc.restMapper.Reset()
+		logger.V(4).Info("reset restmapper")
 
-			// On a reattempt, check if available resources have changed
-			if attempt > 1 {
-				newResources, err = GetDeletableResources(logger, discoveryClient)
+		// Perform the monitor resync and wait for controllers to report cache sync.
+		//
+		// NOTE: It's possible that newResources will diverge from the resources
+		// discovered by restMapper during the call to Reset, since they are
+		// distinct discovery clients invalidated at different times. For example,
+		// newResources may contain resources not returned in the restMapper's
+		// discovery call if the resources appeared in-between the calls. In that
+		// case, the restMapper will fail to map some of newResources until the next
+		// attempt.
+		if err := gc.resyncMonitors(logger, newResources); err != nil {
+			utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %w", err))
+			metrics.GarbageCollectorResourcesSyncError.Inc()
+			return
+		}
+		logger.V(4).Info("resynced monitors")
 
-				if len(newResources) == 0 {
-					logger.V(2).Info("no resources reported by discovery", "attempt", attempt)
-					metrics.GarbageCollectorResourcesSyncError.Inc()
-					return false, nil
-				}
-				if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(err); isLookupFailure {
-					// In partial discovery cases, preserve existing synced informers for resources in the failed groups, so resyncMonitors will only add informers for newly seen resources
-					for k, v := range oldResources {
-						if _, failed := groupLookupFailures[k.GroupVersion()]; failed && gc.dependencyGraphBuilder.IsResourceSynced(k) {
-							newResources[k] = v
-						}
-					}
-				}
-			}
+		// wait for caches to fill for a while (our sync period) before attempting to rediscover resources and retry syncing.
+		// this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
+		// informers keep attempting to sync in the background, so retrying doesn't interrupt them.
+		// the call to resyncMonitors on the next periodic sync will no-op for resources that still exist.
+		// note that the gc workers are no longer paused while waiting, since the workerLock has been removed.
+		if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(ctx.Done(), period), func() bool {
+			return gc.dependencyGraphBuilder.IsSynced(logger)
+		}) {
+			utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync"))
+			metrics.GarbageCollectorResourcesSyncError.Inc()
+		}
 
-			logger.V(2).Info(
-				"syncing garbage collector with updated resources from discovery",
-				"attempt", attempt,
-				"diff", printDiff(oldResources, newResources),
-			)
-
-			// Resetting the REST mapper will also invalidate the underlying discovery
-			// client. This is a leaky abstraction and assumes behavior about the REST
-			// mapper, but we'll deal with it for now.
-			gc.restMapper.Reset()
-			logger.V(4).Info("reset restmapper")
-
-			// Perform the monitor resync and wait for controllers to report cache sync.
-			//
-			// NOTE: It's possible that newResources will diverge from the resources
-			// discovered by restMapper during the call to Reset, since they are
-			// distinct discovery clients invalidated at different times. For example,
-			// newResources may contain resources not returned in the restMapper's
-			// discovery call if the resources appeared in-between the calls. In that
-			// case, the restMapper will fail to map some of newResources until the next
-			// attempt.
-			if err := gc.resyncMonitors(logger, newResources); err != nil {
-				utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors (attempt %d): %v", attempt, err))
-				metrics.GarbageCollectorResourcesSyncError.Inc()
-				return false, nil
-			}
-			logger.V(4).Info("resynced monitors")
-
-			// wait for caches to fill for a while (our sync period) before attempting to rediscover resources and retry syncing.
-			// this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
-			// informers keep attempting to sync in the background, so retrying doesn't interrupt them.
-			// the call to resyncMonitors on the reattempt will no-op for resources that still exist.
-			// note that workers stay paused until we successfully resync.
-			if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(ctx.Done(), period), func() bool {
-				return gc.dependencyGraphBuilder.IsSynced(logger)
-			}) {
-				utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync (attempt %d)", attempt))
-				metrics.GarbageCollectorResourcesSyncError.Inc()
-				return false, nil
-			}
-
-			// success, break out of the loop
-			return true, nil
-		})
-
-		// Finally, keep track of our new state. Do this after all preceding steps
-		// have succeeded to ensure we'll retry on subsequent syncs if an error
-		// occurred.
+		// Finally, keep track of our new resource monitor state.
+		// Monitors where the cache sync times out are still tracked here as
+		// subsequent runs should stop them if their resources were removed.
 		oldResources = newResources
 		logger.V(2).Info("synced garbage collector")
 	}, period)
@@ -328,8 +293,6 @@ var namespacedOwnerOfClusterScopedObjectErr = goerrors.New("cluster-scoped objec
 
 func (gc *GarbageCollector) processAttemptToDeleteWorker(ctx context.Context) bool {
 	item, quit := gc.attemptToDelete.Get()
-	gc.workerLock.RLock()
-	defer gc.workerLock.RUnlock()
 	if quit {
 		return false
 	}
@@ -754,8 +717,6 @@ func (gc *GarbageCollector) runAttemptToOrphanWorker(logger klog.Logger) {
 // these steps fail.
 func (gc *GarbageCollector) processAttemptToOrphanWorker(logger klog.Logger) bool {
 	item, quit := gc.attemptToOrphan.Get()
-	gc.workerLock.RLock()
-	defer gc.workerLock.RUnlock()
 	if quit {
 		return false
 	}
diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go
index 2439fb732d4..c675b27c37d 100644
--- a/pkg/controller/garbagecollector/garbagecollector_test.go
+++ b/pkg/controller/garbagecollector/garbagecollector_test.go
@@ -814,7 +814,8 @@ func TestGetDeletableResources(t *testing.T) {
 }
 
 // TestGarbageCollectorSync ensures that a discovery client error
-// will not cause the garbage collector to block infinitely.
+// or an informer sync error will not cause the garbage collector
+// to block infinitely.
 func TestGarbageCollectorSync(t *testing.T) {
 	serverResources := []*metav1.APIResourceList{
 		{
@@ -912,49 +913,30 @@ func TestGarbageCollectorSync(t *testing.T) {
 
 	// Wait until the sync discovers the initial resources
 	time.Sleep(1 * time.Second)
-
-	err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
-	if err != nil {
-		t.Fatalf("Expected garbagecollector.Sync to be running but it is blocked: %v", err)
-	}
 	assertMonitors(t, gc, "pods", "deployments")
 
 	// Simulate the discovery client returning an error
 	fakeDiscoveryClient.setPreferredResources(nil, fmt.Errorf("error calling discoveryClient.ServerPreferredResources()"))
-
-	// Wait until sync discovers the change
 	time.Sleep(1 * time.Second)
 	// No monitor changes
 	assertMonitors(t, gc, "pods", "deployments")
 
 	// Remove the error from being returned and see if the garbage collector sync is still working
 	fakeDiscoveryClient.setPreferredResources(serverResources, nil)
-
-	err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
-	if err != nil {
-		t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
-	}
+	time.Sleep(1 * time.Second)
 	assertMonitors(t, gc, "pods", "deployments")
 
 	// Simulate the discovery client returning a resource the restmapper can resolve, but will not sync caches
 	fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, nil)
-
-	// Wait until sync discovers the change
 	time.Sleep(1 * time.Second)
 	assertMonitors(t, gc, "pods", "secrets")
 
 	// Put the resources back to normal and ensure garbage collector sync recovers
 	fakeDiscoveryClient.setPreferredResources(serverResources, nil)
-
-	err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
-	if err != nil {
-		t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
-	}
+	time.Sleep(1 * time.Second)
 	assertMonitors(t, gc, "pods", "deployments")
 
 	// Partial discovery failure
 	fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, appsV1Error)
-	// Wait until sync discovers the change
 	time.Sleep(1 * time.Second)
 	// Deployments monitor kept
 	assertMonitors(t, gc, "pods", "deployments", "secrets")
@@ -963,12 +945,35 @@ func TestGarbageCollectorSync(t *testing.T) {
 	fakeDiscoveryClient.setPreferredResources(serverResources, nil)
 	// Wait until sync discovers the change
 	time.Sleep(1 * time.Second)
-	err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
-	if err != nil {
-		t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
-	}
 	// Unsyncable monitor removed
 	assertMonitors(t, gc, "pods", "deployments")
+
+	// Add a fake controller to simulate an informer that is initially not synced and only becomes synced at the end of the test.
+	fc := fakeController{}
+	gc.dependencyGraphBuilder.monitors[schema.GroupVersionResource{
+		Version:  "v1",
+		Resource: "secrets",
+	}] = &monitor{controller: &fc}
+	if gc.IsSynced(logger) {
+		t.Fatal("cache from garbage collector should not be synced")
+	}
+
+	fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, nil)
+	time.Sleep(1 * time.Second)
+	assertMonitors(t, gc, "pods", "secrets")
+
+	// Mark the informer as synced now.
+	fc.SetSynced(true)
+	time.Sleep(1 * time.Second)
+	assertMonitors(t, gc, "pods", "secrets")
+
+	if !gc.IsSynced(logger) {
+		t.Fatal("cache from garbage collector should be synced")
+	}
+
+	fakeDiscoveryClient.setPreferredResources(serverResources, nil)
+	time.Sleep(1 * time.Second)
+	assertMonitors(t, gc, "pods", "deployments")
 }
 
 func assertMonitors(t *testing.T, gc *GarbageCollector, resources ...string) {
@@ -983,29 +988,6 @@ func assertMonitors(t *testing.T, gc *GarbageCollector, resources ...string) {
 	}
 }
 
-func expectSyncNotBlocked(fakeDiscoveryClient *fakeServerResources, workerLock *sync.RWMutex) error {
-	before := fakeDiscoveryClient.getInterfaceUsedCount()
-	t := 1 * time.Second
-	time.Sleep(t)
-	after := fakeDiscoveryClient.getInterfaceUsedCount()
-	if before == after {
-		return fmt.Errorf("discoveryClient.ServerPreferredResources() called %d times over %v", after-before, t)
-	}
-
-	workerLockAcquired := make(chan struct{})
-	go func() {
-		workerLock.Lock()
-		defer workerLock.Unlock()
-		close(workerLockAcquired)
-	}()
-	select {
-	case <-workerLockAcquired:
-		return nil
-	case <-time.After(t):
-		return fmt.Errorf("workerLock blocked for at least %v", t)
-	}
-}
-
 type fakeServerResources struct {
 	PreferredResources []*metav1.APIResourceList
 	Error              error
@@ -1035,12 +1017,6 @@ func (f *fakeServerResources) setPreferredResources(resources []*metav1.APIResou
 	f.Error = err
 }
 
-func (f *fakeServerResources) getInterfaceUsedCount() int {
-	f.Lock.Lock()
-	defer f.Lock.Unlock()
-	return f.InterfaceUsedCount
-}
-
 func (*fakeServerResources) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) {
 	return nil, nil
 }
@@ -2778,6 +2754,30 @@ func assertState(s state) step {
 
 }
 
+type fakeController struct {
+	synced bool
+	lock   sync.Mutex
+}
+
+func (f *fakeController) Run(stopCh <-chan struct{}) {
+}
+
+func (f *fakeController) HasSynced() bool {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+	return f.synced
+}
+
+func (f *fakeController) SetSynced(synced bool) {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+	f.synced = synced
+}
+
+func (f *fakeController) LastSyncResourceVersion() string {
+	return ""
+}
+
 // trackingWorkqueue implements RateLimitingInterface,
 // allows introspection of the items in the queue,
 // and treats AddAfter and AddRateLimited the same as Add
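Note: both WaitForNamedCacheSync call sites in this patch rely on the waitForStopOrTimeout helper that already lives in garbagecollector.go to bound the cache-sync wait instead of blocking until shutdown. The helper itself is not part of this diff, so the sketch below is only an assumption of its shape (the exact name, signature, and select logic in the real file may differ): it returns a channel that closes when either the upstream stop channel closes or the timeout elapses, which is what lets Run proceed past unsynced informers after roughly 30 seconds and lets Sync give up after one sync period.

// waitForStopOrTimeout returns a stop channel that is closed when the given
// stop channel is closed or when the timeout expires, whichever happens first.
// (Illustrative sketch only; see the existing helper in garbagecollector.go.)
func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} {
	stopChWithTimeout := make(chan struct{})
	go func() {
		// Close the derived channel on either stop or timeout so that
		// cache.WaitForNamedCacheSync returns instead of blocking forever.
		defer close(stopChWithTimeout)
		select {
		case <-stopCh:
		case <-time.After(timeout):
		}
	}()
	return stopChWithTimeout
}

Passing the derived channel to cache.WaitForNamedCacheSync is what turns the previously unbounded waits in Run and Sync into bounded ones, which is the core of this change.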