diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go
index c675b27c37d..174bd838071 100644
--- a/pkg/controller/garbagecollector/garbagecollector_test.go
+++ b/pkg/controller/garbagecollector/garbagecollector_test.go
@@ -24,6 +24,7 @@ import (
 	"reflect"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -49,6 +50,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/json"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/strategicpatch"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/discovery"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
@@ -60,9 +62,11 @@ import (
 	clientgotesting "k8s.io/client-go/testing"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/workqueue"
+	metricsutil "k8s.io/component-base/metrics/testutil"
 	"k8s.io/controller-manager/pkg/informerfactory"
 	"k8s.io/kubernetes/pkg/api/legacyscheme"
 	c "k8s.io/kubernetes/pkg/controller"
+	"k8s.io/kubernetes/pkg/controller/garbagecollector/metrics"
 	"k8s.io/kubernetes/test/utils/ktesting"
 )
 
@@ -846,7 +850,6 @@ func TestGarbageCollectorSync(t *testing.T) {
 		PreferredResources: serverResources,
 		Error:              nil,
 		Lock:               sync.Mutex{},
-		InterfaceUsedCount: 0,
 	}
 
 	testHandler := &fakeActionHandler{
@@ -865,7 +868,24 @@ func TestGarbageCollectorSync(t *testing.T) {
 			},
 		},
 	}
-	srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
+
+	testHandler2 := &fakeActionHandler{
+		response: map[string]FakeResponse{
+			"GET" + "/api/v1/secrets": {
+				200,
+				[]byte("{}"),
+			},
+		},
+	}
+	var secretSyncOK atomic.Bool
+	var alternativeTestHandler = func(response http.ResponseWriter, request *http.Request) {
+		if request.URL.Path == "/api/v1/secrets" && secretSyncOK.Load() {
+			testHandler2.ServeHTTP(response, request)
+			return
+		}
+		testHandler.ServeHTTP(response, request)
+	}
+	srv, clientConfig := testServerAndClientConfig(alternativeTestHandler)
 	defer srv.Close()
 	clientConfig.ContentConfig.NegotiatedSerializer = nil
 	client, err := kubernetes.NewForConfig(clientConfig)
@@ -885,7 +905,7 @@ func TestGarbageCollectorSync(t *testing.T) {
 
 	sharedInformers := informers.NewSharedInformerFactory(client, 0)
 
-	tCtx := ktesting.Init(t)
+	logger, tCtx := ktesting.NewTestContext(t)
 	defer tCtx.Cancel("test has completed")
 	alwaysStarted := make(chan struct{})
 	close(alwaysStarted)
@@ -913,30 +933,49 @@ func TestGarbageCollectorSync(t *testing.T) {
 
 	// Wait until the sync discovers the initial resources
 	time.Sleep(1 * time.Second)
+
+	err = expectSyncNotBlocked(fakeDiscoveryClient)
+	if err != nil {
+		t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
+	}
 	assertMonitors(t, gc, "pods", "deployments")
 
 	// Simulate the discovery client returning an error
 	fakeDiscoveryClient.setPreferredResources(nil, fmt.Errorf("error calling discoveryClient.ServerPreferredResources()"))
+
+	// Wait until sync discovers the change
 	time.Sleep(1 * time.Second)
+	// No monitor changes
 	assertMonitors(t, gc, "pods", "deployments")
 
 	// Remove the error from being returned and see if the garbage collector sync is still working
 	fakeDiscoveryClient.setPreferredResources(serverResources, nil)
-	time.Sleep(1 * time.Second)
+
+	err = expectSyncNotBlocked(fakeDiscoveryClient)
+	if err != nil {
+		t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
+	}
 	assertMonitors(t, gc, "pods", "deployments")
 
 	// Simulate the discovery client returning a resource the restmapper can resolve, but will not sync caches
 	fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, nil)
+
+	// Wait until sync discovers the change
 	time.Sleep(1 * time.Second)
 	assertMonitors(t, gc, "pods", "secrets")
 
 	// Put the resources back to normal and ensure garbage collector sync recovers
 	fakeDiscoveryClient.setPreferredResources(serverResources, nil)
-	time.Sleep(1 * time.Second)
+
+	err = expectSyncNotBlocked(fakeDiscoveryClient)
+	if err != nil {
+		t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
+	}
 	assertMonitors(t, gc, "pods", "deployments")
 
 	// Partial discovery failure
 	fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, appsV1Error)
+	// Wait until sync discovers the change
 	time.Sleep(1 * time.Second)
 	// Deployments monitor kept
 	assertMonitors(t, gc, "pods", "deployments", "secrets")
