Merge pull request #117992 from liggitt/gc-discovery-flutter

Fix duplicate GC event handlers getting added if discovery flutters
commit 7407f36b4b
Kubernetes Prow Robot, 2023-08-15 15:16:50 -07:00 (committed by GitHub)
6 changed files with 205 additions and 48 deletions
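In short: when discovery only partially fails (an ErrGroupDiscoveryFailed naming specific group/versions), both controllers now carry resources from the failed groups over from the previous sync (the garbage collector additionally requires that their informers have already synced), so resyncMonitors does not tear those monitors down and re-register their event handlers on every discovery flutter. The sketch below is a minimal, self-contained illustration of that pattern, with simplified stand-in types rather than the real schema and discovery packages.

package main

import (
    "errors"
    "fmt"
)

// Simplified stand-ins for schema.GroupVersion / schema.GroupVersionResource.
type GroupVersion struct{ Group, Version string }
type GVR struct{ Group, Version, Resource string }

func (r GVR) GroupVersion() GroupVersion { return GroupVersion{r.Group, r.Version} }

// Mirrors the shape of discovery.ErrGroupDiscoveryFailed: it records which
// group/versions could not be discovered.
type ErrGroupDiscoveryFailed struct{ Groups map[GroupVersion]error }

func (e *ErrGroupDiscoveryFailed) Error() string { return fmt.Sprintf("failed groups: %v", e.Groups) }

// failedGroups unwraps a partial-discovery error, in the spirit of the
// GroupDiscoveryFailedErrorGroups helper added in this PR.
func failedGroups(err error) (map[GroupVersion]error, bool) {
    var g *ErrGroupDiscoveryFailed
    if err != nil && errors.As(err, &g) {
        return g.Groups, true
    }
    return nil, false
}

// preserveSynced is the heart of the fix: on a partial discovery failure,
// resources that belong to the failed groups (and whose informers are already
// synced) are kept from the previous sync, so the monitor resync does not
// remove and re-add them on every flutter.
func preserveSynced(old, fresh map[GVR]struct{}, err error, isSynced func(GVR) bool) {
    failures, partial := failedGroups(err)
    if !partial {
        return
    }
    for gvr := range old {
        if _, failed := failures[gvr.GroupVersion()]; failed && isSynced(gvr) {
            fresh[gvr] = struct{}{}
        }
    }
}

func main() {
    pods := GVR{"", "v1", "pods"}
    deployments := GVR{"apps", "v1", "deployments"}

    old := map[GVR]struct{}{pods: {}, deployments: {}}
    // apps/v1 "flutters": this pass of discovery reports only core/v1 plus a partial error.
    fresh := map[GVR]struct{}{pods: {}}
    err := &ErrGroupDiscoveryFailed{Groups: map[GroupVersion]error{{Group: "apps", Version: "v1"}: errors.New("flutter")}}

    preserveSynced(old, fresh, err, func(GVR) bool { return true })
    fmt.Println(len(fresh)) // 2: deployments is kept, so its monitor survives the flutter
}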


@@ -187,14 +187,21 @@ func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery.
    logger := klog.FromContext(ctx)

    // Get the current resource list from discovery.
-   newResources := GetDeletableResources(logger, discoveryClient)
+   newResources, err := GetDeletableResources(logger, discoveryClient)

-   // This can occur if there is an internal error in GetDeletableResources.
    if len(newResources) == 0 {
        logger.V(2).Info("no resources reported by discovery, skipping garbage collector sync")
        metrics.GarbageCollectorResourcesSyncError.Inc()
        return
    }
+   if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(err); isLookupFailure {
+       // In partial discovery cases, preserve existing synced informers for resources in the failed groups, so resyncMonitors will only add informers for newly seen resources
+       for k, v := range oldResources {
+           if _, failed := groupLookupFailures[k.GroupVersion()]; failed && gc.dependencyGraphBuilder.IsResourceSynced(k) {
+               newResources[k] = v
+           }
+       }
+   }

    // Decide whether discovery has reported a change.
    if reflect.DeepEqual(oldResources, newResources) {
@@ -214,12 +221,21 @@ func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery.
        // On a reattempt, check if available resources have changed
        if attempt > 1 {
-           newResources = GetDeletableResources(logger, discoveryClient)
+           newResources, err = GetDeletableResources(logger, discoveryClient)
            if len(newResources) == 0 {
                logger.V(2).Info("no resources reported by discovery", "attempt", attempt)
                metrics.GarbageCollectorResourcesSyncError.Inc()
                return false, nil
            }
+           if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(err); isLookupFailure {
+               // In partial discovery cases, preserve existing synced informers for resources in the failed groups, so resyncMonitors will only add informers for newly seen resources
+               for k, v := range oldResources {
+                   if _, failed := groupLookupFailures[k.GroupVersion()]; failed && gc.dependencyGraphBuilder.IsResourceSynced(k) {
+                       newResources[k] = v
+                   }
+               }
+           }
        }

        logger.V(2).Info(
@@ -806,20 +822,23 @@ func (gc *GarbageCollector) GraphHasUID(u types.UID) bool {
// garbage collector should recognize and work with. More specifically, all
// preferred resources which support the 'delete', 'list', and 'watch' verbs.
//
+// If an error was encountered fetching resources from the server,
+// it is included as well, along with any resources that were successfully resolved.
+//
// All discovery errors are considered temporary. Upon encountering any error,
// GetDeletableResources will log and return any discovered resources it was
// able to process (which may be none).
-func GetDeletableResources(logger klog.Logger, discoveryClient discovery.ServerResourcesInterface) map[schema.GroupVersionResource]struct{} {
-   preferredResources, err := discoveryClient.ServerPreferredResources()
-   if err != nil {
-       if discovery.IsGroupDiscoveryFailedError(err) {
-           logger.Info("failed to discover some groups", "groups", err.(*discovery.ErrGroupDiscoveryFailed).Groups)
+func GetDeletableResources(logger klog.Logger, discoveryClient discovery.ServerResourcesInterface) (map[schema.GroupVersionResource]struct{}, error) {
+   preferredResources, lookupErr := discoveryClient.ServerPreferredResources()
+   if lookupErr != nil {
+       if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(lookupErr); isLookupFailure {
+           logger.Info("failed to discover some groups", "groups", groupLookupFailures)
        } else {
-           logger.Info("failed to discover preferred resources", "error", err)
+           logger.Info("failed to discover preferred resources", "error", lookupErr)
        }
    }
    if preferredResources == nil {
-       return map[schema.GroupVersionResource]struct{}{}
+       return map[schema.GroupVersionResource]struct{}{}, lookupErr
    }

    // This is extracted from discovery.GroupVersionResources to allow tolerating
@@ -837,7 +856,7 @@ func GetDeletableResources(logger klog.Logger, discoveryClient discovery.ServerR
        }
    }

-   return deletableGroupVersionResources
+   return deletableGroupVersionResources, lookupErr
}

func (gc *GarbageCollector) Name() string {


@@ -28,6 +28,7 @@ import (
    "time"

    "golang.org/x/time/rate"

    "k8s.io/klog/v2"
    "k8s.io/klog/v2/ktesting"
@@ -805,10 +806,13 @@ func TestGetDeletableResources(t *testing.T) {
            PreferredResources: test.serverResources,
            Error:              test.err,
        }
-       actual := GetDeletableResources(logger, client)
+       actual, actualErr := GetDeletableResources(logger, client)
        if !reflect.DeepEqual(test.deletableResources, actual) {
            t.Errorf("expected resources:\n%v\ngot:\n%v", test.deletableResources, actual)
        }
+       if !reflect.DeepEqual(test.err, actualErr) {
+           t.Errorf("expected error:\n%v\ngot:\n%v", test.err, actualErr)
+       }
    }
}
@@ -822,7 +826,15 @@ func TestGarbageCollectorSync(t *testing.T) {
                {Name: "pods", Namespaced: true, Kind: "Pod", Verbs: metav1.Verbs{"delete", "list", "watch"}},
            },
        },
+       {
+           GroupVersion: "apps/v1",
+           APIResources: []metav1.APIResource{
+               {Name: "deployments", Namespaced: true, Kind: "Deployment", Verbs: metav1.Verbs{"delete", "list", "watch"}},
+           },
+       },
    }
+   appsV1Error := &discovery.ErrGroupDiscoveryFailed{Groups: map[schema.GroupVersion]error{{Group: "apps", Version: "v1"}: fmt.Errorf(":-/")}}
    unsyncableServerResources := []*metav1.APIResourceList{
        {
            GroupVersion: "v1",
@@ -845,6 +857,10 @@ func TestGarbageCollectorSync(t *testing.T) {
            200,
            []byte("{}"),
        },
+       "GET" + "/apis/apps/v1/deployments": {
+           200,
+           []byte("{}"),
+       },
        "GET" + "/api/v1/secrets": {
            404,
            []byte("{}"),
@@ -859,7 +875,11 @@ func TestGarbageCollectorSync(t *testing.T) {
        t.Fatal(err)
    }
-   rm := &testRESTMapper{testrestmapper.TestOnlyStaticRESTMapper(legacyscheme.Scheme)}
+   tweakableRM := meta.NewDefaultRESTMapper(nil)
+   tweakableRM.AddSpecific(schema.GroupVersionKind{Version: "v1", Kind: "Pod"}, schema.GroupVersionResource{Version: "v1", Resource: "pods"}, schema.GroupVersionResource{Version: "v1", Resource: "pod"}, meta.RESTScopeNamespace)
+   tweakableRM.AddSpecific(schema.GroupVersionKind{Version: "v1", Kind: "Secret"}, schema.GroupVersionResource{Version: "v1", Resource: "secrets"}, schema.GroupVersionResource{Version: "v1", Resource: "secret"}, meta.RESTScopeNamespace)
+   tweakableRM.AddSpecific(schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "Deployment"}, schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}, schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployment"}, meta.RESTScopeNamespace)
+   rm := &testRESTMapper{meta.MultiRESTMapper{tweakableRM, testrestmapper.TestOnlyStaticRESTMapper(legacyscheme.Scheme)}}
    metadataClient, err := metadata.NewForConfig(clientConfig)
    if err != nil {
        t.Fatal(err)
@@ -900,38 +920,70 @@ func TestGarbageCollectorSync(t *testing.T) {
    if err != nil {
        t.Fatalf("Expected garbagecollector.Sync to be running but it is blocked: %v", err)
    }
+   assertMonitors(t, gc, "pods", "deployments")

    // Simulate the discovery client returning an error
-   fakeDiscoveryClient.setPreferredResources(nil)
-   fakeDiscoveryClient.setError(fmt.Errorf("error calling discoveryClient.ServerPreferredResources()"))
+   fakeDiscoveryClient.setPreferredResources(nil, fmt.Errorf("error calling discoveryClient.ServerPreferredResources()"))

    // Wait until sync discovers the change
    time.Sleep(1 * time.Second)
+   // No monitor changes
+   assertMonitors(t, gc, "pods", "deployments")

    // Remove the error from being returned and see if the garbage collector sync is still working
-   fakeDiscoveryClient.setPreferredResources(serverResources)
-   fakeDiscoveryClient.setError(nil)
+   fakeDiscoveryClient.setPreferredResources(serverResources, nil)
    err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
    if err != nil {
        t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
    }
+   assertMonitors(t, gc, "pods", "deployments")

    // Simulate the discovery client returning a resource the restmapper can resolve, but will not sync caches
-   fakeDiscoveryClient.setPreferredResources(unsyncableServerResources)
-   fakeDiscoveryClient.setError(nil)
+   fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, nil)

    // Wait until sync discovers the change
    time.Sleep(1 * time.Second)
+   assertMonitors(t, gc, "pods", "secrets")

    // Put the resources back to normal and ensure garbage collector sync recovers
-   fakeDiscoveryClient.setPreferredResources(serverResources)
-   fakeDiscoveryClient.setError(nil)
+   fakeDiscoveryClient.setPreferredResources(serverResources, nil)
    err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
    if err != nil {
        t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
    }
+   assertMonitors(t, gc, "pods", "deployments")
+
+   // Partial discovery failure
+   fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, appsV1Error)
+   // Wait until sync discovers the change
+   time.Sleep(1 * time.Second)
+   // Deployments monitor kept
+   assertMonitors(t, gc, "pods", "deployments", "secrets")
+
+   // Put the resources back to normal and ensure garbage collector sync recovers
+   fakeDiscoveryClient.setPreferredResources(serverResources, nil)
+   // Wait until sync discovers the change
+   time.Sleep(1 * time.Second)
+   err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
+   if err != nil {
+       t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
+   }
+   // Unsyncable monitor removed
+   assertMonitors(t, gc, "pods", "deployments")
+}
+
+func assertMonitors(t *testing.T, gc *GarbageCollector, resources ...string) {
+   t.Helper()
+   expected := sets.NewString(resources...)
+   actual := sets.NewString()
+   for m := range gc.dependencyGraphBuilder.monitors {
+       actual.Insert(m.Resource)
+   }
+   if !actual.Equal(expected) {
+       t.Fatalf("expected monitors %v, got %v", expected.List(), actual.List())
+   }
}

func expectSyncNotBlocked(fakeDiscoveryClient *fakeServerResources, workerLock *sync.RWMutex) error {
@@ -979,15 +1031,10 @@ func (f *fakeServerResources) ServerPreferredResources() ([]*metav1.APIResourceL
    return f.PreferredResources, f.Error
}

-func (f *fakeServerResources) setPreferredResources(resources []*metav1.APIResourceList) {
+func (f *fakeServerResources) setPreferredResources(resources []*metav1.APIResourceList, err error) {
    f.Lock.Lock()
    defer f.Lock.Unlock()
    f.PreferredResources = resources
-}
-
-func (f *fakeServerResources) setError(err error) {
-   f.Lock.Lock()
-   defer f.Lock.Unlock()
    f.Error = err
}


@@ -265,6 +265,14 @@ func (gb *GraphBuilder) startMonitors(logger klog.Logger) {
    logger.V(4).Info("started new monitors", "new", started, "current", len(monitors))
}

+// IsResourceSynced returns true if a monitor exists for the given resource and has synced
+func (gb *GraphBuilder) IsResourceSynced(resource schema.GroupVersionResource) bool {
+   gb.monitorLock.Lock()
+   defer gb.monitorLock.Unlock()
+   monitor, ok := gb.monitors[resource]
+   return ok && monitor.controller.HasSynced()
+}
+
// IsSynced returns true if any monitors exist AND all those monitors'
// controllers HasSynced functions return true. This means IsSynced could return
// true at one time, and then later return false if all monitors were


@@ -440,11 +440,13 @@ func (rq *Controller) Sync(ctx context.Context, discoveryFunc NamespacedResource
    if err != nil {
        utilruntime.HandleError(err)

-       if discovery.IsGroupDiscoveryFailedError(err) && len(newResources) > 0 {
-           // In partial discovery cases, don't remove any existing informers, just add new ones
+       if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(err); isLookupFailure && len(newResources) > 0 {
+           // In partial discovery cases, preserve existing informers for resources in the failed groups, so resyncMonitors will only add informers for newly seen resources
            for k, v := range oldResources {
+               if _, failed := groupLookupFailures[k.GroupVersion()]; failed {
                    newResources[k] = v
+               }
            }
        } else {
            // short circuit in non-discovery error cases or if discovery returned zero resources
            return
@@ -474,6 +476,10 @@ func (rq *Controller) Sync(ctx context.Context, discoveryFunc NamespacedResource
        utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err))
        return
    }
+
+   // at this point, we've synced the new resources to our monitors, so record that fact.
+   oldResources = newResources

    // wait for caches to fill for a while (our sync period).
    // this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
    // informers keep attempting to sync in the background, so retrying doesn't interrupt them.
@@ -488,8 +494,6 @@ func (rq *Controller) Sync(ctx context.Context, discoveryFunc NamespacedResource
        return
    }

-   // success, remember newly synced resources
-   oldResources = newResources
    logger.V(2).Info("synced quota controller")
}, period)
}
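The second part of the quota-controller change moves the oldResources bookkeeping up to right after resyncMonitors succeeds, instead of waiting for the informer caches to fill. A rough caricature of why that placement matters follows; syncLoop is a hypothetical stand-in, not the controller code:

package main

import "fmt"

// syncLoop caricatures the quota controller's Sync loop: each period it compares
// the freshly discovered resources with what it last remembered and, if they
// differ, re-runs the monitor resync. cachesFill simulates whether the
// post-resync cache wait succeeds; recordEarly toggles where oldResources is updated.
func syncLoop(passes int, recordEarly, cachesFill bool) (resyncs int) {
    oldResources, newResources := "", "pods,secrets"
    for i := 0; i < passes; i++ {
        if oldResources != newResources {
            resyncs++ // resyncMonitors runs again
            if recordEarly {
                oldResources = newResources // the fix: remember the resync immediately
            }
        }
        if cachesFill && !recordEarly {
            oldResources = newResources // old behaviour: remember only after caches sync
        }
    }
    return resyncs
}

func main() {
    // With an unsyncable resource the cache wait keeps timing out; before the fix
    // every period re-ran the monitor resync even though discovery had not changed.
    fmt.Println(syncLoop(5, false, false)) // 5
    fmt.Println(syncLoop(5, true, false))  // 1
}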


@@ -35,6 +35,7 @@ import (
    "k8s.io/apimachinery/pkg/util/sets"
    quota "k8s.io/apiserver/pkg/quota/v1"
    "k8s.io/apiserver/pkg/quota/v1/generic"
+   "k8s.io/client-go/discovery"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/kubernetes/fake"
@@ -1016,6 +1017,12 @@ func TestDiscoverySync(t *testing.T) {
                {Name: "pods", Namespaced: true, Kind: "Pod", Verbs: metav1.Verbs{"create", "delete", "list", "watch"}},
            },
        },
+       {
+           GroupVersion: "apps/v1",
+           APIResources: []metav1.APIResource{
+               {Name: "deployments", Namespaced: true, Kind: "Deployment", Verbs: metav1.Verbs{"create", "delete", "list", "watch"}},
+           },
+       },
    }
    unsyncableServerResources := []*metav1.APIResourceList{
        {
@@ -1026,6 +1033,16 @@ func TestDiscoverySync(t *testing.T) {
            },
        },
    }
+   appsV1Resources := []*metav1.APIResourceList{
+       {
+           GroupVersion: "apps/v1",
+           APIResources: []metav1.APIResource{
+               {Name: "deployments", Namespaced: true, Kind: "Deployment", Verbs: metav1.Verbs{"create", "delete", "list", "watch"}},
+           },
+       },
+   }
+   appsV1Error := &discovery.ErrGroupDiscoveryFailed{Groups: map[schema.GroupVersion]error{{Group: "apps", Version: "v1"}: fmt.Errorf(":-/")}}
+   coreV1Error := &discovery.ErrGroupDiscoveryFailed{Groups: map[schema.GroupVersion]error{{Group: "", Version: "v1"}: fmt.Errorf(":-/")}}
    fakeDiscoveryClient := &fakeServerResources{
        PreferredResources: serverResources,
        Error:              nil,
@@ -1043,6 +1060,10 @@ func TestDiscoverySync(t *testing.T) {
            404,
            []byte("{}"),
        },
+       "GET" + "/apis/apps/v1/deployments": {
+           200,
+           []byte("{}"),
+       },
    },
}
@@ -1056,9 +1077,11 @@ func TestDiscoverySync(t *testing.T) {
    pods := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
    secrets := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "secrets"}
+   deployments := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
    listersForResourceConfig := map[schema.GroupVersionResource]cache.GenericLister{
        pods:    newGenericLister(pods.GroupResource(), []runtime.Object{}),
        secrets: newGenericLister(secrets.GroupResource(), []runtime.Object{}),
+       deployments: newGenericLister(deployments.GroupResource(), []runtime.Object{}),
    }
    qc := setupQuotaController(t, kubeClient, mockListerForResourceFunc(listersForResourceConfig), fakeDiscoveryClient.ServerPreferredNamespacedResources)
    defer close(qc.stop)
@@ -1088,38 +1111,88 @@ func TestDiscoverySync(t *testing.T) {
    if err != nil {
        t.Fatalf("Expected quotacontroller.Sync to be running but it is blocked: %v", err)
    }
+   assertMonitors(t, qc, "pods", "deployments")

    // Simulate the discovery client returning an error
-   fakeDiscoveryClient.setPreferredResources(nil)
-   fakeDiscoveryClient.setError(fmt.Errorf("error calling discoveryClient.ServerPreferredResources()"))
+   fakeDiscoveryClient.setPreferredResources(nil, fmt.Errorf("error calling discoveryClient.ServerPreferredResources()"))

    // Wait until sync discovers the change
    time.Sleep(1 * time.Second)
+   // No monitors removed
+   assertMonitors(t, qc, "pods", "deployments")

    // Remove the error from being returned and see if the quota sync is still working
-   fakeDiscoveryClient.setPreferredResources(serverResources)
-   fakeDiscoveryClient.setError(nil)
+   fakeDiscoveryClient.setPreferredResources(serverResources, nil)
    err = expectSyncNotBlocked(fakeDiscoveryClient, &qc.workerLock)
    if err != nil {
        t.Fatalf("Expected quotacontroller.Sync to still be running but it is blocked: %v", err)
    }
+   assertMonitors(t, qc, "pods", "deployments")

    // Simulate the discovery client returning a resource the restmapper can resolve, but will not sync caches
-   fakeDiscoveryClient.setPreferredResources(unsyncableServerResources)
-   fakeDiscoveryClient.setError(nil)
+   fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, nil)

    // Wait until sync discovers the change
    time.Sleep(1 * time.Second)
+   // deployments removed, secrets added
+   assertMonitors(t, qc, "pods", "secrets")

    // Put the resources back to normal and ensure quota sync recovers
-   fakeDiscoveryClient.setPreferredResources(serverResources)
-   fakeDiscoveryClient.setError(nil)
+   fakeDiscoveryClient.setPreferredResources(serverResources, nil)
+   // Wait until sync discovers the change
+   time.Sleep(1 * time.Second)
+   err = expectSyncNotBlocked(fakeDiscoveryClient, &qc.workerLock)
+   if err != nil {
+       t.Fatalf("Expected quotacontroller.Sync to still be running but it is blocked: %v", err)
+   }
+   // secrets removed, deployments readded
+   assertMonitors(t, qc, "pods", "deployments")
+
+   // apps/v1 discovery failure
+   fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, appsV1Error)
+   // Wait until sync discovers the change
+   time.Sleep(1 * time.Second)
+   err = expectSyncNotBlocked(fakeDiscoveryClient, &qc.workerLock)
+   if err != nil {
+       t.Fatalf("Expected quotacontroller.Sync to still be running but it is blocked: %v", err)
+   }
+   // deployments remain due to appsv1 error, secrets added
+   assertMonitors(t, qc, "pods", "deployments", "secrets")
+
+   // core/v1 discovery failure
+   fakeDiscoveryClient.setPreferredResources(appsV1Resources, coreV1Error)
+   // Wait until sync discovers the change
+   time.Sleep(1 * time.Second)
+   err = expectSyncNotBlocked(fakeDiscoveryClient, &qc.workerLock)
+   if err != nil {
+       t.Fatalf("Expected quotacontroller.Sync to still be running but it is blocked: %v", err)
+   }
+   // pods and secrets remain due to corev1 error
+   assertMonitors(t, qc, "pods", "deployments", "secrets")
+
+   // Put the resources back to normal and ensure quota sync recovers
+   fakeDiscoveryClient.setPreferredResources(serverResources, nil)
    err = expectSyncNotBlocked(fakeDiscoveryClient, &qc.workerLock)
    if err != nil {
        t.Fatalf("Expected quotacontroller.Sync to still be running but it is blocked: %v", err)
    }
+   // secrets removed, deployments remain
+   assertMonitors(t, qc, "pods", "deployments")
+}
+
+func assertMonitors(t *testing.T, qc quotaController, resources ...string) {
+   t.Helper()
+   expected := sets.NewString(resources...)
+   actual := sets.NewString()
+   for m := range qc.Controller.quotaMonitor.monitors {
+       actual.Insert(m.Resource)
+   }
+   if !actual.Equal(expected) {
+       t.Fatalf("expected monitors %v, got %v", expected.List(), actual.List())
+   }
}

// testServerAndClientConfig returns a server that listens and a config that can reference it
@@ -1169,15 +1242,10 @@ func (*fakeServerResources) ServerPreferredResources() ([]*metav1.APIResourceLis
    return nil, nil
}

-func (f *fakeServerResources) setPreferredResources(resources []*metav1.APIResourceList) {
+func (f *fakeServerResources) setPreferredResources(resources []*metav1.APIResourceList, err error) {
    f.Lock.Lock()
    defer f.Lock.Unlock()
    f.PreferredResources = resources
-}
-
-func (f *fakeServerResources) setError(err error) {
-   f.Lock.Lock()
-   defer f.Lock.Unlock()
    f.Error = err
}


@@ -19,6 +19,7 @@ package discovery
import (
    "context"
    "encoding/json"
+   goerrors "errors"
    "fmt"
    "mime"
    "net/http"
@@ -422,6 +423,16 @@ func IsGroupDiscoveryFailedError(err error) bool {
    return err != nil && ok
}

+// GroupDiscoveryFailedErrorGroups returns true if the error is an ErrGroupDiscoveryFailed error,
+// along with the map of group versions that failed discovery.
+func GroupDiscoveryFailedErrorGroups(err error) (map[schema.GroupVersion]error, bool) {
+   var groupDiscoveryError *ErrGroupDiscoveryFailed
+   if err != nil && goerrors.As(err, &groupDiscoveryError) {
+       return groupDiscoveryError.Groups, true
+   }
+   return nil, false
+}
+
func ServerGroupsAndResources(d DiscoveryInterface) ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
    var sgs *metav1.APIGroupList
    var resources []*metav1.APIResourceList
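For callers outside these two controllers, the new helper gives a uniform way to tell a partial discovery failure apart from a total one. A small hypothetical usage sketch follows; classifyDiscoveryError is not part of the PR, but discovery.GroupDiscoveryFailedErrorGroups and discovery.ErrGroupDiscoveryFailed are the real client-go names:

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/discovery"
)

// classifyDiscoveryError is a hypothetical caller of the new helper: partial
// failures are tolerated (and reported per group), anything else is fatal.
func classifyDiscoveryError(err error) error {
    if err == nil {
        return nil
    }
    if failures, isPartial := discovery.GroupDiscoveryFailedErrorGroups(err); isPartial {
        for gv, gvErr := range failures {
            fmt.Printf("tolerating discovery failure for %s: %v\n", gv, gvErr)
        }
        return nil
    }
    return err
}

func main() {
    partial := &discovery.ErrGroupDiscoveryFailed{
        Groups: map[schema.GroupVersion]error{
            {Group: "apps", Version: "v1"}: fmt.Errorf("the server is currently unable to handle the request"),
        },
    }
    fmt.Println(classifyDiscoveryError(partial)) // <nil>: only apps/v1 failed, and that is tolerated
}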