From e8b1d7dc24713db99808028e0d02bacf6d48e01f Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Thu, 24 Aug 2023 19:34:03 +0000 Subject: [PATCH 1/4] garbagecollector: controller loop should not be blocking on failed cache sync Signed-off-by: Andrew Sy Kim Co-authored-by: He Xiaoxi --- .../garbagecollector/garbagecollector.go | 129 ++++++------------ .../garbagecollector/garbagecollector_test.go | 108 +++++++-------- 2 files changed, 98 insertions(+), 139 deletions(-) diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go index 9874de251a0..b1c604919ba 100644 --- a/pkg/controller/garbagecollector/garbagecollector.go +++ b/pkg/controller/garbagecollector/garbagecollector.go @@ -74,8 +74,6 @@ type GarbageCollector struct { kubeClient clientset.Interface eventBroadcaster record.EventBroadcaster - - workerLock sync.RWMutex } var _ controller.Interface = (*GarbageCollector)(nil) @@ -148,13 +146,15 @@ func (gc *GarbageCollector) Run(ctx context.Context, workers int) { go gc.dependencyGraphBuilder.Run(ctx) - if !cache.WaitForNamedCacheSync("garbage collector", ctx.Done(), func() bool { + if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(ctx.Done(), 30*time.Second), func() bool { return gc.dependencyGraphBuilder.IsSynced(logger) }) { - return + logger.Info("Garbage collector: all resource monitors could not be synced, proceeding anyways") + } else { + logger.Info("Garbage collector: all resource monitors have synced") } - logger.Info("All resource monitors have synced. Proceeding to collect garbage") + logger.Info("Proceeding to collect garbage") // gc workers for i := 0; i < workers; i++ { @@ -166,8 +166,8 @@ func (gc *GarbageCollector) Run(ctx context.Context, workers int) { } // Sync periodically resyncs the garbage collector when new resources are -// observed from discovery. When new resources are detected, Sync will stop all -// GC workers, reset gc.restMapper, and resync the monitors. +// observed from discovery. When new resources are detected, it will reset +// gc.restMapper, and resync the monitors. // // Note that discoveryClient should NOT be shared with gc.restMapper, otherwise // the mapper's underlying discovery client will be unnecessarily reset during @@ -200,83 +200,48 @@ func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery. return } - // Ensure workers are paused to avoid processing events before informers - // have resynced. - gc.workerLock.Lock() - defer gc.workerLock.Unlock() + logger.V(2).Info( + "syncing garbage collector with updated resources from discovery", + "diff", printDiff(oldResources, newResources), + ) - // Once we get here, we should not unpause workers until we've successfully synced - attempt := 0 - wait.PollImmediateUntilWithContext(ctx, 100*time.Millisecond, func(ctx context.Context) (bool, error) { - attempt++ + // Resetting the REST mapper will also invalidate the underlying discovery + // client. This is a leaky abstraction and assumes behavior about the REST + // mapper, but we'll deal with it for now. + gc.restMapper.Reset() + logger.V(4).Info("reset restmapper") - // On a reattempt, check if available resources have changed - if attempt > 1 { - newResources, err = GetDeletableResources(logger, discoveryClient) + // Perform the monitor resync and wait for controllers to report cache sync. 
+ // + // NOTE: It's possible that newResources will diverge from the resources + // discovered by restMapper during the call to Reset, since they are + // distinct discovery clients invalidated at different times. For example, + // newResources may contain resources not returned in the restMapper's + // discovery call if the resources appeared in-between the calls. In that + // case, the restMapper will fail to map some of newResources until the next + // attempt. + if err := gc.resyncMonitors(logger, newResources); err != nil { + utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %w", err)) + metrics.GarbageCollectorResourcesSyncError.Inc() + return + } + logger.V(4).Info("resynced monitors") - if len(newResources) == 0 { - logger.V(2).Info("no resources reported by discovery", "attempt", attempt) - metrics.GarbageCollectorResourcesSyncError.Inc() - return false, nil - } - if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(err); isLookupFailure { - // In partial discovery cases, preserve existing synced informers for resources in the failed groups, so resyncMonitors will only add informers for newly seen resources - for k, v := range oldResources { - if _, failed := groupLookupFailures[k.GroupVersion()]; failed && gc.dependencyGraphBuilder.IsResourceSynced(k) { - newResources[k] = v - } - } - } - } + // wait for caches to fill for a while (our sync period) before attempting to rediscover resources and retry syncing. + // this protects us from deadlocks where available resources changed and one of our informer caches will never fill. + // informers keep attempting to sync in the background, so retrying doesn't interrupt them. + // the call to resyncMonitors on the reattempt will no-op for resources that still exist. + // note that workers stay paused until we successfully resync. + if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(ctx.Done(), period), func() bool { + return gc.dependencyGraphBuilder.IsSynced(logger) + }) { + utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync")) + metrics.GarbageCollectorResourcesSyncError.Inc() + } - logger.V(2).Info( - "syncing garbage collector with updated resources from discovery", - "attempt", attempt, - "diff", printDiff(oldResources, newResources), - ) - - // Resetting the REST mapper will also invalidate the underlying discovery - // client. This is a leaky abstraction and assumes behavior about the REST - // mapper, but we'll deal with it for now. - gc.restMapper.Reset() - logger.V(4).Info("reset restmapper") - - // Perform the monitor resync and wait for controllers to report cache sync. - // - // NOTE: It's possible that newResources will diverge from the resources - // discovered by restMapper during the call to Reset, since they are - // distinct discovery clients invalidated at different times. For example, - // newResources may contain resources not returned in the restMapper's - // discovery call if the resources appeared in-between the calls. In that - // case, the restMapper will fail to map some of newResources until the next - // attempt. 
- if err := gc.resyncMonitors(logger, newResources); err != nil { - utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors (attempt %d): %v", attempt, err)) - metrics.GarbageCollectorResourcesSyncError.Inc() - return false, nil - } - logger.V(4).Info("resynced monitors") - - // wait for caches to fill for a while (our sync period) before attempting to rediscover resources and retry syncing. - // this protects us from deadlocks where available resources changed and one of our informer caches will never fill. - // informers keep attempting to sync in the background, so retrying doesn't interrupt them. - // the call to resyncMonitors on the reattempt will no-op for resources that still exist. - // note that workers stay paused until we successfully resync. - if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(ctx.Done(), period), func() bool { - return gc.dependencyGraphBuilder.IsSynced(logger) - }) { - utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync (attempt %d)", attempt)) - metrics.GarbageCollectorResourcesSyncError.Inc() - return false, nil - } - - // success, break out of the loop - return true, nil - }) - - // Finally, keep track of our new state. Do this after all preceding steps - // have succeeded to ensure we'll retry on subsequent syncs if an error - // occurred. + // Finally, keep track of our new resource monitor state. + // Monitors where the cache sync times out are still tracked here as + // subsequent runs should stop them if their resources were removed. oldResources = newResources logger.V(2).Info("synced garbage collector") }, period) @@ -328,8 +293,6 @@ var namespacedOwnerOfClusterScopedObjectErr = goerrors.New("cluster-scoped objec func (gc *GarbageCollector) processAttemptToDeleteWorker(ctx context.Context) bool { item, quit := gc.attemptToDelete.Get() - gc.workerLock.RLock() - defer gc.workerLock.RUnlock() if quit { return false } @@ -754,8 +717,6 @@ func (gc *GarbageCollector) runAttemptToOrphanWorker(logger klog.Logger) { // these steps fail. func (gc *GarbageCollector) processAttemptToOrphanWorker(logger klog.Logger) bool { item, quit := gc.attemptToOrphan.Get() - gc.workerLock.RLock() - defer gc.workerLock.RUnlock() if quit { return false } diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go index 2439fb732d4..c675b27c37d 100644 --- a/pkg/controller/garbagecollector/garbagecollector_test.go +++ b/pkg/controller/garbagecollector/garbagecollector_test.go @@ -814,7 +814,8 @@ func TestGetDeletableResources(t *testing.T) { } // TestGarbageCollectorSync ensures that a discovery client error -// will not cause the garbage collector to block infinitely. +// or an informer sync error will not cause the garbage collector +// to block infinitely. 
func TestGarbageCollectorSync(t *testing.T) { serverResources := []*metav1.APIResourceList{ { @@ -912,49 +913,30 @@ func TestGarbageCollectorSync(t *testing.T) { // Wait until the sync discovers the initial resources time.Sleep(1 * time.Second) - - err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock) - if err != nil { - t.Fatalf("Expected garbagecollector.Sync to be running but it is blocked: %v", err) - } assertMonitors(t, gc, "pods", "deployments") // Simulate the discovery client returning an error fakeDiscoveryClient.setPreferredResources(nil, fmt.Errorf("error calling discoveryClient.ServerPreferredResources()")) - - // Wait until sync discovers the change time.Sleep(1 * time.Second) - // No monitor changes assertMonitors(t, gc, "pods", "deployments") // Remove the error from being returned and see if the garbage collector sync is still working fakeDiscoveryClient.setPreferredResources(serverResources, nil) - - err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock) - if err != nil { - t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err) - } + time.Sleep(1 * time.Second) assertMonitors(t, gc, "pods", "deployments") // Simulate the discovery client returning a resource the restmapper can resolve, but will not sync caches fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, nil) - - // Wait until sync discovers the change time.Sleep(1 * time.Second) assertMonitors(t, gc, "pods", "secrets") // Put the resources back to normal and ensure garbage collector sync recovers fakeDiscoveryClient.setPreferredResources(serverResources, nil) - - err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock) - if err != nil { - t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err) - } + time.Sleep(1 * time.Second) assertMonitors(t, gc, "pods", "deployments") // Partial discovery failure fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, appsV1Error) - // Wait until sync discovers the change time.Sleep(1 * time.Second) // Deployments monitor kept assertMonitors(t, gc, "pods", "deployments", "secrets") @@ -963,12 +945,35 @@ func TestGarbageCollectorSync(t *testing.T) { fakeDiscoveryClient.setPreferredResources(serverResources, nil) // Wait until sync discovers the change time.Sleep(1 * time.Second) - err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock) - if err != nil { - t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err) - } // Unsyncable monitor removed assertMonitors(t, gc, "pods", "deployments") + + // Add fake controller simulate the initial not-synced informer which will be synced at the end. + fc := fakeController{} + gc.dependencyGraphBuilder.monitors[schema.GroupVersionResource{ + Version: "v1", + Resource: "secrets", + }] = &monitor{controller: &fc} + if gc.IsSynced(logger) { + t.Fatal("cache from garbage collector should not be synced") + } + + fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, nil) + time.Sleep(1 * time.Second) + assertMonitors(t, gc, "pods", "secrets") + + // The informer is synced now. 
+ fc.SetSynced(true) + time.Sleep(1 * time.Second) + assertMonitors(t, gc, "pods", "secrets") + + if !gc.IsSynced(logger) { + t.Fatal("cache from garbage collector should be synced") + } + + fakeDiscoveryClient.setPreferredResources(serverResources, nil) + time.Sleep(1 * time.Second) + assertMonitors(t, gc, "pods", "deployments") } func assertMonitors(t *testing.T, gc *GarbageCollector, resources ...string) { @@ -983,29 +988,6 @@ func assertMonitors(t *testing.T, gc *GarbageCollector, resources ...string) { } } -func expectSyncNotBlocked(fakeDiscoveryClient *fakeServerResources, workerLock *sync.RWMutex) error { - before := fakeDiscoveryClient.getInterfaceUsedCount() - t := 1 * time.Second - time.Sleep(t) - after := fakeDiscoveryClient.getInterfaceUsedCount() - if before == after { - return fmt.Errorf("discoveryClient.ServerPreferredResources() called %d times over %v", after-before, t) - } - - workerLockAcquired := make(chan struct{}) - go func() { - workerLock.Lock() - defer workerLock.Unlock() - close(workerLockAcquired) - }() - select { - case <-workerLockAcquired: - return nil - case <-time.After(t): - return fmt.Errorf("workerLock blocked for at least %v", t) - } -} - type fakeServerResources struct { PreferredResources []*metav1.APIResourceList Error error @@ -1035,12 +1017,6 @@ func (f *fakeServerResources) setPreferredResources(resources []*metav1.APIResou f.Error = err } -func (f *fakeServerResources) getInterfaceUsedCount() int { - f.Lock.Lock() - defer f.Lock.Unlock() - return f.InterfaceUsedCount -} - func (*fakeServerResources) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) { return nil, nil } @@ -2778,6 +2754,28 @@ func assertState(s state) step { } +type fakeController struct { + synced bool + lock sync.Mutex +} + +func (f *fakeController) Run(stopCh <-chan struct{}) { +} + +func (f *fakeController) HasSynced() bool { + return f.synced +} + +func (f *fakeController) SetSynced(synced bool) { + f.lock.Lock() + defer f.lock.Unlock() + f.synced = synced +} + +func (f *fakeController) LastSyncResourceVersion() string { + return "" +} + // trackingWorkqueue implements RateLimitingInterface, // allows introspection of the items in the queue, // and treats AddAfter and AddRateLimited the same as Add From d4fdfaf17d90c0252bf465a334be6a759f652060 Mon Sep 17 00:00:00 2001 From: haorenfsa Date: Fri, 25 Aug 2023 17:31:25 +0000 Subject: [PATCH 2/4] test/integration/garbagecollector: add test TestCascadingDeleteOnCRDConversionFailure which tests that GC controller cannot be blocked by a bad conversion webhook Signed-off-by: haorenfsa Co-authored-by: Andrew Sy Kim --- .../garbagecollector/garbagecollector_test.go | 128 ++++++++++------ .../garbage_collector_test.go | 140 ++++++++++++++++++ 2 files changed, 220 insertions(+), 48 deletions(-) diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go index c675b27c37d..174bd838071 100644 --- a/pkg/controller/garbagecollector/garbagecollector_test.go +++ b/pkg/controller/garbagecollector/garbagecollector_test.go @@ -24,6 +24,7 @@ import ( "reflect" "strings" "sync" + "sync/atomic" "testing" "time" @@ -49,6 +50,7 @@ import ( "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/strategicpatch" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/discovery" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" @@ -60,9 +62,11 @@ import ( clientgotesting "k8s.io/client-go/testing" 
"k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" + metricsutil "k8s.io/component-base/metrics/testutil" "k8s.io/controller-manager/pkg/informerfactory" "k8s.io/kubernetes/pkg/api/legacyscheme" c "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/garbagecollector/metrics" "k8s.io/kubernetes/test/utils/ktesting" ) @@ -846,7 +850,6 @@ func TestGarbageCollectorSync(t *testing.T) { PreferredResources: serverResources, Error: nil, Lock: sync.Mutex{}, - InterfaceUsedCount: 0, } testHandler := &fakeActionHandler{ @@ -865,7 +868,24 @@ func TestGarbageCollectorSync(t *testing.T) { }, }, } - srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP) + + testHandler2 := &fakeActionHandler{ + response: map[string]FakeResponse{ + "GET" + "/api/v1/secrets": { + 200, + []byte("{}"), + }, + }, + } + var secretSyncOK atomic.Bool + var alternativeTestHandler = func(response http.ResponseWriter, request *http.Request) { + if request.URL.Path == "/api/v1/secrets" && secretSyncOK.Load() { + testHandler2.ServeHTTP(response, request) + return + } + testHandler.ServeHTTP(response, request) + } + srv, clientConfig := testServerAndClientConfig(alternativeTestHandler) defer srv.Close() clientConfig.ContentConfig.NegotiatedSerializer = nil client, err := kubernetes.NewForConfig(clientConfig) @@ -885,7 +905,7 @@ func TestGarbageCollectorSync(t *testing.T) { sharedInformers := informers.NewSharedInformerFactory(client, 0) - tCtx := ktesting.Init(t) + logger, tCtx := ktesting.NewTestContext(t) defer tCtx.Cancel("test has completed") alwaysStarted := make(chan struct{}) close(alwaysStarted) @@ -913,30 +933,49 @@ func TestGarbageCollectorSync(t *testing.T) { // Wait until the sync discovers the initial resources time.Sleep(1 * time.Second) + + err = expectSyncNotBlocked(fakeDiscoveryClient) + if err != nil { + t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err) + } assertMonitors(t, gc, "pods", "deployments") // Simulate the discovery client returning an error fakeDiscoveryClient.setPreferredResources(nil, fmt.Errorf("error calling discoveryClient.ServerPreferredResources()")) + + // Wait until sync discovers the change time.Sleep(1 * time.Second) + // No monitor changes assertMonitors(t, gc, "pods", "deployments") // Remove the error from being returned and see if the garbage collector sync is still working fakeDiscoveryClient.setPreferredResources(serverResources, nil) - time.Sleep(1 * time.Second) + + err = expectSyncNotBlocked(fakeDiscoveryClient) + if err != nil { + t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err) + } assertMonitors(t, gc, "pods", "deployments") // Simulate the discovery client returning a resource the restmapper can resolve, but will not sync caches fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, nil) + + // Wait until sync discovers the change time.Sleep(1 * time.Second) assertMonitors(t, gc, "pods", "secrets") // Put the resources back to normal and ensure garbage collector sync recovers fakeDiscoveryClient.setPreferredResources(serverResources, nil) - time.Sleep(1 * time.Second) + + err = expectSyncNotBlocked(fakeDiscoveryClient) + if err != nil { + t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err) + } assertMonitors(t, gc, "pods", "deployments") // Partial discovery failure fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, appsV1Error) + // Wait until sync discovers the change time.Sleep(1 * 
time.Second) // Deployments monitor kept assertMonitors(t, gc, "pods", "deployments", "secrets") @@ -945,35 +984,33 @@ func TestGarbageCollectorSync(t *testing.T) { fakeDiscoveryClient.setPreferredResources(serverResources, nil) // Wait until sync discovers the change time.Sleep(1 * time.Second) + err = expectSyncNotBlocked(fakeDiscoveryClient) + if err != nil { + t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err) + } // Unsyncable monitor removed assertMonitors(t, gc, "pods", "deployments") - // Add fake controller simulate the initial not-synced informer which will be synced at the end. - fc := fakeController{} - gc.dependencyGraphBuilder.monitors[schema.GroupVersionResource{ - Version: "v1", - Resource: "secrets", - }] = &monitor{controller: &fc} - if gc.IsSynced(logger) { - t.Fatal("cache from garbage collector should not be synced") - } - + // Simulate initial not-synced informer which will be synced at the end. + metrics.GarbageCollectorResourcesSyncError.Reset() fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, nil) time.Sleep(1 * time.Second) assertMonitors(t, gc, "pods", "secrets") - - // The informer is synced now. - fc.SetSynced(true) - time.Sleep(1 * time.Second) - assertMonitors(t, gc, "pods", "secrets") - - if !gc.IsSynced(logger) { - t.Fatal("cache from garbage collector should be synced") + if gc.IsSynced(logger) { + t.Fatal("cache from garbage collector should not be synced") + } + val, _ := metricsutil.GetCounterMetricValue(metrics.GarbageCollectorResourcesSyncError) + if val < 1 { + t.Fatalf("expect sync error metric > 0") } - fakeDiscoveryClient.setPreferredResources(serverResources, nil) - time.Sleep(1 * time.Second) - assertMonitors(t, gc, "pods", "deployments") + // The informer is synced now. 
+ secretSyncOK.Store(true) + if err := wait.PollUntilContextTimeout(tCtx, time.Second, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) { + return gc.IsSynced(logger), nil + }); err != nil { + t.Fatal(err) + } } func assertMonitors(t *testing.T, gc *GarbageCollector, resources ...string) { @@ -988,6 +1025,17 @@ func assertMonitors(t *testing.T, gc *GarbageCollector, resources ...string) { } } +func expectSyncNotBlocked(fakeDiscoveryClient *fakeServerResources) error { + before := fakeDiscoveryClient.getInterfaceUsedCount() + t := 1 * time.Second + time.Sleep(t) + after := fakeDiscoveryClient.getInterfaceUsedCount() + if before == after { + return fmt.Errorf("discoveryClient.ServerPreferredResources() not called over %v", t) + } + return nil +} + type fakeServerResources struct { PreferredResources []*metav1.APIResourceList Error error @@ -1017,6 +1065,12 @@ func (f *fakeServerResources) setPreferredResources(resources []*metav1.APIResou f.Error = err } +func (f *fakeServerResources) getInterfaceUsedCount() int { + f.Lock.Lock() + defer f.Lock.Unlock() + return f.InterfaceUsedCount +} + func (*fakeServerResources) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) { return nil, nil } @@ -2754,28 +2808,6 @@ func assertState(s state) step { } -type fakeController struct { - synced bool - lock sync.Mutex -} - -func (f *fakeController) Run(stopCh <-chan struct{}) { -} - -func (f *fakeController) HasSynced() bool { - return f.synced -} - -func (f *fakeController) SetSynced(synced bool) { - f.lock.Lock() - defer f.lock.Unlock() - f.synced = synced -} - -func (f *fakeController) LastSyncResourceVersion() string { - return "" -} - // trackingWorkqueue implements RateLimitingInterface, // allows introspection of the items in the queue, // and treats AddAfter and AddRateLimited the same as Add diff --git a/test/integration/garbagecollector/garbage_collector_test.go b/test/integration/garbagecollector/garbage_collector_test.go index 0c00b2dad39..9e62cc50133 100644 --- a/test/integration/garbagecollector/garbage_collector_test.go +++ b/test/integration/garbagecollector/garbage_collector_test.go @@ -81,6 +81,29 @@ const oneValidOwnerPodName = "test.pod.3" const toBeDeletedRCName = "test.rc.1" const remainingRCName = "test.rc.2" +// testCert was generated from crypto/tls/generate_cert.go with the following command: +// +// go run generate_cert.go --rsa-bits 2048 --host 127.0.0.1,::1,example.com --ca --start-date "Jan 1 00:00:00 1970" --duration=1000000h +var testCert = []byte(`-----BEGIN CERTIFICATE----- +MIIDGDCCAgCgAwIBAgIQTKCKn99d5HhQVCLln2Q+eTANBgkqhkiG9w0BAQsFADAS +MRAwDgYDVQQKEwdBY21lIENvMCAXDTcwMDEwMTAwMDAwMFoYDzIwODQwMTI5MTYw +MDAwWjASMRAwDgYDVQQKEwdBY21lIENvMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A +MIIBCgKCAQEA1Z5/aTwqY706M34tn60l8ZHkanWDl8mM1pYf4Q7qg3zA9XqWLX6S +4rTYDYCb4stEasC72lQnbEWHbthiQE76zubP8WOFHdvGR3mjAvHWz4FxvLOTheZ+ +3iDUrl6Aj9UIsYqzmpBJAoY4+vGGf+xHvuukHrVcFqR9ZuBdZuJ/HbbjUyuNr3X9 +erNIr5Ha17gVzf17SNbYgNrX9gbCeEB8Z9Ox7dVuJhLDkpF0T/B5Zld3BjyUVY/T +cukU4dTVp6isbWPvCMRCZCCOpb+qIhxEjJ0n6tnPt8nf9lvDl4SWMl6X1bH+2EFa +a8R06G0QI+XhwPyjXUyCR8QEOZPCR5wyqQIDAQABo2gwZjAOBgNVHQ8BAf8EBAMC +AqQwEwYDVR0lBAwwCgYIKwYBBQUHAwEwDwYDVR0TAQH/BAUwAwEB/zAuBgNVHREE +JzAlggtleGFtcGxlLmNvbYcEfwAAAYcQAAAAAAAAAAAAAAAAAAAAATANBgkqhkiG +9w0BAQsFAAOCAQEAThqgJ/AFqaANsOp48lojDZfZBFxJQ3A4zfR/MgggUoQ9cP3V +rxuKAFWQjze1EZc7J9iO1WvH98lOGVNRY/t2VIrVoSsBiALP86Eew9WucP60tbv2 +8/zsBDSfEo9Wl+Q/gwdEh8dgciUKROvCm76EgAwPGicMAgRsxXgwXHhS5e8nnbIE 
+Ewaqvb5dY++6kh0Oz+adtNT5OqOwXTIRI67WuEe6/B3Z4LNVPQDIj7ZUJGNw8e6L +F4nkUthwlKx4yEJHZBRuFPnO7Z81jNKuwL276+mczRH7piI6z9uyMV/JbEsOIxyL +W6CzB7pZ9Nj1YLpgzc1r6oONHLokMJJIz/IvkQ== +-----END CERTIFICATE-----`) + func newPod(podName, podNamespace string, ownerReferences []metav1.OwnerReference) *v1.Pod { for i := 0; i < len(ownerReferences); i++ { if len(ownerReferences[i].Kind) == 0 { @@ -252,6 +275,7 @@ func setupWithServer(t *testing.T, result *kubeapiservertesting.TestServer, work logger := tCtx.Logger() alwaysStarted := make(chan struct{}) close(alwaysStarted) + gc, err := garbagecollector.NewGarbageCollector( tCtx, clientSet, @@ -1285,3 +1309,119 @@ func testCRDDeletion(t *testing.T, ctx *testContext, ns *v1.Namespace, definitio t.Fatalf("failed waiting for dependent %q (owned by %q) to be deleted", dependent.GetName(), owner.GetName()) } } + +// TestCascadingDeleteOnCRDConversionFailure tests that a bad conversion webhook cannot block the entire GC controller. +// Historically, a cache sync failure from a single resource prevented GC controller from running. This test creates +// a CRD, updates the storage version with a bad conversion webhook and then runs a simple cascading delete test. +func TestCascadingDeleteOnCRDConversionFailure(t *testing.T) { + ctx := setup(t, 0) + defer ctx.tearDown() + gc, apiExtensionClient, dynamicClient, clientSet := ctx.gc, ctx.apiExtensionClient, ctx.dynamicClient, ctx.clientSet + + ns := createNamespaceOrDie("gc-cache-sync-fail", clientSet, t) + defer deleteNamespaceOrDie(ns.Name, clientSet, t) + + // Create a CRD with storage/serving version v1beta2. Then update the CRD with v1 as the storage version + // and an invalid conversion webhook. This should result in cache sync failures for the CRD from the GC controller. 
+ def, dc := createRandomCustomResourceDefinition(t, apiExtensionClient, dynamicClient, ns.Name) + _, err := dc.Create(context.TODO(), newCRDInstance(def, ns.Name, names.SimpleNameGenerator.GenerateName("test")), metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create custom resource: %v", err) + } + + def, err = apiExtensionClient.ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), def.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to get custom resource: %v", err) + } + + newDefinition := def.DeepCopy() + newDefinition.Spec.Conversion = &apiextensionsv1.CustomResourceConversion{ + Strategy: apiextensionsv1.WebhookConverter, + Webhook: &apiextensionsv1.WebhookConversion{ + ClientConfig: &apiextensionsv1.WebhookClientConfig{ + Service: &apiextensionsv1.ServiceReference{ + Name: "foobar", + Namespace: ns.Name, + }, + CABundle: testCert, + }, + ConversionReviewVersions: []string{ + "v1", "v1beta1", + }, + }, + } + newDefinition.Spec.Versions = []apiextensionsv1.CustomResourceDefinitionVersion{ + { + Name: "v1", + Served: true, + Storage: true, + Schema: apiextensionstestserver.AllowAllSchema(), + }, + { + Name: "v1beta1", + Served: true, + Storage: false, + Schema: apiextensionstestserver.AllowAllSchema(), + }, + } + + _, err = apiExtensionClient.ApiextensionsV1().CustomResourceDefinitions().Update(context.TODO(), newDefinition, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Error updating CRD with conversion webhook: %v", err) + } + + ctx.startGC(5) + + rcClient := clientSet.CoreV1().ReplicationControllers(ns.Name) + podClient := clientSet.CoreV1().Pods(ns.Name) + + toBeDeletedRC, err := rcClient.Create(context.TODO(), newOwnerRC(toBeDeletedRCName, ns.Name), metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create replication controller: %v", err) + } + + rcs, err := rcClient.List(context.TODO(), metav1.ListOptions{}) + if err != nil { + t.Fatalf("Failed to list replication controllers: %v", err) + } + if len(rcs.Items) != 1 { + t.Fatalf("Expect only 1 replication controller") + } + + pod := newPod(garbageCollectedPodName, ns.Name, []metav1.OwnerReference{{UID: toBeDeletedRC.ObjectMeta.UID, Name: toBeDeletedRCName}}) + _, err = podClient.Create(context.TODO(), pod, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create Pod: %v", err) + } + + pods, err := podClient.List(context.TODO(), metav1.ListOptions{}) + if err != nil { + t.Fatalf("Failed to list pods: %v", err) + } + if len(pods.Items) != 1 { + t.Fatalf("Expect only 1 pods") + } + + if err := rcClient.Delete(context.TODO(), toBeDeletedRCName, getNonOrphanOptions()); err != nil { + t.Fatalf("failed to delete replication controller: %v", err) + } + + // sometimes the deletion of the RC takes long time to be observed by + // the gc, so wait for the garbage collector to observe the deletion of + // the toBeDeletedRC + if err := wait.PollUntilContextTimeout(context.Background(), 1*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) { + return !gc.GraphHasUID(toBeDeletedRC.ObjectMeta.UID), nil + }); err != nil { + t.Fatal(err) + } + if err := integration.WaitForPodToDisappear(podClient, garbageCollectedPodName, 1*time.Second, 30*time.Second); err != nil { + t.Fatalf("expect pod %s to be garbage collected, got err= %v", garbageCollectedPodName, err) + } + + // Check that the cache is still not synced after cascading delete succeeded + // If this check passes, check that the conversion webhook is correctly misconfigured + // to prevent 
watch cache from listing the CRD. + if ctx.gc.IsSynced(ctx.logger) { + t.Fatal("cache is not expected to be synced due to bad conversion webhook") + } +} From 87ca4046340f233a2f2696103f30624ab366df3d Mon Sep 17 00:00:00 2001 From: haorenfsa Date: Fri, 13 Sep 2024 23:03:47 +0800 Subject: [PATCH 3/4] garbagecollector: add initialSyncTimeout for Run Signed-off-by: haorenfsa --- cmd/kube-controller-manager/app/core.go | 5 +++-- pkg/controller/garbagecollector/garbagecollector.go | 4 ++-- pkg/controller/garbagecollector/garbagecollector_test.go | 7 ++++--- .../integration/garbagecollector/garbage_collector_test.go | 4 +++- test/integration/util/util.go | 2 +- 5 files changed, 13 insertions(+), 9 deletions(-) diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 41ea7470715..b3637f273af 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -698,11 +698,12 @@ func startGarbageCollectorController(ctx context.Context, controllerContext Cont // Start the garbage collector. workers := int(controllerContext.ComponentConfig.GarbageCollectorController.ConcurrentGCSyncs) - go garbageCollector.Run(ctx, workers) + const syncPeriod = 30 * time.Second + go garbageCollector.Run(ctx, workers, syncPeriod) // Periodically refresh the RESTMapper with new discovery information and sync // the garbage collector. - go garbageCollector.Sync(ctx, discoveryClient, 30*time.Second) + go garbageCollector.Sync(ctx, discoveryClient, syncPeriod) return garbageCollector, true, nil } diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go index b1c604919ba..e5e121c5dd9 100644 --- a/pkg/controller/garbagecollector/garbagecollector.go +++ b/pkg/controller/garbagecollector/garbagecollector.go @@ -129,7 +129,7 @@ func (gc *GarbageCollector) resyncMonitors(logger klog.Logger, deletableResource } // Run starts garbage collector workers. 
-func (gc *GarbageCollector) Run(ctx context.Context, workers int) { +func (gc *GarbageCollector) Run(ctx context.Context, workers int, initialSyncTimeout time.Duration) { defer utilruntime.HandleCrash() defer gc.attemptToDelete.ShutDown() defer gc.attemptToOrphan.ShutDown() @@ -146,7 +146,7 @@ func (gc *GarbageCollector) Run(ctx context.Context, workers int) { go gc.dependencyGraphBuilder.Run(ctx) - if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(ctx.Done(), 30*time.Second), func() bool { + if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(ctx.Done(), initialSyncTimeout), func() bool { return gc.dependencyGraphBuilder.IsSynced(logger) }) { logger.Info("Garbage collector: all resource monitors could not be synced, proceeding anyways") diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go index 174bd838071..15e82a324ec 100644 --- a/pkg/controller/garbagecollector/garbagecollector_test.go +++ b/pkg/controller/garbagecollector/garbagecollector_test.go @@ -124,7 +124,7 @@ func TestGarbageCollectorConstruction(t *testing.T) { } assert.Len(t, gc.dependencyGraphBuilder.monitors, 1) - go gc.Run(tCtx, 1) + go gc.Run(tCtx, 1, 5*time.Second) err = gc.resyncMonitors(logger, twoResources) if err != nil { @@ -914,7 +914,8 @@ func TestGarbageCollectorSync(t *testing.T) { t.Fatal(err) } - go gc.Run(tCtx, 1) + syncPeriod := 200 * time.Millisecond + go gc.Run(tCtx, 1, syncPeriod) // The pseudo-code of GarbageCollector.Sync(): // GarbageCollector.Sync(client, period, stopCh): // wait.Until() loops with `period` until the `stopCh` is closed : @@ -929,7 +930,7 @@ func TestGarbageCollectorSync(t *testing.T) { // The 1s sleep in the test allows GetDeletableResources and // gc.resyncMonitors to run ~5 times to ensure the changes to the // fakeDiscoveryClient are picked up. - go gc.Sync(tCtx, fakeDiscoveryClient, 200*time.Millisecond) + go gc.Sync(tCtx, fakeDiscoveryClient, syncPeriod) // Wait until the sync discovers the initial resources time.Sleep(1 * time.Second) diff --git a/test/integration/garbagecollector/garbage_collector_test.go b/test/integration/garbagecollector/garbage_collector_test.go index 9e62cc50133..7202bbbaeb2 100644 --- a/test/integration/garbagecollector/garbage_collector_test.go +++ b/test/integration/garbagecollector/garbage_collector_test.go @@ -301,7 +301,7 @@ func setupWithServer(t *testing.T, result *kubeapiservertesting.TestServer, work // mapper, but we'll deal with it for now. 
 		restMapper.Reset()
 	}, syncPeriod, tCtx.Done())
-	go gc.Run(tCtx, workers)
+	go gc.Run(tCtx, workers, syncPeriod)
 	go gc.Sync(tCtx, clientSet.Discovery(), syncPeriod)
 }
 
@@ -1371,6 +1371,8 @@ func TestCascadingDeleteOnCRDConversionFailure(t *testing.T) {
 	}
 
 	ctx.startGC(5)
+	// make sure gc.Sync finds the new CRD and starts monitoring it
+	time.Sleep(ctx.syncPeriod + 1*time.Second)
 
 	rcClient := clientSet.CoreV1().ReplicationControllers(ns.Name)
 	podClient := clientSet.CoreV1().Pods(ns.Name)
diff --git a/test/integration/util/util.go b/test/integration/util/util.go
index 9a028e0f36d..243ff5b1728 100644
--- a/test/integration/util/util.go
+++ b/test/integration/util/util.go
@@ -219,7 +219,7 @@ func CreateGCController(ctx context.Context, tb ktesting.TB, restConfig restclie
 		go wait.Until(func() {
 			restMapper.Reset()
 		}, syncPeriod, ctx.Done())
-		go gc.Run(ctx, 1)
+		go gc.Run(ctx, 1, syncPeriod)
 		go gc.Sync(ctx, clientSet.Discovery(), syncPeriod)
 	}
 	return startGC

From da5f6cf05343aff91021acddd0cf299f4b6d74dc Mon Sep 17 00:00:00 2001
From: haorenfsa
Date: Fri, 13 Sep 2024 22:53:22 +0800
Subject: [PATCH 4/4] garbagecollector: fix logs & comments in gc sync

Signed-off-by: haorenfsa
---
 .../garbagecollector/garbagecollector.go         | 16 +++++++---------
 1 file changed, 7 insertions(+), 9 deletions(-)

diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go
index e5e121c5dd9..1d78bd515b4 100644
--- a/pkg/controller/garbagecollector/garbagecollector.go
+++ b/pkg/controller/garbagecollector/garbagecollector.go
@@ -149,7 +149,7 @@ func (gc *GarbageCollector) Run(ctx context.Context, workers int, initialSyncTim
 	if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(ctx.Done(), initialSyncTimeout), func() bool {
 		return gc.dependencyGraphBuilder.IsSynced(logger)
 	}) {
-		logger.Info("Garbage collector: all resource monitors could not be synced, proceeding anyways")
+		logger.Info("Garbage collector: not all resource monitors could be synced, proceeding anyway")
 	} else {
 		logger.Info("Garbage collector: all resource monitors have synced")
 	}
@@ -227,14 +227,13 @@ func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery.
 	}
 	logger.V(4).Info("resynced monitors")
 
-	// wait for caches to fill for a while (our sync period) before attempting to rediscover resources and retry syncing.
-	// this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
-	// informers keep attempting to sync in the background, so retrying doesn't interrupt them.
-	// the call to resyncMonitors on the reattempt will no-op for resources that still exist.
-	// note that workers stay paused until we successfully resync.
-	if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(ctx.Done(), period), func() bool {
+	// GC workers no longer wait for the cache to be synced; keep the periodic check to provide logs and metrics.
+	cacheSynced := cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(ctx.Done(), period), func() bool {
 		return gc.dependencyGraphBuilder.IsSynced(logger)
-	}) {
+	})
+	if cacheSynced {
+		logger.V(2).Info("synced garbage collector")
+	} else {
 		utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync"))
 		metrics.GarbageCollectorResourcesSyncError.Inc()
 	}
 
 	// Finally, keep track of our new resource monitor state.
// Monitors where the cache sync times out are still tracked here as // subsequent runs should stop them if their resources were removed. oldResources = newResources - logger.V(2).Info("synced garbage collector") }, period) }
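Note on waitForStopOrTimeout: both Run and Sync in this series bound their cache-sync wait with this helper, which already lives in garbagecollector.go and is not touched by any hunk above. A minimal sketch of the pattern it implements (assumed shape only; the in-tree helper is not shown in these patches and may differ in detail):

package garbagecollector

import "time"

// waitForStopOrTimeout returns a channel that is closed when either the given
// stop channel is closed or the timeout elapses, whichever happens first.
// Passing it to cache.WaitForNamedCacheSync turns an unbounded wait into a
// bounded one: Run proceeds after initialSyncTimeout, and Sync re-checks once
// per resync period instead of blocking indefinitely.
func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} {
	stopChWithTimeout := make(chan struct{})
	go func() {
		defer close(stopChWithTimeout)
		select {
		case <-stopCh:
		case <-time.After(timeout):
		}
	}()
	return stopChWithTimeout
}

With this bound in place, a monitor that can never sync (for example, a CRD behind the intentionally broken conversion webhook in TestCascadingDeleteOnCRDConversionFailure) costs each Run and Sync iteration at most the configured timeout, rather than blocking garbage collection for every other resource.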