@@ -945,35 +984,33 @@ func TestGarbageCollectorSync(t *testing.T) {
 	fakeDiscoveryClient.setPreferredResources(serverResources, nil)
 	// Wait until sync discovers the change
 	time.Sleep(1 * time.Second)
+	err = expectSyncNotBlocked(fakeDiscoveryClient)
+	if err != nil {
+		t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
+	}
 	// Unsyncable monitor removed
 	assertMonitors(t, gc, "pods", "deployments")
 
-	// Add fake controller simulate the initial not-synced informer which will be synced at the end.
-	fc := fakeController{}
-	gc.dependencyGraphBuilder.monitors[schema.GroupVersionResource{
-		Version:  "v1",
-		Resource: "secrets",
-	}] = &monitor{controller: &fc}
-	if gc.IsSynced(logger) {
-		t.Fatal("cache from garbage collector should not be synced")
-	}
-
+	// Simulate initial not-synced informer which will be synced at the end.
+	metrics.GarbageCollectorResourcesSyncError.Reset()
 	fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, nil)
 	time.Sleep(1 * time.Second)
 	assertMonitors(t, gc, "pods", "secrets")
-
-	// The informer is synced now.
-	fc.SetSynced(true)
-	time.Sleep(1 * time.Second)
-	assertMonitors(t, gc, "pods", "secrets")
-
-	if !gc.IsSynced(logger) {
-		t.Fatal("cache from garbage collector should be synced")
+	if gc.IsSynced(logger) {
+		t.Fatal("cache from garbage collector should not be synced")
+	}
+	val, _ := metricsutil.GetCounterMetricValue(metrics.GarbageCollectorResourcesSyncError)
+	if val < 1 {
+		t.Fatalf("expect sync error metric > 0")
 	}
 
-	fakeDiscoveryClient.setPreferredResources(serverResources, nil)
-	time.Sleep(1 * time.Second)
-	assertMonitors(t, gc, "pods", "deployments")
+	// The informer is synced now.
+	secretSyncOK.Store(true)
+	if err := wait.PollUntilContextTimeout(tCtx, time.Second, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
+		return gc.IsSynced(logger), nil
+	}); err != nil {
+		t.Fatal(err)
+	}
 }
 
 func assertMonitors(t *testing.T, gc *GarbageCollector, resources ...string) {
@@ -988,6 +1025,17 @@ func assertMonitors(t *testing.T, gc *GarbageCollector, resources ...string) {
 	}
 }
 
+func expectSyncNotBlocked(fakeDiscoveryClient *fakeServerResources) error {
+	before := fakeDiscoveryClient.getInterfaceUsedCount()
+	t := 1 * time.Second
+	time.Sleep(t)
+	after := fakeDiscoveryClient.getInterfaceUsedCount()
+	if before == after {
+		return fmt.Errorf("discoveryClient.ServerPreferredResources() not called over %v", t)
+	}
+	return nil
+}
+
 type fakeServerResources struct {
 	PreferredResources []*metav1.APIResourceList
 	Error              error
@@ -1017,6 +1065,12 @@ func (f *fakeServerResources) setPreferredResources(resources []*metav1.APIResou
 	f.Error = err
 }
 
+func (f *fakeServerResources) getInterfaceUsedCount() int {
+	f.Lock.Lock()
+	defer f.Lock.Unlock()
+	return f.InterfaceUsedCount
+}
+
 func (*fakeServerResources) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) {
 	return nil, nil
 }
@@ -2754,28 +2808,6 @@ func assertState(s state) step {
 
 }
 
-type fakeController struct {
-	synced bool
-	lock   sync.Mutex
-}
-
-func (f *fakeController) Run(stopCh <-chan struct{}) {
-}
-
-func (f *fakeController) HasSynced() bool {
-	return f.synced
-}
-
-func (f *fakeController) SetSynced(synced bool) {
-	f.lock.Lock()
-	defer f.lock.Unlock()
-	f.synced = synced
-}
-
-func (f *fakeController) LastSyncResourceVersion() string {
-	return ""
-}
-
 // trackingWorkqueue implements RateLimitingInterface,
 // allows introspection of the items in the queue,
 // and treats AddAfter and AddRateLimited the same as Add
diff --git a/test/integration/garbagecollector/garbage_collector_test.go b/test/integration/garbagecollector/garbage_collector_test.go
index 0c00b2dad39..9e62cc50133 100644
--- a/test/integration/garbagecollector/garbage_collector_test.go
+++ b/test/integration/garbagecollector/garbage_collector_test.go
@@ -81,6 +81,29 @@ const oneValidOwnerPodName = "test.pod.3"
 const toBeDeletedRCName = "test.rc.1"
 const remainingRCName = "test.rc.2"
 
