diff --git a/pkg/controller/resourcequota/resource_quota_controller.go b/pkg/controller/resourcequota/resource_quota_controller.go
index 7faddb4a483..8b6292772c9 100644
--- a/pkg/controller/resourcequota/resource_quota_controller.go
+++ b/pkg/controller/resourcequota/resource_quota_controller.go
@@ -440,10 +440,12 @@ func (rq *Controller) Sync(ctx context.Context, discoveryFunc NamespacedResource
 		if err != nil {
 			utilruntime.HandleError(err)
 
-			if discovery.IsGroupDiscoveryFailedError(err) && len(newResources) > 0 {
-				// In partial discovery cases, don't remove any existing informers, just add new ones
+			if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(err); isLookupFailure && len(newResources) > 0 {
+				// In partial discovery cases, preserve existing informers for resources in the failed groups, so resyncMonitors will only add informers for newly seen resources
 				for k, v := range oldResources {
-					newResources[k] = v
+					if _, failed := groupLookupFailures[k.GroupVersion()]; failed {
+						newResources[k] = v
+					}
 				}
 			} else {
 				// short circuit in non-discovery error cases or if discovery returned zero resources
@@ -474,6 +476,10 @@ func (rq *Controller) Sync(ctx context.Context, discoveryFunc NamespacedResource
 			utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err))
 			return
 		}
+
+		// at this point, we've synced the new resources to our monitors, so record that fact.
+		oldResources = newResources
+
 		// wait for caches to fill for a while (our sync period).
 		// this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
 		// informers keep attempting to sync in the background, so retrying doesn't interrupt them.
@@ -488,8 +494,6 @@ func (rq *Controller) Sync(ctx context.Context, discoveryFunc NamespacedResource
 			return
 		}
 
-		// success, remember newly synced resources
-		oldResources = newResources
 		logger.V(2).Info("synced quota controller")
 	}, period)
 }
diff --git a/pkg/controller/resourcequota/resource_quota_controller_test.go b/pkg/controller/resourcequota/resource_quota_controller_test.go
index 8a99dcacdc4..af02d894740 100644
--- a/pkg/controller/resourcequota/resource_quota_controller_test.go
+++ b/pkg/controller/resourcequota/resource_quota_controller_test.go
@@ -35,6 +35,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/sets"
 	quota "k8s.io/apiserver/pkg/quota/v1"
 	"k8s.io/apiserver/pkg/quota/v1/generic"
+	"k8s.io/client-go/discovery"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/fake"
@@ -1016,6 +1017,12 @@ func TestDiscoverySync(t *testing.T) {
 				{Name: "pods", Namespaced: true, Kind: "Pod", Verbs: metav1.Verbs{"create", "delete", "list", "watch"}},
 			},
 		},
+		{
+			GroupVersion: "apps/v1",
+			APIResources: []metav1.APIResource{
+				{Name: "deployments", Namespaced: true, Kind: "Deployment", Verbs: metav1.Verbs{"create", "delete", "list", "watch"}},
+			},
+		},
 	}
 	unsyncableServerResources := []*metav1.APIResourceList{
 		{
@@ -1026,6 +1033,16 @@ func TestDiscoverySync(t *testing.T) {
 			GroupVersion: "v1",
 			APIResources: []metav1.APIResource{
 				{Name: "secrets", Namespaced: true, Kind: "Secret", Verbs: metav1.Verbs{"create", "delete", "list", "watch"}},
 			},
 		},
 	}
+	appsV1Resources := []*metav1.APIResourceList{
+		{
+			GroupVersion: "apps/v1",
+			APIResources: []metav1.APIResource{
+				{Name: "deployments", Namespaced: true, Kind: "Deployment", Verbs: metav1.Verbs{"create", "delete", "list", "watch"}},
+			},
+		},
+	}
+	appsV1Error := &discovery.ErrGroupDiscoveryFailed{Groups: map[schema.GroupVersion]error{{Group: "apps", Version: "v1"}: fmt.Errorf(":-/")}}
+	coreV1Error := &discovery.ErrGroupDiscoveryFailed{Groups: map[schema.GroupVersion]error{{Group: "", Version: "v1"}: fmt.Errorf(":-/")}}
 	fakeDiscoveryClient := &fakeServerResources{
 		PreferredResources: serverResources,
 		Error:              nil,
@@ -1043,6 +1060,10 @@ func TestDiscoverySync(t *testing.T) {
 				404,
 				[]byte("{}"),
 			},
+			"GET" + "/apis/apps/v1/deployments": {
+				200,
+				[]byte("{}"),
+			},
 		},
 	}
 
@@ -1056,9 +1077,11 @@ func TestDiscoverySync(t *testing.T) {
 
 	pods := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
 	secrets := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "secrets"}
+	deployments := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
 	listersForResourceConfig := map[schema.GroupVersionResource]cache.GenericLister{
-		pods:    newGenericLister(pods.GroupResource(), []runtime.Object{}),
-		secrets: newGenericLister(secrets.GroupResource(), []runtime.Object{}),
+		pods:        newGenericLister(pods.GroupResource(), []runtime.Object{}),
+		secrets:     newGenericLister(secrets.GroupResource(), []runtime.Object{}),
+		deployments: newGenericLister(deployments.GroupResource(), []runtime.Object{}),
 	}
 	qc := setupQuotaController(t, kubeClient, mockListerForResourceFunc(listersForResourceConfig), fakeDiscoveryClient.ServerPreferredNamespacedResources)
 	defer close(qc.stop)
@@ -1088,38 +1111,88 @@ func TestDiscoverySync(t *testing.T) {
 	if err != nil {
 		t.Fatalf("Expected quotacontroller.Sync to be running but it is blocked: %v", err)
 	}
+	assertMonitors(t, qc, "pods", "deployments")
 
 	// Simulate the discovery client returning an error
-	fakeDiscoveryClient.setPreferredResources(nil)
-	fakeDiscoveryClient.setError(fmt.Errorf("error calling discoveryClient.ServerPreferredResources()"))
+	fakeDiscoveryClient.setPreferredResources(nil, fmt.Errorf("error calling discoveryClient.ServerPreferredResources()"))
 
 	// Wait until sync discovers the change
 	time.Sleep(1 * time.Second)
+	// No monitors removed
+	assertMonitors(t, qc, "pods", "deployments")
 
 	// Remove the error from being returned and see if the quota sync is still working
-	fakeDiscoveryClient.setPreferredResources(serverResources)
-	fakeDiscoveryClient.setError(nil)
+	fakeDiscoveryClient.setPreferredResources(serverResources, nil)
 
 	err = expectSyncNotBlocked(fakeDiscoveryClient, &qc.workerLock)
 	if err != nil {
 		t.Fatalf("Expected quotacontroller.Sync to still be running but it is blocked: %v", err)
 	}
+	assertMonitors(t, qc, "pods", "deployments")
 
 	// Simulate the discovery client returning a resource the restmapper can resolve, but will not sync caches
-	fakeDiscoveryClient.setPreferredResources(unsyncableServerResources)
-	fakeDiscoveryClient.setError(nil)
+	fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, nil)
 
 	// Wait until sync discovers the change
 	time.Sleep(1 * time.Second)
+	// deployments removed, secrets added
+	assertMonitors(t, qc, "pods", "secrets")
 
 	// Put the resources back to normal and ensure quota sync recovers
-	fakeDiscoveryClient.setPreferredResources(serverResources)
-	fakeDiscoveryClient.setError(nil)
+	fakeDiscoveryClient.setPreferredResources(serverResources, nil)
+
+	// Wait until sync discovers the change
+	time.Sleep(1 * time.Second)
+	err = expectSyncNotBlocked(fakeDiscoveryClient, &qc.workerLock)
+	if err != nil {
+		t.Fatalf("Expected quotacontroller.Sync to still be running but it is blocked: %v", err)
+	}
+	// secrets removed, deployments readded
+	assertMonitors(t, qc, "pods", "deployments")
+
+	// apps/v1 discovery failure
+	fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, appsV1Error)
+	// Wait until sync discovers the change
+	time.Sleep(1 * time.Second)
+	err = expectSyncNotBlocked(fakeDiscoveryClient, &qc.workerLock)
+	if err != nil {
+		t.Fatalf("Expected quotacontroller.Sync to still be running but it is blocked: %v", err)
+	}
+	// deployments remain due to appsv1 error, secrets added
+	assertMonitors(t, qc, "pods", "deployments", "secrets")
+
+	// core/v1 discovery failure
+	fakeDiscoveryClient.setPreferredResources(appsV1Resources, coreV1Error)
+	// Wait until sync discovers the change
+	time.Sleep(1 * time.Second)
+	err = expectSyncNotBlocked(fakeDiscoveryClient, &qc.workerLock)
+	if err != nil {
+		t.Fatalf("Expected quotacontroller.Sync to still be running but it is blocked: %v", err)
+	}
+	// pods and secrets remain due to corev1 error
+	assertMonitors(t, qc, "pods", "deployments", "secrets")
+
+	// Put the resources back to normal and ensure quota sync recovers
+	fakeDiscoveryClient.setPreferredResources(serverResources, nil)
 
 	err = expectSyncNotBlocked(fakeDiscoveryClient, &qc.workerLock)
 	if err != nil {
 		t.Fatalf("Expected quotacontroller.Sync to still be running but it is blocked: %v", err)
 	}
+	// secrets removed, deployments remain
+	assertMonitors(t, qc, "pods", "deployments")
+}
+
+func assertMonitors(t *testing.T, qc quotaController, resources ...string) {
+	t.Helper()
+	expected := sets.NewString(resources...)
+	actual := sets.NewString()
+	for m := range qc.Controller.quotaMonitor.monitors {
+		actual.Insert(m.Resource)
+	}
+	if !actual.Equal(expected) {
+		t.Fatalf("expected monitors %v, got %v", expected.List(), actual.List())
+	}
 }
 
 // testServerAndClientConfig returns a server that listens and a config that can reference it
@@ -1169,15 +1242,10 @@ func (*fakeServerResources) ServerPreferredResources() ([]*metav1.APIResourceLis
 	return nil, nil
 }
 
-func (f *fakeServerResources) setPreferredResources(resources []*metav1.APIResourceList) {
+func (f *fakeServerResources) setPreferredResources(resources []*metav1.APIResourceList, err error) {
 	f.Lock.Lock()
 	defer f.Lock.Unlock()
 	f.PreferredResources = resources
-}
-
-func (f *fakeServerResources) setError(err error) {
-	f.Lock.Lock()
-	defer f.Lock.Unlock()
 	f.Error = err
 }
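
A minimal standalone sketch (not part of the patch) of the partial-discovery handling the controller hunk above introduces: when discovery returns an ErrGroupDiscoveryFailed, only resources whose group/version actually failed lookup are carried over from oldResources, so monitors for groups that discovered cleanly can still be added or dropped. It assumes only the client-go discovery helpers already used by the patch (GroupDiscoveryFailedErrorGroups, ErrGroupDiscoveryFailed); the example resources are illustrative.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/discovery"
)

func main() {
	// Resources the controller was monitoring before this sync.
	oldResources := map[schema.GroupVersionResource]struct{}{
		{Group: "apps", Version: "v1", Resource: "deployments"}: {},
		{Group: "", Version: "v1", Resource: "pods"}:            {},
	}
	// Discovery succeeded for core/v1 (pods) but failed for apps/v1.
	newResources := map[schema.GroupVersionResource]struct{}{
		{Group: "", Version: "v1", Resource: "pods"}: {},
	}
	err := &discovery.ErrGroupDiscoveryFailed{
		Groups: map[schema.GroupVersion]error{{Group: "apps", Version: "v1"}: fmt.Errorf(":-/")},
	}

	// Same filter as the patched Sync loop: preserve only the old resources
	// whose group/version is listed among the discovery failures.
	if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(err); isLookupFailure && len(newResources) > 0 {
		for k, v := range oldResources {
			if _, failed := groupLookupFailures[k.GroupVersion()]; failed {
				newResources[k] = v
			}
		}
	}

	// apps/v1 deployments is preserved because its group failed discovery;
	// old resources from groups that discovered fine are not re-added.
	fmt.Println(newResources)
}

This is why the test above expects the deployments monitor to survive an apps/v1 discovery error while the secrets monitor can still be added, and why pods and secrets survive a core/v1 error.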