+// testCert was generated from crypto/tls/generate_cert.go with the following command:
+//
+//	go run generate_cert.go --rsa-bits 2048 --host 127.0.0.1,::1,example.com --ca --start-date "Jan 1 00:00:00 1970" --duration=1000000h
+var testCert = []byte(`-----BEGIN CERTIFICATE-----
+MIIDGDCCAgCgAwIBAgIQTKCKn99d5HhQVCLln2Q+eTANBgkqhkiG9w0BAQsFADAS
+MRAwDgYDVQQKEwdBY21lIENvMCAXDTcwMDEwMTAwMDAwMFoYDzIwODQwMTI5MTYw
+MDAwWjASMRAwDgYDVQQKEwdBY21lIENvMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A
+MIIBCgKCAQEA1Z5/aTwqY706M34tn60l8ZHkanWDl8mM1pYf4Q7qg3zA9XqWLX6S
+4rTYDYCb4stEasC72lQnbEWHbthiQE76zubP8WOFHdvGR3mjAvHWz4FxvLOTheZ+
+3iDUrl6Aj9UIsYqzmpBJAoY4+vGGf+xHvuukHrVcFqR9ZuBdZuJ/HbbjUyuNr3X9
+erNIr5Ha17gVzf17SNbYgNrX9gbCeEB8Z9Ox7dVuJhLDkpF0T/B5Zld3BjyUVY/T
+cukU4dTVp6isbWPvCMRCZCCOpb+qIhxEjJ0n6tnPt8nf9lvDl4SWMl6X1bH+2EFa
+a8R06G0QI+XhwPyjXUyCR8QEOZPCR5wyqQIDAQABo2gwZjAOBgNVHQ8BAf8EBAMC
+AqQwEwYDVR0lBAwwCgYIKwYBBQUHAwEwDwYDVR0TAQH/BAUwAwEB/zAuBgNVHREE
+JzAlggtleGFtcGxlLmNvbYcEfwAAAYcQAAAAAAAAAAAAAAAAAAAAATANBgkqhkiG
+9w0BAQsFAAOCAQEAThqgJ/AFqaANsOp48lojDZfZBFxJQ3A4zfR/MgggUoQ9cP3V
+rxuKAFWQjze1EZc7J9iO1WvH98lOGVNRY/t2VIrVoSsBiALP86Eew9WucP60tbv2
+8/zsBDSfEo9Wl+Q/gwdEh8dgciUKROvCm76EgAwPGicMAgRsxXgwXHhS5e8nnbIE
+Ewaqvb5dY++6kh0Oz+adtNT5OqOwXTIRI67WuEe6/B3Z4LNVPQDIj7ZUJGNw8e6L
+F4nkUthwlKx4yEJHZBRuFPnO7Z81jNKuwL276+mczRH7piI6z9uyMV/JbEsOIxyL
+W6CzB7pZ9Nj1YLpgzc1r6oONHLokMJJIz/IvkQ==
+-----END CERTIFICATE-----`)
+
 func newPod(podName, podNamespace string, ownerReferences []metav1.OwnerReference) *v1.Pod {
 	for i := 0; i < len(ownerReferences); i++ {
 		if len(ownerReferences[i].Kind) == 0 {
@@ -252,6 +275,7 @@ func setupWithServer(t *testing.T, result *kubeapiservertesting.TestServer, work
 	logger := tCtx.Logger()
 	alwaysStarted := make(chan struct{})
 	close(alwaysStarted)
+
 	gc, err := garbagecollector.NewGarbageCollector(
 		tCtx,
 		clientSet,
@@ -1285,3 +1309,119 @@ func testCRDDeletion(t *testing.T, ctx *testContext, ns *v1.Namespace, definitio
 		t.Fatalf("failed waiting for dependent %q (owned by %q) to be deleted", dependent.GetName(), owner.GetName())
 	}
 }
+
+// TestCascadingDeleteOnCRDConversionFailure tests that a bad conversion webhook cannot block the entire GC controller.
+// Historically, a cache sync failure from a single resource prevented the GC controller from running. This test creates
+// a CRD, updates the storage version with a bad conversion webhook, and then runs a simple cascading delete test.
+func TestCascadingDeleteOnCRDConversionFailure(t *testing.T) {
+	ctx := setup(t, 0)
+	defer ctx.tearDown()
+	gc, apiExtensionClient, dynamicClient, clientSet := ctx.gc, ctx.apiExtensionClient, ctx.dynamicClient, ctx.clientSet
+
+	ns := createNamespaceOrDie("gc-cache-sync-fail", clientSet, t)
+	defer deleteNamespaceOrDie(ns.Name, clientSet, t)
+
+	// Create a CRD with storage/serving version v1beta2. Then update the CRD with v1 as the storage version
+	// and an invalid conversion webhook. This should result in cache sync failures for the CRD from the GC controller.
+	def, dc := createRandomCustomResourceDefinition(t, apiExtensionClient, dynamicClient, ns.Name)
+	_, err := dc.Create(context.TODO(), newCRDInstance(def, ns.Name, names.SimpleNameGenerator.GenerateName("test")), metav1.CreateOptions{})
+	if err != nil {
+		t.Fatalf("Failed to create custom resource: %v", err)
+	}
+
+	def, err = apiExtensionClient.ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), def.Name, metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("Failed to get custom resource: %v", err)
+	}
+
+	newDefinition := def.DeepCopy()
+	newDefinition.Spec.Conversion = &apiextensionsv1.CustomResourceConversion{
+		Strategy: apiextensionsv1.WebhookConverter,
+		Webhook: &apiextensionsv1.WebhookConversion{
+			ClientConfig: &apiextensionsv1.WebhookClientConfig{
+				Service: &apiextensionsv1.ServiceReference{
+					Name:      "foobar",
+					Namespace: ns.Name,
+				},
+				CABundle: testCert,
+			},
+			ConversionReviewVersions: []string{
+				"v1", "v1beta1",
+			},
+		},
+	}
+	newDefinition.Spec.Versions = []apiextensionsv1.CustomResourceDefinitionVersion{
+		{
+			Name:    "v1",
+			Served:  true,
+			Storage: true,
+			Schema:  apiextensionstestserver.AllowAllSchema(),
+		},
+		{
+			Name:    "v1beta1",
+			Served:  true,
+			Storage: false,
+			Schema:  apiextensionstestserver.AllowAllSchema(),
+		},
+	}
+
+	_, err = apiExtensionClient.ApiextensionsV1().CustomResourceDefinitions().Update(context.TODO(), newDefinition, metav1.UpdateOptions{})
+	if err != nil {
+		t.Fatalf("Error updating CRD with conversion webhook: %v", err)
+	}
+
+	ctx.startGC(5)
+
+	rcClient := clientSet.CoreV1().ReplicationControllers(ns.Name)
+	podClient := clientSet.CoreV1().Pods(ns.Name)
+
+	toBeDeletedRC, err := rcClient.Create(context.TODO(), newOwnerRC(toBeDeletedRCName, ns.Name), metav1.CreateOptions{})
+	if err != nil {
+		t.Fatalf("Failed to create replication controller: %v", err)
+	}
+
+	rcs, err := rcClient.List(context.TODO(), metav1.ListOptions{})
+	if err != nil {
+		t.Fatalf("Failed to list replication controllers: %v", err)
+	}
+	if len(rcs.Items) != 1 {
+		t.Fatalf("Expect only 1 replication controller")
+	}
+
+	pod := newPod(garbageCollectedPodName, ns.Name, []metav1.OwnerReference{{UID: toBeDeletedRC.ObjectMeta.UID, Name: toBeDeletedRCName}})
+	_, err = podClient.Create(context.TODO(), pod, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatalf("Failed to create Pod: %v", err)
+	}
+
+	pods, err := podClient.List(context.TODO(), metav1.ListOptions{})
+	if err != nil {
+		t.Fatalf("Failed to list pods: %v", err)
+	}
+	if len(pods.Items) != 1 {
+		t.Fatalf("Expect only 1 pod")
+	}
+
+	if err := rcClient.Delete(context.TODO(), toBeDeletedRCName, getNonOrphanOptions()); err != nil {
+		t.Fatalf("failed to delete replication controller: %v", err)
+	}
+
+	// Sometimes the deletion of the RC takes a long time to be observed by
+	// the gc, so wait for the garbage collector to observe the deletion of
+	// the toBeDeletedRC
+	if err := wait.PollUntilContextTimeout(context.Background(), 1*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) {
+		return !gc.GraphHasUID(toBeDeletedRC.ObjectMeta.UID), nil
+	}); err != nil {
+		t.Fatal(err)
+	}
+	if err := integration.WaitForPodToDisappear(podClient, garbageCollectedPodName, 1*time.Second, 30*time.Second); err != nil {
+		t.Fatalf("expect pod %s to be garbage collected, got err= %v", garbageCollectedPodName, err)
+	}
+
+	// Check that the cache is still not synced after the cascading delete succeeded.
+	// If this check fails, verify that the conversion webhook is misconfigured in a way
+	// that prevents the watch cache from listing the CRD.
+	if ctx.gc.IsSynced(ctx.logger) {
+		t.Fatal("cache is not expected to be synced due to bad conversion webhook")
+	}
+}